diff options
| author | Aaron Turon <aturon@mozilla.com> | 2014-10-10 10:11:49 -0700 |
|---|---|---|
| committer | Aaron Turon <aturon@mozilla.com> | 2014-11-08 20:40:38 -0800 |
| commit | d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41 (patch) | |
| tree | bdb9af03a1b73d4edc9ae5e6193a010c9b2b4edc /src/libstd/io | |
| parent | 0c1e1ff1e300868a29405a334e65eae690df971d (diff) | |
| download | rust-d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41.tar.gz rust-d34b1b0ca9bf5e0d7cd30952f5de0ab09ed57b41.zip | |
Runtime removal: refactor pipes and networking
This patch continues the runtime removal by moving pipe and networking-related code into `sys`. Because this eliminates APIs in `libnative` and `librustrt`, it is a: [breaking-change] This functionality is likely to be available publicly, in some form, from `std` in the future.
Diffstat (limited to 'src/libstd/io')
| -rw-r--r-- | src/libstd/io/net/addrinfo.rs | 32 | ||||
| -rw-r--r-- | src/libstd/io/net/mod.rs | 45 | ||||
| -rw-r--r-- | src/libstd/io/net/pipe.rs | 65 | ||||
| -rw-r--r-- | src/libstd/io/net/tcp.rs | 105 | ||||
| -rw-r--r-- | src/libstd/io/net/udp.rs | 60 | ||||
| -rw-r--r-- | src/libstd/io/pipe.rs | 58 |
6 files changed, 120 insertions, 245 deletions
diff --git a/src/libstd/io/net/addrinfo.rs b/src/libstd/io/net/addrinfo.rs index 3c72f58b10d..22775d54eff 100644 --- a/src/libstd/io/net/addrinfo.rs +++ b/src/libstd/io/net/addrinfo.rs @@ -20,12 +20,10 @@ getaddrinfo() #![allow(missing_docs)] use iter::Iterator; -use io::{IoResult, IoError}; +use io::{IoResult}; use io::net::ip::{SocketAddr, IpAddr}; use option::{Option, Some, None}; -use result::{Ok, Err}; -use rt::rtio::{IoFactory, LocalIo}; -use rt::rtio; +use sys; use vec::Vec; /// Hints to the types of sockets that are desired when looking up hosts @@ -94,31 +92,7 @@ pub fn get_host_addresses(host: &str) -> IoResult<Vec<IpAddr>> { #[allow(unused_variables)] fn lookup(hostname: Option<&str>, servname: Option<&str>, hint: Option<Hint>) -> IoResult<Vec<Info>> { - let hint = hint.map(|Hint { family, socktype, protocol, flags }| { - rtio::AddrinfoHint { - family: family, - socktype: 0, // FIXME: this should use the above variable - protocol: 0, // FIXME: this should use the above variable - flags: flags, - } - }); - match LocalIo::maybe_raise(|io| { - io.get_host_addresses(hostname, servname, hint) - }) { - Ok(v) => Ok(v.into_iter().map(|info| { - Info { - address: SocketAddr { - ip: super::from_rtio(info.address.ip), - port: info.address.port, - }, - family: info.family, - socktype: None, // FIXME: this should use the above variable - protocol: None, // FIXME: this should use the above variable - flags: info.flags, - } - }).collect()), - Err(e) => Err(IoError::from_rtio_error(e)), - } + sys::addrinfo::get_host_addresses(hostname, servname, hint) } // Ignored on android since we cannot give tcp/ip diff --git a/src/libstd/io/net/mod.rs b/src/libstd/io/net/mod.rs index b9b50a55a10..5b1747876d7 100644 --- a/src/libstd/io/net/mod.rs +++ b/src/libstd/io/net/mod.rs @@ -12,9 +12,8 @@ use io::{IoError, IoResult, InvalidInput}; use option::None; -use result::{Result, Ok, Err}; -use rt::rtio; -use self::ip::{Ipv4Addr, Ipv6Addr, IpAddr, SocketAddr, ToSocketAddr}; +use result::{Ok, Err}; +use self::ip::{SocketAddr, ToSocketAddr}; pub use self::addrinfo::get_host_addresses; @@ -24,46 +23,6 @@ pub mod udp; pub mod ip; pub mod pipe; -fn to_rtio(ip: IpAddr) -> rtio::IpAddr { - match ip { - Ipv4Addr(a, b, c, d) => rtio::Ipv4Addr(a, b, c, d), - Ipv6Addr(a, b, c, d, e, f, g, h) => { - rtio::Ipv6Addr(a, b, c, d, e, f, g, h) - } - } -} - -fn from_rtio(ip: rtio::IpAddr) -> IpAddr { - match ip { - rtio::Ipv4Addr(a, b, c, d) => Ipv4Addr(a, b, c, d), - rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => { - Ipv6Addr(a, b, c, d, e, f, g, h) - } - } -} - -fn with_addresses_io<A: ToSocketAddr, T>( - addr: A, - action: |&mut rtio::IoFactory, rtio::SocketAddr| -> Result<T, rtio::IoError> -) -> Result<T, IoError> { - const DEFAULT_ERROR: IoError = IoError { - kind: InvalidInput, - desc: "no addresses found for hostname", - detail: None - }; - - let addresses = try!(addr.to_socket_addr_all()); - let mut err = DEFAULT_ERROR; - for addr in addresses.into_iter() { - let addr = rtio::SocketAddr { ip: to_rtio(addr.ip), port: addr.port }; - match rtio::LocalIo::maybe_raise(|io| action(io, addr)) { - Ok(r) => return Ok(r), - Err(e) => err = IoError::from_rtio_error(e) - } - } - Err(err) -} - fn with_addresses<A: ToSocketAddr, T>(addr: A, action: |SocketAddr| -> IoResult<T>) -> IoResult<T> { const DEFAULT_ERROR: IoError = IoError { diff --git a/src/libstd/io/net/pipe.rs b/src/libstd/io/net/pipe.rs index 8c7deadebea..111b0f2b081 100644 --- a/src/libstd/io/net/pipe.rs +++ b/src/libstd/io/net/pipe.rs @@ -26,17 +26,20 @@ instances as clients. use prelude::*; -use io::{Listener, Acceptor, IoResult, IoError, TimedOut, standard_error}; -use rt::rtio::{IoFactory, LocalIo, RtioUnixListener}; -use rt::rtio::{RtioUnixAcceptor, RtioPipe}; +use io::{Listener, Acceptor, IoResult, TimedOut, standard_error}; use time::Duration; +use sys::pipe::UnixStream as UnixStreamImp; +use sys::pipe::UnixListener as UnixListenerImp; +use sys::pipe::UnixAcceptor as UnixAcceptorImp; + /// A stream which communicates over a named pipe. pub struct UnixStream { - obj: Box<RtioPipe + Send>, + inner: UnixStreamImp, } impl UnixStream { + /// Connect to a pipe named by `path`. This will attempt to open a /// connection to the underlying socket. /// @@ -53,9 +56,8 @@ impl UnixStream { /// stream.write([1, 2, 3]); /// ``` pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> { - LocalIo::maybe_raise(|io| { - io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p }) - }).map_err(IoError::from_rtio_error) + UnixStreamImp::connect(&path.to_c_str(), None) + .map(|inner| UnixStream { inner: inner }) } /// Connect to a pipe named by `path`, timing out if the specified number of @@ -73,10 +75,8 @@ impl UnixStream { return Err(standard_error(TimedOut)); } - LocalIo::maybe_raise(|io| { - let s = io.unix_connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64)); - s.map(|p| UnixStream { obj: p }) - }).map_err(IoError::from_rtio_error) + UnixStreamImp::connect(&path.to_c_str(), Some(timeout.num_milliseconds() as u64)) + .map(|inner| UnixStream { inner: inner }) } @@ -88,7 +88,7 @@ impl UnixStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_read(&mut self) -> IoResult<()> { - self.obj.close_read().map_err(IoError::from_rtio_error) + self.inner.close_read() } /// Closes the writing half of this connection. @@ -99,7 +99,7 @@ impl UnixStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_write(&mut self) -> IoResult<()> { - self.obj.close_write().map_err(IoError::from_rtio_error) + self.inner.close_write() } /// Sets the read/write timeout for this socket. @@ -107,7 +107,7 @@ impl UnixStream { /// For more information, see `TcpStream::set_timeout` #[experimental = "the timeout argument may change in type and value"] pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_timeout(timeout_ms) + self.inner.set_timeout(timeout_ms) } /// Sets the read timeout for this socket. @@ -115,7 +115,7 @@ impl UnixStream { /// For more information, see `TcpStream::set_timeout` #[experimental = "the timeout argument may change in type and value"] pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_read_timeout(timeout_ms) + self.inner.set_read_timeout(timeout_ms) } /// Sets the write timeout for this socket. @@ -123,36 +123,35 @@ impl UnixStream { /// For more information, see `TcpStream::set_timeout` #[experimental = "the timeout argument may change in type and value"] pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_write_timeout(timeout_ms) + self.inner.set_write_timeout(timeout_ms) } } impl Clone for UnixStream { fn clone(&self) -> UnixStream { - UnixStream { obj: self.obj.clone() } + UnixStream { inner: self.inner.clone() } } } impl Reader for UnixStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { - self.obj.read(buf).map_err(IoError::from_rtio_error) + self.inner.read(buf) } } impl Writer for UnixStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { - self.obj.write(buf).map_err(IoError::from_rtio_error) + self.inner.write(buf) } } /// A value that can listen for incoming named pipe connection requests. pub struct UnixListener { /// The internal, opaque runtime Unix listener. - obj: Box<RtioUnixListener + Send>, + inner: UnixListenerImp, } impl UnixListener { - /// Creates a new listener, ready to receive incoming connections on the /// specified socket. The server will be named by `path`. /// @@ -175,24 +174,22 @@ impl UnixListener { /// # } /// ``` pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> { - LocalIo::maybe_raise(|io| { - io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s }) - }).map_err(IoError::from_rtio_error) + UnixListenerImp::bind(&path.to_c_str()) + .map(|inner| UnixListener { inner: inner }) } } impl Listener<UnixStream, UnixAcceptor> for UnixListener { fn listen(self) -> IoResult<UnixAcceptor> { - self.obj.listen().map(|obj| { - UnixAcceptor { obj: obj } - }).map_err(IoError::from_rtio_error) + self.inner.listen() + .map(|inner| UnixAcceptor { inner: inner }) } } /// A value that can accept named pipe connections, returned from `listen()`. pub struct UnixAcceptor { /// The internal, opaque runtime Unix acceptor. - obj: Box<RtioUnixAcceptor + Send>, + inner: UnixAcceptorImp } impl UnixAcceptor { @@ -210,7 +207,7 @@ impl UnixAcceptor { #[experimental = "the name and arguments to this function are likely \ to change"] pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_timeout(timeout_ms) + self.inner.set_timeout(timeout_ms) } /// Closes the accepting capabilities of this acceptor. @@ -219,15 +216,15 @@ impl UnixAcceptor { /// more information can be found in that documentation. #[experimental] pub fn close_accept(&mut self) -> IoResult<()> { - self.obj.close_accept().map_err(IoError::from_rtio_error) + self.inner.close_accept() } } impl Acceptor<UnixStream> for UnixAcceptor { fn accept(&mut self) -> IoResult<UnixStream> { - self.obj.accept().map(|s| { - UnixStream { obj: s } - }).map_err(IoError::from_rtio_error) + self.inner.accept().map(|s| { + UnixStream { inner: s } + }) } } @@ -246,7 +243,7 @@ impl Clone for UnixAcceptor { /// This function is useful for creating a handle to invoke `close_accept` /// on to wake up any other task blocked in `accept`. fn clone(&self) -> UnixAcceptor { - UnixAcceptor { obj: self.obj.clone() } + UnixAcceptor { inner: self.inner.clone() } } } diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 928c8586739..2545e07cbb5 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -20,19 +20,17 @@ use clone::Clone; use io::IoResult; use iter::Iterator; -use result::{Ok,Err}; +use result::Err; use io::net::ip::{SocketAddr, ToSocketAddr}; -use io::IoError; use io::{Reader, Writer, Listener, Acceptor}; use io::{standard_error, TimedOut}; -use kinds::Send; use option::{None, Some, Option}; -use boxed::Box; -use rt::rtio::{IoFactory, RtioSocket, RtioTcpListener}; -use rt::rtio::{RtioTcpAcceptor, RtioTcpStream}; -use rt::rtio; use time::Duration; +use sys::tcp::TcpStream as TcpStreamImp; +use sys::tcp::TcpListener as TcpListenerImp; +use sys::tcp::TcpAcceptor as TcpAcceptorImp; + /// A structure which represents a TCP stream between a local socket and a /// remote socket. /// @@ -50,12 +48,12 @@ use time::Duration; /// drop(stream); // close the connection /// ``` pub struct TcpStream { - obj: Box<RtioTcpStream + Send>, + inner: TcpStreamImp, } impl TcpStream { - fn new(s: Box<RtioTcpStream + Send>) -> TcpStream { - TcpStream { obj: s } + fn new(s: TcpStreamImp) -> TcpStream { + TcpStream { inner: s } } /// Open a TCP connection to a remote host. @@ -64,7 +62,9 @@ impl TcpStream { /// trait can be supplied for the address; see this trait documentation for /// concrete examples. pub fn connect<A: ToSocketAddr>(addr: A) -> IoResult<TcpStream> { - super::with_addresses_io(addr, |io, addr| io.tcp_connect(addr, None).map(TcpStream::new)) + super::with_addresses(addr, |addr| { + TcpStreamImp::connect(addr, None).map(TcpStream::new) + }) } /// Creates a TCP connection to a remote socket address, timing out after @@ -86,39 +86,26 @@ impl TcpStream { return Err(standard_error(TimedOut)); } - super::with_addresses_io(addr, |io, addr| - io.tcp_connect(addr, Some(timeout.num_milliseconds() as u64)).map(TcpStream::new) - ) + super::with_addresses(addr, |addr| { + TcpStreamImp::connect(addr, Some(timeout.num_milliseconds() as u64)) + .map(TcpStream::new) + }) } /// Returns the socket address of the remote peer of this TCP connection. pub fn peer_name(&mut self) -> IoResult<SocketAddr> { - match self.obj.peer_name() { - Ok(rtio::SocketAddr { ip, port }) => { - Ok(SocketAddr { ip: super::from_rtio(ip), port: port }) - } - Err(e) => Err(IoError::from_rtio_error(e)), - } + self.inner.peer_name() } /// Returns the socket address of the local half of this TCP connection. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { - match self.obj.socket_name() { - Ok(rtio::SocketAddr { ip, port }) => { - Ok(SocketAddr { ip: super::from_rtio(ip), port: port }) - } - Err(e) => Err(IoError::from_rtio_error(e)), - } + self.inner.socket_name() } /// Sets the nodelay flag on this connection to the boolean specified #[experimental] pub fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> { - if nodelay { - self.obj.nodelay() - } else { - self.obj.control_congestion() - }.map_err(IoError::from_rtio_error) + self.inner.set_nodelay(nodelay) } /// Sets the keepalive timeout to the timeout specified. @@ -128,10 +115,7 @@ impl TcpStream { /// specified time, in seconds. #[experimental] pub fn set_keepalive(&mut self, delay_in_seconds: Option<uint>) -> IoResult<()> { - match delay_in_seconds { - Some(i) => self.obj.keepalive(i), - None => self.obj.letdie(), - }.map_err(IoError::from_rtio_error) + self.inner.set_keepalive(delay_in_seconds) } /// Closes the reading half of this connection. @@ -165,7 +149,7 @@ impl TcpStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_read(&mut self) -> IoResult<()> { - self.obj.close_read().map_err(IoError::from_rtio_error) + self.inner.close_read() } /// Closes the writing half of this connection. @@ -176,7 +160,7 @@ impl TcpStream { /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. pub fn close_write(&mut self) -> IoResult<()> { - self.obj.close_write().map_err(IoError::from_rtio_error) + self.inner.close_write() } /// Sets a timeout, in milliseconds, for blocking operations on this stream. @@ -198,7 +182,7 @@ impl TcpStream { /// take a look at `set_read_timeout` and `set_write_timeout`. #[experimental = "the timeout argument may change in type and value"] pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_timeout(timeout_ms) + self.inner.set_timeout(timeout_ms) } /// Sets the timeout for read operations on this stream. @@ -215,7 +199,7 @@ impl TcpStream { /// during the timeout period. #[experimental = "the timeout argument may change in type and value"] pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_read_timeout(timeout_ms) + self.inner.set_read_timeout(timeout_ms) } /// Sets the timeout for write operations on this stream. @@ -242,7 +226,7 @@ impl TcpStream { /// asynchronous fashion after the call to write returns. #[experimental = "the timeout argument may change in type and value"] pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_write_timeout(timeout_ms) + self.inner.set_write_timeout(timeout_ms) } } @@ -256,19 +240,19 @@ impl Clone for TcpStream { /// Instead, the first read will receive the first packet received, and the /// second read will receive the second packet. fn clone(&self) -> TcpStream { - TcpStream { obj: self.obj.clone() } + TcpStream { inner: self.inner.clone() } } } impl Reader for TcpStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { - self.obj.read(buf).map_err(IoError::from_rtio_error) + self.inner.read(buf) } } impl Writer for TcpStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { - self.obj.write(buf).map_err(IoError::from_rtio_error) + self.inner.write(buf) } } @@ -309,7 +293,7 @@ impl Writer for TcpStream { /// # } /// ``` pub struct TcpListener { - obj: Box<RtioTcpListener + Send>, + inner: TcpListenerImp, } impl TcpListener { @@ -324,26 +308,20 @@ impl TcpListener { /// The address type can be any implementor of `ToSocketAddr` trait. See its /// documentation for concrete examples. pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<TcpListener> { - super::with_addresses_io(addr, |io, addr| io.tcp_bind(addr).map(|l| TcpListener { obj: l })) + super::with_addresses(addr, |addr| { + TcpListenerImp::bind(addr).map(|inner| TcpListener { inner: inner }) + }) } /// Returns the local socket address of this listener. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { - match self.obj.socket_name() { - Ok(rtio::SocketAddr { ip, port }) => { - Ok(SocketAddr { ip: super::from_rtio(ip), port: port }) - } - Err(e) => Err(IoError::from_rtio_error(e)), - } + self.inner.socket_name() } } impl Listener<TcpStream, TcpAcceptor> for TcpListener { fn listen(self) -> IoResult<TcpAcceptor> { - match self.obj.listen() { - Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }), - Err(e) => Err(IoError::from_rtio_error(e)), - } + self.inner.listen(128).map(|a| TcpAcceptor { inner: a }) } } @@ -351,7 +329,7 @@ impl Listener<TcpStream, TcpAcceptor> for TcpListener { /// a `TcpListener`'s `listen` method, and this object can be used to accept new /// `TcpStream` instances. pub struct TcpAcceptor { - obj: Box<RtioTcpAcceptor + Send>, + inner: TcpAcceptorImp, } impl TcpAcceptor { @@ -399,7 +377,7 @@ impl TcpAcceptor { /// ``` #[experimental = "the type of the argument and name of this function are \ subject to change"] - pub fn set_timeout(&mut self, ms: Option<u64>) { self.obj.set_timeout(ms); } + pub fn set_timeout(&mut self, ms: Option<u64>) { self.inner.set_timeout(ms); } /// Closes the accepting capabilities of this acceptor. /// @@ -445,16 +423,13 @@ impl TcpAcceptor { /// ``` #[experimental] pub fn close_accept(&mut self) -> IoResult<()> { - self.obj.close_accept().map_err(IoError::from_rtio_error) + self.inner.close_accept() } } impl Acceptor<TcpStream> for TcpAcceptor { fn accept(&mut self) -> IoResult<TcpStream> { - match self.obj.accept(){ - Ok(s) => Ok(TcpStream::new(s)), - Err(e) => Err(IoError::from_rtio_error(e)), - } + self.inner.accept().map(TcpStream::new) } } @@ -473,7 +448,7 @@ impl Clone for TcpAcceptor { /// This function is useful for creating a handle to invoke `close_accept` /// on to wake up any other task blocked in `accept`. fn clone(&self) -> TcpAcceptor { - TcpAcceptor { obj: self.obj.clone() } + TcpAcceptor { inner: self.inner.clone() } } } @@ -1112,8 +1087,6 @@ mod test { #[test] fn shutdown_smoke() { - use rt::rtio::RtioTcpStream; - let addr = next_test_ip4(); let a = TcpListener::bind(addr).unwrap().listen(); spawn(proc() { @@ -1124,7 +1097,7 @@ mod test { }); let mut s = TcpStream::connect(addr).unwrap(); - assert!(s.obj.close_write().is_ok()); + assert!(s.inner.close_write().is_ok()); assert!(s.write([1]).is_err()); assert_eq!(s.read_to_end(), Ok(vec!(1))); } diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index 4ae054beadb..31b61989647 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -17,13 +17,10 @@ use clone::Clone; use io::net::ip::{SocketAddr, IpAddr, ToSocketAddr}; -use io::{Reader, Writer, IoResult, IoError}; -use kinds::Send; -use boxed::Box; +use io::{Reader, Writer, IoResult}; use option::Option; use result::{Ok, Err}; -use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory}; -use rt::rtio; +use sys::udp::UdpSocket as UdpSocketImp; /// A User Datagram Protocol socket. /// @@ -60,7 +57,7 @@ use rt::rtio; /// } /// ``` pub struct UdpSocket { - obj: Box<RtioUdpSocket + Send>, + inner: UdpSocketImp, } impl UdpSocket { @@ -69,18 +66,15 @@ impl UdpSocket { /// Address type can be any implementor of `ToSocketAddr` trait. See its /// documentation for concrete examples. pub fn bind<A: ToSocketAddr>(addr: A) -> IoResult<UdpSocket> { - super::with_addresses_io(addr, |io, addr| io.udp_bind(addr).map(|s| UdpSocket { obj: s })) + super::with_addresses(addr, |addr| { + UdpSocketImp::bind(addr).map(|s| UdpSocket { inner: s }) + }) } /// Receives data from the socket. On success, returns the number of bytes /// read and the address from whence the data came. pub fn recv_from(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)> { - match self.obj.recv_from(buf) { - Ok((amt, rtio::SocketAddr { ip, port })) => { - Ok((amt, SocketAddr { ip: super::from_rtio(ip), port: port })) - } - Err(e) => Err(IoError::from_rtio_error(e)), - } + self.inner.recv_from(buf) } /// Sends data on the socket to the given address. Returns nothing on @@ -89,10 +83,7 @@ impl UdpSocket { /// Address type can be any implementor of `ToSocketAddr` trait. See its /// documentation for concrete examples. pub fn send_to<A: ToSocketAddr>(&mut self, buf: &[u8], addr: A) -> IoResult<()> { - super::with_addresses(addr, |addr| self.obj.send_to(buf, rtio::SocketAddr { - ip: super::to_rtio(addr.ip), - port: addr.port, - }).map_err(IoError::from_rtio_error)) + super::with_addresses(addr, |addr| self.inner.send_to(buf, addr)) } /// Creates a `UdpStream`, which allows use of the `Reader` and `Writer` @@ -112,24 +103,19 @@ impl UdpSocket { /// Returns the socket address that this socket was created from. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { - match self.obj.socket_name() { - Ok(a) => Ok(SocketAddr { ip: super::from_rtio(a.ip), port: a.port }), - Err(e) => Err(IoError::from_rtio_error(e)) - } + self.inner.socket_name() } /// Joins a multicast IP address (becomes a member of it) #[experimental] pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> { - let e = self.obj.join_multicast(super::to_rtio(multi)); - e.map_err(IoError::from_rtio_error) + self.inner.join_multicast(multi) } /// Leaves a multicast IP address (drops membership from it) #[experimental] pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> { - let e = self.obj.leave_multicast(super::to_rtio(multi)); - e.map_err(IoError::from_rtio_error) + self.inner.leave_multicast(multi) } /// Set the multicast loop flag to the specified value @@ -137,33 +123,25 @@ impl UdpSocket { /// This lets multicast packets loop back to local sockets (if enabled) #[experimental] pub fn set_multicast_loop(&mut self, on: bool) -> IoResult<()> { - if on { - self.obj.loop_multicast_locally() - } else { - self.obj.dont_loop_multicast_locally() - }.map_err(IoError::from_rtio_error) + self.inner.set_multicast_loop(on) } /// Sets the multicast TTL #[experimental] pub fn set_multicast_ttl(&mut self, ttl: int) -> IoResult<()> { - self.obj.multicast_time_to_live(ttl).map_err(IoError::from_rtio_error) + self.inner.multicast_time_to_live(ttl) } /// Sets this socket's TTL #[experimental] pub fn set_ttl(&mut self, ttl: int) -> IoResult<()> { - self.obj.time_to_live(ttl).map_err(IoError::from_rtio_error) + self.inner.time_to_live(ttl) } /// Sets the broadcast flag on or off #[experimental] pub fn set_broadcast(&mut self, broadcast: bool) -> IoResult<()> { - if broadcast { - self.obj.hear_broadcasts() - } else { - self.obj.ignore_broadcasts() - }.map_err(IoError::from_rtio_error) + self.inner.set_broadcast(broadcast) } /// Sets the read/write timeout for this socket. @@ -171,7 +149,7 @@ impl UdpSocket { /// For more information, see `TcpStream::set_timeout` #[experimental = "the timeout argument may change in type and value"] pub fn set_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_timeout(timeout_ms) + self.inner.set_timeout(timeout_ms) } /// Sets the read timeout for this socket. @@ -179,7 +157,7 @@ impl UdpSocket { /// For more information, see `TcpStream::set_timeout` #[experimental = "the timeout argument may change in type and value"] pub fn set_read_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_read_timeout(timeout_ms) + self.inner.set_read_timeout(timeout_ms) } /// Sets the write timeout for this socket. @@ -187,7 +165,7 @@ impl UdpSocket { /// For more information, see `TcpStream::set_timeout` #[experimental = "the timeout argument may change in type and value"] pub fn set_write_timeout(&mut self, timeout_ms: Option<u64>) { - self.obj.set_write_timeout(timeout_ms) + self.inner.set_write_timeout(timeout_ms) } } @@ -201,7 +179,7 @@ impl Clone for UdpSocket { /// received, and the second read will receive the second packet. fn clone(&self) -> UdpSocket { UdpSocket { - obj: self.obj.clone(), + inner: self.inner.clone(), } } } diff --git a/src/libstd/io/pipe.rs b/src/libstd/io/pipe.rs index c77cffd561e..64b2518fab1 100644 --- a/src/libstd/io/pipe.rs +++ b/src/libstd/io/pipe.rs @@ -17,15 +17,17 @@ use prelude::*; -use io::{IoResult, IoError}; +use io::IoResult; use libc; -use os; -use rt::rtio::{RtioPipe, LocalIo}; +use sync::Arc; + +use sys_common; +use sys; +use sys::fs::FileDesc as FileDesc; /// A synchronous, in-memory pipe. pub struct PipeStream { - /// The internal, opaque runtime pipe object. - obj: Box<RtioPipe + Send>, + inner: Arc<FileDesc> } pub struct PipePair { @@ -55,14 +57,14 @@ impl PipeStream { /// } /// ``` pub fn open(fd: libc::c_int) -> IoResult<PipeStream> { - LocalIo::maybe_raise(|io| { - io.pipe_open(fd).map(|obj| PipeStream { obj: obj }) - }).map_err(IoError::from_rtio_error) + Ok(PipeStream::from_filedesc(FileDesc::new(fd, true))) } + // FIXME: expose this some other way + /// Wrap a FileDesc directly, taking ownership. #[doc(hidden)] - pub fn new(inner: Box<RtioPipe + Send>) -> PipeStream { - PipeStream { obj: inner } + pub fn from_filedesc(fd: FileDesc) -> PipeStream { + PipeStream { inner: Arc::new(fd) } } /// Creates a pair of in-memory OS pipes for a unidirectional communication @@ -76,43 +78,35 @@ impl PipeStream { /// This function can fail to succeed if the underlying OS has run out of /// available resources to allocate a new pipe. pub fn pair() -> IoResult<PipePair> { - struct Closer { fd: libc::c_int } - - let os::Pipe { reader, writer } = try!(unsafe { os::pipe() }); - let mut reader = Closer { fd: reader }; - let mut writer = Closer { fd: writer }; - - let io_reader = try!(PipeStream::open(reader.fd)); - reader.fd = -1; - let io_writer = try!(PipeStream::open(writer.fd)); - writer.fd = -1; - return Ok(PipePair { reader: io_reader, writer: io_writer }); - - impl Drop for Closer { - fn drop(&mut self) { - if self.fd != -1 { - let _ = unsafe { libc::close(self.fd) }; - } - } - } + let (reader, writer) = try!(unsafe { sys::os::pipe() }); + Ok(PipePair { + reader: PipeStream::from_filedesc(reader), + writer: PipeStream::from_filedesc(writer), + }) + } +} + +impl sys_common::AsFileDesc for PipeStream { + fn as_fd(&self) -> &sys::fs::FileDesc { + &*self.inner } } impl Clone for PipeStream { fn clone(&self) -> PipeStream { - PipeStream { obj: self.obj.clone() } + PipeStream { inner: self.inner.clone() } } } impl Reader for PipeStream { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { - self.obj.read(buf).map_err(IoError::from_rtio_error) + self.inner.read(buf) } } impl Writer for PipeStream { fn write(&mut self, buf: &[u8]) -> IoResult<()> { - self.obj.write(buf).map_err(IoError::from_rtio_error) + self.inner.write(buf) } } |
