diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-06-03 20:09:39 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-06-06 22:19:57 -0700 |
| commit | da2293c6f6ea9291749f51a4608d50585be835f0 (patch) | |
| tree | 94284257df2ec6f351401dd304cdf0f1d1071881 | |
| parent | 5ec36c358f74fe83332231e774ea20a21d165120 (diff) | |
| download | rust-da2293c6f6ea9291749f51a4608d50585be835f0.tar.gz rust-da2293c6f6ea9291749f51a4608d50585be835f0.zip | |
std: Deal with fallout of rtio changes
| -rw-r--r-- | src/libstd/comm/shared.rs | 2 | ||||
| -rw-r--r-- | src/libstd/comm/sync.rs | 2 | ||||
| -rw-r--r-- | src/libstd/io/fs.rs | 145 | ||||
| -rw-r--r-- | src/libstd/io/mod.rs | 34 | ||||
| -rw-r--r-- | src/libstd/io/net/addrinfo.rs | 31 | ||||
| -rw-r--r-- | src/libstd/io/net/mod.rs | 21 | ||||
| -rw-r--r-- | src/libstd/io/net/tcp.rs | 75 | ||||
| -rw-r--r-- | src/libstd/io/net/udp.rs | 38 | ||||
| -rw-r--r-- | src/libstd/io/net/unix.rs | 32 | ||||
| -rw-r--r-- | src/libstd/io/pipe.rs | 12 | ||||
| -rw-r--r-- | src/libstd/io/process.rs | 34 | ||||
| -rw-r--r-- | src/libstd/io/signal.rs | 17 | ||||
| -rw-r--r-- | src/libstd/io/stdio.rs | 100 | ||||
| -rw-r--r-- | src/libstd/io/timer.rs | 26 | ||||
| -rw-r--r-- | src/libstd/lib.rs | 16 | ||||
| -rw-r--r-- | src/libstd/os.rs | 2 | ||||
| -rw-r--r-- | src/libstd/rtdeps.rs | 2 | ||||
| -rw-r--r-- | src/libstd/sync/deque.rs | 30 | ||||
| -rw-r--r-- | src/libstd/task.rs | 38 | ||||
| -rw-r--r-- | src/libstd/unstable/dynamic_lib.rs | 2 | ||||
| -rw-r--r-- | src/libstd/unstable/mod.rs | 4 |
21 files changed, 453 insertions, 210 deletions
diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs index 3fde584a46f..f4eeebeeea0 100644 --- a/src/libstd/comm/shared.rs +++ b/src/libstd/comm/shared.rs @@ -27,10 +27,10 @@ use option::{Some, None, Option}; use owned::Box; use result::{Ok, Err, Result}; use rt::local::Local; +use rt::mutex::NativeMutex; use rt::task::{Task, BlockedTask}; use rt::thread::Thread; use sync::atomics; -use unstable::mutex::NativeMutex; use mpsc = sync::mpsc_queue; diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs index 819e885526c..7fe505573b7 100644 --- a/src/libstd/comm/sync.rs +++ b/src/libstd/comm/sync.rs @@ -43,10 +43,10 @@ use owned::Box; use ptr::RawPtr; use result::{Result, Ok, Err}; use rt::local::Local; +use rt::mutex::{NativeMutex, LockGuard}; use rt::task::{Task, BlockedTask}; use sync::atomics; use ty::Unsafe; -use unstable::mutex::{NativeMutex, LockGuard}; use vec::Vec; pub struct Packet<T> { diff --git a/src/libstd/io/fs.rs b/src/libstd/io/fs.rs index 96ab8989a21..49e8d379236 100644 --- a/src/libstd/io/fs.rs +++ b/src/libstd/io/fs.rs @@ -52,19 +52,22 @@ fs::unlink(&path); use c_str::ToCStr; use clone::Clone; use container::Container; +use io; use iter::Iterator; use kinds::Send; -use super::{Reader, Writer, Seek}; -use super::{SeekStyle, Read, Write, Open, IoError, Truncate}; -use super::{FileMode, FileAccess, FileStat, IoResult, FilePermission}; -use rt::rtio::{RtioFileStream, IoFactory, LocalIo}; -use io; +use libc; use option::{Some, None, Option}; use owned::Box; -use result::{Ok, Err}; -use path; use path::{Path, GenericPath}; +use path; +use result::{Ok, Err}; +use rt::rtio::{RtioFileStream, IoFactory, LocalIo}; +use rt::rtio; use slice::{OwnedVector, ImmutableVector}; +use super::UnstableFileStat; +use super::{FileMode, FileAccess, FileStat, IoResult, FilePermission}; +use super::{Reader, Writer, Seek, Append, SeekCur, SeekEnd, SeekSet}; +use super::{SeekStyle, Read, Write, ReadWrite, Open, IoError, Truncate}; use vec::Vec; /// Unconstrained file access type that exposes read and write operations @@ -126,6 +129,16 @@ impl File { pub fn open_mode(path: &Path, mode: FileMode, access: FileAccess) -> IoResult<File> { + let mode = match mode { + Open => rtio::Open, + Append => rtio::Append, + Truncate => rtio::Truncate, + }; + let access = match access { + Read => rtio::Read, + Write => rtio::Write, + ReadWrite => rtio::ReadWrite, + }; LocalIo::maybe_raise(|io| { io.fs_open(&path.to_c_str(), mode, access).map(|fd| { File { @@ -134,7 +147,7 @@ impl File { last_nread: -1 } }) - }) + }).map_err(IoError::from_rtio_error) } /// Attempts to open a file in read-only mode. This function is equivalent to @@ -184,7 +197,7 @@ impl File { /// device. This will flush any internal buffers necessary to perform this /// operation. pub fn fsync(&mut self) -> IoResult<()> { - self.fd.fsync() + self.fd.fsync().map_err(IoError::from_rtio_error) } /// This function is similar to `fsync`, except that it may not synchronize @@ -192,7 +205,7 @@ impl File { /// must synchronize content, but don't need the metadata on disk. The goal /// of this method is to reduce disk operations. pub fn datasync(&mut self) -> IoResult<()> { - self.fd.datasync() + self.fd.datasync().map_err(IoError::from_rtio_error) } /// Either truncates or extends the underlying file, updating the size of @@ -204,7 +217,7 @@ impl File { /// will be extended to `size` and have all of the intermediate data filled /// in with 0s. pub fn truncate(&mut self, size: i64) -> IoResult<()> { - self.fd.truncate(size) + self.fd.truncate(size).map_err(IoError::from_rtio_error) } /// Tests whether this stream has reached EOF. @@ -217,7 +230,10 @@ impl File { /// Queries information about the underlying file. pub fn stat(&mut self) -> IoResult<FileStat> { - self.fd.fstat() + match self.fd.fstat() { + Ok(s) => Ok(from_rtio(s)), + Err(e) => Err(IoError::from_rtio_error(e)), + } } } @@ -243,7 +259,9 @@ impl File { /// user lacks permissions to remove the file, or if some other filesystem-level /// error occurs. pub fn unlink(path: &Path) -> IoResult<()> { - LocalIo::maybe_raise(|io| io.fs_unlink(&path.to_c_str())) + LocalIo::maybe_raise(|io| { + io.fs_unlink(&path.to_c_str()) + }).map_err(IoError::from_rtio_error) } /// Given a path, query the file system to get information about a file, @@ -268,9 +286,10 @@ pub fn unlink(path: &Path) -> IoResult<()> { /// to perform a `stat` call on the given path or if there is no entry in the /// filesystem at the provided path. pub fn stat(path: &Path) -> IoResult<FileStat> { - LocalIo::maybe_raise(|io| { - io.fs_stat(&path.to_c_str()) - }) + match LocalIo::maybe_raise(|io| io.fs_stat(&path.to_c_str())) { + Ok(s) => Ok(from_rtio(s)), + Err(e) => Err(IoError::from_rtio_error(e)), + } } /// Perform the same operation as the `stat` function, except that this @@ -282,9 +301,46 @@ pub fn stat(path: &Path) -> IoResult<FileStat> { /// /// See `stat` pub fn lstat(path: &Path) -> IoResult<FileStat> { - LocalIo::maybe_raise(|io| { - io.fs_lstat(&path.to_c_str()) - }) + match LocalIo::maybe_raise(|io| io.fs_lstat(&path.to_c_str())) { + Ok(s) => Ok(from_rtio(s)), + Err(e) => Err(IoError::from_rtio_error(e)), + } +} + +fn from_rtio(s: rtio::FileStat) -> FileStat { + let rtio::FileStat { + size, kind, perm, created, modified, + accessed, device, inode, rdev, + nlink, uid, gid, blksize, blocks, flags, gen + } = s; + + FileStat { + size: size, + kind: match (kind as libc::c_int) & libc::S_IFMT { + libc::S_IFREG => io::TypeFile, + libc::S_IFDIR => io::TypeDirectory, + libc::S_IFIFO => io::TypeNamedPipe, + libc::S_IFBLK => io::TypeBlockSpecial, + libc::S_IFLNK => io::TypeSymlink, + _ => io::TypeUnknown, + }, + perm: FilePermission::from_bits_truncate(perm as u32), + created: created, + modified: modified, + accessed: accessed, + unstable: UnstableFileStat { + device: device, + inode: inode, + rdev: rdev, + nlink: nlink, + uid: uid, + gid: gid, + blksize: blksize, + blocks: blocks, + flags: flags, + gen: gen, + }, + } } /// Rename a file or directory to a new name. @@ -304,7 +360,9 @@ pub fn lstat(path: &Path) -> IoResult<FileStat> { /// permissions to view the contents, or if some other intermittent I/O error /// occurs. pub fn rename(from: &Path, to: &Path) -> IoResult<()> { - LocalIo::maybe_raise(|io| io.fs_rename(&from.to_c_str(), &to.to_c_str())) + LocalIo::maybe_raise(|io| { + io.fs_rename(&from.to_c_str(), &to.to_c_str()) + }).map_err(IoError::from_rtio_error) } /// Copies the contents of one file to another. This function will also @@ -382,25 +440,33 @@ pub fn copy(from: &Path, to: &Path) -> IoResult<()> { /// Some possible error situations are not having the permission to /// change the attributes of a file or the file not existing. pub fn chmod(path: &Path, mode: io::FilePermission) -> IoResult<()> { - LocalIo::maybe_raise(|io| io.fs_chmod(&path.to_c_str(), mode)) + LocalIo::maybe_raise(|io| { + io.fs_chmod(&path.to_c_str(), mode.bits() as uint) + }).map_err(IoError::from_rtio_error) } /// Change the user and group owners of a file at the specified path. pub fn chown(path: &Path, uid: int, gid: int) -> IoResult<()> { - LocalIo::maybe_raise(|io| io.fs_chown(&path.to_c_str(), uid, gid)) + LocalIo::maybe_raise(|io| { + io.fs_chown(&path.to_c_str(), uid, gid) + }).map_err(IoError::from_rtio_error) } /// Creates a new hard link on the filesystem. The `dst` path will be a /// link pointing to the `src` path. Note that systems often require these /// two paths to both be located on the same filesystem. pub fn link(src: &Path, dst: &Path) -> IoResult<()> { - LocalIo::maybe_raise(|io| io.fs_link(&src.to_c_str(), &dst.to_c_str())) + LocalIo::maybe_raise(|io| { + io.fs_link(&src.to_c_str(), &dst.to_c_str()) + }).map_err(IoError::from_rtio_error) } /// Creates a new symbolic link on the filesystem. The `dst` path will be a /// symlink pointing to the `src` path. pub fn symlink(src: &Path, dst: &Path) -> IoResult<()> { - LocalIo::maybe_raise(|io| io.fs_symlink(&src.to_c_str(), &dst.to_c_str())) + LocalIo::maybe_raise(|io| { + io.fs_symlink(&src.to_c_str(), &dst.to_c_str()) + }).map_err(IoError::from_rtio_error) } /// Reads a symlink, returning the file that the symlink points to. @@ -412,7 +478,7 @@ pub fn symlink(src: &Path, dst: &Path) -> IoResult<()> { pub fn readlink(path: &Path) -> IoResult<Path> { LocalIo::maybe_raise(|io| { Ok(Path::new(try!(io.fs_readlink(&path.to_c_str())))) - }) + }).map_err(IoError::from_rtio_error) } /// Create a new, empty directory at the provided path @@ -433,7 +499,9 @@ pub fn readlink(path: &Path) -> IoResult<Path> { /// This call will return an error if the user lacks permissions to make a new /// directory at the provided path, or if the directory already exists. pub fn mkdir(path: &Path, mode: FilePermission) -> IoResult<()> { - LocalIo::maybe_raise(|io| io.fs_mkdir(&path.to_c_str(), mode)) + LocalIo::maybe_raise(|io| { + io.fs_mkdir(&path.to_c_str(), mode.bits() as uint) + }).map_err(IoError::from_rtio_error) } /// Remove an existing, empty directory @@ -453,7 +521,9 @@ pub fn mkdir(path: &Path, mode: FilePermission) -> IoResult<()> { /// This call will return an error if the user lacks permissions to remove the /// directory at the provided path, or if the directory isn't empty. pub fn rmdir(path: &Path) -> IoResult<()> { - LocalIo::maybe_raise(|io| io.fs_rmdir(&path.to_c_str())) + LocalIo::maybe_raise(|io| { + io.fs_rmdir(&path.to_c_str()) + }).map_err(IoError::from_rtio_error) } /// Retrieve a vector containing all entries within a provided directory @@ -492,7 +562,7 @@ pub fn readdir(path: &Path) -> IoResult<Vec<Path>> { Ok(try!(io.fs_readdir(&path.to_c_str(), 0)).move_iter().map(|a| { Path::new(a) }).collect()) - }) + }).map_err(IoError::from_rtio_error) } /// Returns an iterator which will recursively walk the directory structure @@ -616,7 +686,9 @@ pub fn rmdir_recursive(path: &Path) -> IoResult<()> { /// be in milliseconds. // FIXME(#10301) these arguments should not be u64 pub fn change_file_times(path: &Path, atime: u64, mtime: u64) -> IoResult<()> { - LocalIo::maybe_raise(|io| io.fs_utime(&path.to_c_str(), atime, mtime)) + LocalIo::maybe_raise(|io| { + io.fs_utime(&path.to_c_str(), atime, mtime) + }).map_err(IoError::from_rtio_error) } impl Reader for File { @@ -629,28 +701,35 @@ impl Reader for File { _ => Ok(read as uint) } }, - Err(e) => Err(e), + Err(e) => Err(IoError::from_rtio_error(e)), } } } impl Writer for File { - fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.fd.write(buf) } + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + self.fd.write(buf).map_err(IoError::from_rtio_error) + } } impl Seek for File { fn tell(&self) -> IoResult<u64> { - self.fd.tell() + self.fd.tell().map_err(IoError::from_rtio_error) } fn seek(&mut self, pos: i64, style: SeekStyle) -> IoResult<()> { + let style = match style { + SeekSet => rtio::SeekSet, + SeekCur => rtio::SeekCur, + SeekEnd => rtio::SeekEnd, + }; match self.fd.seek(pos, style) { Ok(_) => { // successful seek resets EOF indicator self.last_nread = -1; Ok(()) } - Err(e) => Err(e), + Err(e) => Err(IoError::from_rtio_error(e)), } } } diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs index 78700d353af..94d60cb3ce7 100644 --- a/src/libstd/io/mod.rs +++ b/src/libstd/io/mod.rs @@ -225,6 +225,7 @@ use option::{Option, Some, None}; use os; use owned::Box; use result::{Ok, Err, Result}; +use rt::rtio; use slice::{Vector, MutableVector, ImmutableVector}; use str::{StrSlice, StrAllocating}; use str; @@ -323,6 +324,14 @@ impl IoError { libc::ERROR_BROKEN_PIPE => (EndOfFile, "the pipe has ended"), libc::ERROR_OPERATION_ABORTED => (TimedOut, "operation timed out"), + libc::WSAEINVAL => (InvalidInput, "invalid argument"), + libc::ERROR_CALL_NOT_IMPLEMENTED => + (IoUnavailable, "function not implemented"), + libc::ERROR_CALL_NOT_IMPLEMENTED => + (MismatchedFileTypeForOperation, + "invalid handle provided to function"), + libc::ERROR_NOTHING_TO_TERMINATE => + (InvalidInput, "no process to kill"), // libuv maps this error code to EISDIR. we do too. if it is found // to be incorrect, we can add in some more machinery to only @@ -351,9 +360,17 @@ impl IoError { libc::EADDRINUSE => (ConnectionRefused, "address in use"), libc::ENOENT => (FileNotFound, "no such file or directory"), libc::EISDIR => (InvalidInput, "illegal operation on a directory"), - - // These two constants can have the same value on some systems, but - // different values on others, so we can't use a match clause + libc::ENOSYS => (IoUnavailable, "function not implemented"), + libc::EINVAL => (InvalidInput, "invalid argument"), + libc::ENOTTY => + (MismatchedFileTypeForOperation, + "file descriptor is not a TTY"), + libc::ETIMEDOUT => (TimedOut, "operation timed out"), + libc::ECANCELED => (TimedOut, "operation aborted"), + + // These two constants can have the same value on some systems, + // but different values on others, so we can't use a match + // clause x if x == libc::EAGAIN || x == libc::EWOULDBLOCK => (ResourceUnavailable, "resource temporarily unavailable"), @@ -382,6 +399,17 @@ impl IoError { pub fn last_error() -> IoError { IoError::from_errno(os::errno() as uint, true) } + + fn from_rtio_error(err: rtio::IoError) -> IoError { + let rtio::IoError { code, extra, detail } = err; + let mut ioerr = IoError::from_errno(code, false); + ioerr.detail = detail; + ioerr.kind = match ioerr.kind { + TimedOut if extra > 0 => ShortWrite(extra), + k => k, + }; + return ioerr; + } } impl fmt::Show for IoError { diff --git a/src/libstd/io/net/addrinfo.rs b/src/libstd/io/net/addrinfo.rs index 879c66e0769..8d5fd2b99fd 100644 --- a/src/libstd/io/net/addrinfo.rs +++ b/src/libstd/io/net/addrinfo.rs @@ -20,10 +20,12 @@ getaddrinfo() #![allow(missing_doc)] use iter::Iterator; -use io::IoResult; +use io::{IoResult, IoError}; use io::net::ip::{SocketAddr, IpAddr}; use option::{Option, Some, None}; +use result::{Ok, Err}; use rt::rtio::{IoFactory, LocalIo}; +use rt::rtio; use vec::Vec; /// Hints to the types of sockets that are desired when looking up hosts @@ -89,9 +91,34 @@ pub fn get_host_addresses(host: &str) -> IoResult<Vec<IpAddr>> { /// /// FIXME: this is not public because the `Hint` structure is not ready for public /// consumption just yet. +#[allow(unused_variable)] fn lookup(hostname: Option<&str>, servname: Option<&str>, hint: Option<Hint>) -> IoResult<Vec<Info>> { - LocalIo::maybe_raise(|io| io.get_host_addresses(hostname, servname, hint)) + let hint = hint.map(|Hint { family, socktype, protocol, flags }| { + rtio::AddrinfoHint { + family: family, + socktype: 0, // FIXME: this should use the above variable + protocol: 0, // FIXME: this should use the above variable + flags: flags, + } + }); + match LocalIo::maybe_raise(|io| { + io.get_host_addresses(hostname, servname, hint) + }) { + Ok(v) => Ok(v.move_iter().map(|info| { + Info { + address: SocketAddr { + ip: super::from_rtio(info.address.ip), + port: info.address.port, + }, + family: info.family, + socktype: None, // FIXME: this should use the above variable + protocol: None, // FIXME: this should use the above variable + flags: info.flags, + } + }).collect()), + Err(e) => Err(IoError::from_rtio_error(e)), + } } // Ignored on android since we cannot give tcp/ip diff --git a/src/libstd/io/net/mod.rs b/src/libstd/io/net/mod.rs index 1939d653752..54af83462ee 100644 --- a/src/libstd/io/net/mod.rs +++ b/src/libstd/io/net/mod.rs @@ -10,6 +10,9 @@ //! Networking I/O +use rt::rtio; +use self::ip::{Ipv4Addr, Ipv6Addr, IpAddr}; + pub use self::addrinfo::get_host_addresses; pub mod addrinfo; @@ -18,3 +21,21 @@ pub mod udp; pub mod ip; // FIXME(#12093) - this should not be called unix pub mod unix; + +fn to_rtio(ip: IpAddr) -> rtio::IpAddr { + match ip { + Ipv4Addr(a, b, c, d) => rtio::Ipv4Addr(a, b, c, d), + Ipv6Addr(a, b, c, d, e, f, g, h) => { + rtio::Ipv6Addr(a, b, c, d, e, f, g, h) + } + } +} + +fn from_rtio(ip: rtio::IpAddr) -> IpAddr { + match ip { + rtio::Ipv4Addr(a, b, c, d) => Ipv4Addr(a, b, c, d), + rtio::Ipv6Addr(a, b, c, d, e, f, g, h) => { + Ipv6Addr(a, b, c, d, e, f, g, h) + } + } +} diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index ac17bc1de13..8ce77faa296 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -32,6 +32,7 @@ use option::{None, Some, Option}; use owned::Box; use rt::rtio::{IoFactory, LocalIo, RtioSocket, RtioTcpListener}; use rt::rtio::{RtioTcpAcceptor, RtioTcpStream}; +use rt::rtio; /// A structure which represents a TCP stream between a local socket and a /// remote socket. @@ -67,22 +68,22 @@ impl TcpStream { Some(addr) => vec!(addr), None => try!(get_host_addresses(host)) }; - let mut err = IoError{ + let mut err = IoError { kind: ConnectionFailed, desc: "no addresses found for hostname", detail: None }; - for address in addresses.iter() { - let socket_addr = SocketAddr{ip: *address, port: port}; + for addr in addresses.iter() { + let addr = rtio::SocketAddr{ ip: super::to_rtio(*addr), port: port }; let result = LocalIo::maybe_raise(|io| { - io.tcp_connect(socket_addr, None).map(TcpStream::new) + io.tcp_connect(addr, None).map(TcpStream::new) }); match result { Ok(stream) => { return Ok(stream) } Err(connect_err) => { - err = connect_err + err = IoError::from_rtio_error(connect_err) } } } @@ -101,19 +102,31 @@ impl TcpStream { #[experimental = "the timeout argument may eventually change types"] pub fn connect_timeout(addr: SocketAddr, timeout_ms: u64) -> IoResult<TcpStream> { + let SocketAddr { ip, port } = addr; + let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port }; LocalIo::maybe_raise(|io| { io.tcp_connect(addr, Some(timeout_ms)).map(TcpStream::new) - }) + }).map_err(IoError::from_rtio_error) } /// Returns the socket address of the remote peer of this TCP connection. pub fn peer_name(&mut self) -> IoResult<SocketAddr> { - self.obj.peer_name() + match self.obj.peer_name() { + Ok(rtio::SocketAddr { ip, port }) => { + Ok(SocketAddr { ip: super::from_rtio(ip), port: port }) + } + Err(e) => Err(IoError::from_rtio_error(e)), + } } /// Returns the socket address of the local half of this TCP connection. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { - self.obj.socket_name() + match self.obj.socket_name() { + Ok(rtio::SocketAddr { ip, port }) => { + Ok(SocketAddr { ip: super::from_rtio(ip), port: port }) + } + Err(e) => Err(IoError::from_rtio_error(e)), + } } /// Sets the nodelay flag on this connection to the boolean specified @@ -123,7 +136,7 @@ impl TcpStream { self.obj.nodelay() } else { self.obj.control_congestion() - } + }.map_err(IoError::from_rtio_error) } /// Sets the keepalive timeout to the timeout specified. @@ -136,7 +149,7 @@ impl TcpStream { match delay_in_seconds { Some(i) => self.obj.keepalive(i), None => self.obj.letdie(), - } + }.map_err(IoError::from_rtio_error) } /// Closes the reading half of this connection. @@ -168,7 +181,9 @@ impl TcpStream { /// /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. - pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() } + pub fn close_read(&mut self) -> IoResult<()> { + self.obj.close_read().map_err(IoError::from_rtio_error) + } /// Closes the writing half of this connection. /// @@ -177,7 +192,9 @@ impl TcpStream { /// /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. - pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() } + pub fn close_write(&mut self) -> IoResult<()> { + self.obj.close_write().map_err(IoError::from_rtio_error) + } /// Sets a timeout, in milliseconds, for blocking operations on this stream. /// @@ -261,11 +278,15 @@ impl Clone for TcpStream { } impl Reader for TcpStream { - fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) } + fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { + self.obj.read(buf).map_err(IoError::from_rtio_error) + } } impl Writer for TcpStream { - fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) } + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + self.obj.write(buf).map_err(IoError::from_rtio_error) + } } /// A structure representing a socket server. This listener is used to create a @@ -319,10 +340,13 @@ impl TcpListener { pub fn bind(addr: &str, port: u16) -> IoResult<TcpListener> { match FromStr::from_str(addr) { Some(ip) => { - let socket_addr = SocketAddr{ip: ip, port: port}; + let addr = rtio::SocketAddr{ + ip: super::to_rtio(ip), + port: port, + }; LocalIo::maybe_raise(|io| { - io.tcp_bind(socket_addr).map(|l| TcpListener { obj: l }) - }) + io.tcp_bind(addr).map(|l| TcpListener { obj: l }) + }).map_err(IoError::from_rtio_error) } None => { Err(IoError{ @@ -336,13 +360,21 @@ impl TcpListener { /// Returns the local socket address of this listener. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { - self.obj.socket_name() + match self.obj.socket_name() { + Ok(rtio::SocketAddr { ip, port }) => { + Ok(SocketAddr { ip: super::from_rtio(ip), port: port }) + } + Err(e) => Err(IoError::from_rtio_error(e)), + } } } impl Listener<TcpStream, TcpAcceptor> for TcpListener { fn listen(self) -> IoResult<TcpAcceptor> { - self.obj.listen().map(|acceptor| TcpAcceptor { obj: acceptor }) + match self.obj.listen() { + Ok(acceptor) => Ok(TcpAcceptor { obj: acceptor }), + Err(e) => Err(IoError::from_rtio_error(e)), + } } } @@ -403,7 +435,10 @@ impl TcpAcceptor { impl Acceptor<TcpStream> for TcpAcceptor { fn accept(&mut self) -> IoResult<TcpStream> { - self.obj.accept().map(TcpStream::new) + match self.obj.accept(){ + Ok(s) => Ok(TcpStream::new(s)), + Err(e) => Err(IoError::from_rtio_error(e)), + } } } diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index 875dd01be82..538bba36958 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -17,12 +17,13 @@ use clone::Clone; use io::net::ip::{SocketAddr, IpAddr}; -use io::{Reader, Writer, IoResult}; +use io::{Reader, Writer, IoResult, IoError}; use kinds::Send; use owned::Box; use option::Option; use result::{Ok, Err}; use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, LocalIo}; +use rt::rtio; /// A User Datagram Protocol socket. /// @@ -62,22 +63,32 @@ pub struct UdpSocket { impl UdpSocket { /// Creates a UDP socket from the given socket address. pub fn bind(addr: SocketAddr) -> IoResult<UdpSocket> { + let SocketAddr { ip, port } = addr; LocalIo::maybe_raise(|io| { + let addr = rtio::SocketAddr { ip: super::to_rtio(ip), port: port }; io.udp_bind(addr).map(|s| UdpSocket { obj: s }) - }) + }).map_err(IoError::from_rtio_error) } /// Receives data from the socket. On success, returns the number of bytes /// read and the address from whence the data came. pub fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, SocketAddr)> { - self.obj.recvfrom(buf) + match self.obj.recvfrom(buf) { + Ok((amt, rtio::SocketAddr { ip, port })) => { + Ok((amt, SocketAddr { ip: super::from_rtio(ip), port: port })) + } + Err(e) => Err(IoError::from_rtio_error(e)), + } } /// Sends data on the socket to the given address. Returns nothing on /// success. pub fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> IoResult<()> { - self.obj.sendto(buf, dst) + self.obj.sendto(buf, rtio::SocketAddr { + ip: super::to_rtio(dst.ip), + port: dst.port, + }).map_err(IoError::from_rtio_error) } /// Creates a `UdpStream`, which allows use of the `Reader` and `Writer` @@ -95,19 +106,24 @@ impl UdpSocket { /// Returns the socket address that this socket was created from. pub fn socket_name(&mut self) -> IoResult<SocketAddr> { - self.obj.socket_name() + match self.obj.socket_name() { + Ok(a) => Ok(SocketAddr { ip: super::from_rtio(a.ip), port: a.port }), + Err(e) => Err(IoError::from_rtio_error(e)) + } } /// Joins a multicast IP address (becomes a member of it) #[experimental] pub fn join_multicast(&mut self, multi: IpAddr) -> IoResult<()> { - self.obj.join_multicast(multi) + let e = self.obj.join_multicast(super::to_rtio(multi)); + e.map_err(IoError::from_rtio_error) } /// Leaves a multicast IP address (drops membership from it) #[experimental] pub fn leave_multicast(&mut self, multi: IpAddr) -> IoResult<()> { - self.obj.leave_multicast(multi) + let e = self.obj.leave_multicast(super::to_rtio(multi)); + e.map_err(IoError::from_rtio_error) } /// Set the multicast loop flag to the specified value @@ -119,19 +135,19 @@ impl UdpSocket { self.obj.loop_multicast_locally() } else { self.obj.dont_loop_multicast_locally() - } + }.map_err(IoError::from_rtio_error) } /// Sets the multicast TTL #[experimental] pub fn set_multicast_ttl(&mut self, ttl: int) -> IoResult<()> { - self.obj.multicast_time_to_live(ttl) + self.obj.multicast_time_to_live(ttl).map_err(IoError::from_rtio_error) } /// Sets this socket's TTL #[experimental] pub fn set_ttl(&mut self, ttl: int) -> IoResult<()> { - self.obj.time_to_live(ttl) + self.obj.time_to_live(ttl).map_err(IoError::from_rtio_error) } /// Sets the broadcast flag on or off @@ -141,7 +157,7 @@ impl UdpSocket { self.obj.hear_broadcasts() } else { self.obj.ignore_broadcasts() - } + }.map_err(IoError::from_rtio_error) } /// Sets the read/write timeout for this socket. diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index 1e320fe1aae..9715a821e4f 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -28,7 +28,7 @@ use prelude::*; use c_str::ToCStr; use clone::Clone; -use io::{Listener, Acceptor, Reader, Writer, IoResult}; +use io::{Listener, Acceptor, Reader, Writer, IoResult, IoError}; use kinds::Send; use owned::Box; use rt::rtio::{IoFactory, LocalIo, RtioUnixListener}; @@ -58,7 +58,7 @@ impl UnixStream { pub fn connect<P: ToCStr>(path: &P) -> IoResult<UnixStream> { LocalIo::maybe_raise(|io| { io.unix_connect(&path.to_c_str(), None).map(|p| UnixStream { obj: p }) - }) + }).map_err(IoError::from_rtio_error) } /// Connect to a pipe named by `path`, timing out if the specified number of @@ -72,7 +72,7 @@ impl UnixStream { LocalIo::maybe_raise(|io| { let s = io.unix_connect(&path.to_c_str(), Some(timeout_ms)); s.map(|p| UnixStream { obj: p }) - }) + }).map_err(IoError::from_rtio_error) } @@ -83,7 +83,9 @@ impl UnixStream { /// /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. - pub fn close_read(&mut self) -> IoResult<()> { self.obj.close_read() } + pub fn close_read(&mut self) -> IoResult<()> { + self.obj.close_read().map_err(IoError::from_rtio_error) + } /// Closes the writing half of this connection. /// @@ -92,7 +94,9 @@ impl UnixStream { /// /// Note that this method affects all cloned handles associated with this /// stream, not just this one handle. - pub fn close_write(&mut self) -> IoResult<()> { self.obj.close_write() } + pub fn close_write(&mut self) -> IoResult<()> { + self.obj.close_write().map_err(IoError::from_rtio_error) + } /// Sets the read/write timeout for this socket. /// @@ -126,11 +130,15 @@ impl Clone for UnixStream { } impl Reader for UnixStream { - fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) } + fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { + self.obj.read(buf).map_err(IoError::from_rtio_error) + } } impl Writer for UnixStream { - fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) } + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + self.obj.write(buf).map_err(IoError::from_rtio_error) + } } /// A value that can listen for incoming named pipe connection requests. @@ -165,13 +173,15 @@ impl UnixListener { pub fn bind<P: ToCStr>(path: &P) -> IoResult<UnixListener> { LocalIo::maybe_raise(|io| { io.unix_bind(&path.to_c_str()).map(|s| UnixListener { obj: s }) - }) + }).map_err(IoError::from_rtio_error) } } impl Listener<UnixStream, UnixAcceptor> for UnixListener { fn listen(self) -> IoResult<UnixAcceptor> { - self.obj.listen().map(|obj| UnixAcceptor { obj: obj }) + self.obj.listen().map(|obj| { + UnixAcceptor { obj: obj } + }).map_err(IoError::from_rtio_error) } } @@ -202,7 +212,9 @@ impl UnixAcceptor { impl Acceptor<UnixStream> for UnixAcceptor { fn accept(&mut self) -> IoResult<UnixStream> { - self.obj.accept().map(|s| UnixStream { obj: s }) + self.obj.accept().map(|s| { + UnixStream { obj: s } + }).map_err(IoError::from_rtio_error) } } diff --git a/src/libstd/io/pipe.rs b/src/libstd/io/pipe.rs index fbb0d5bc8d8..11bb27573c2 100644 --- a/src/libstd/io/pipe.rs +++ b/src/libstd/io/pipe.rs @@ -16,7 +16,7 @@ #![allow(missing_doc)] use prelude::*; -use io::IoResult; +use io::{IoResult, IoError}; use libc; use owned::Box; use rt::rtio::{RtioPipe, LocalIo}; @@ -51,7 +51,7 @@ impl PipeStream { pub fn open(fd: libc::c_int) -> IoResult<PipeStream> { LocalIo::maybe_raise(|io| { io.pipe_open(fd).map(|obj| PipeStream { obj: obj }) - }) + }).map_err(IoError::from_rtio_error) } #[doc(hidden)] @@ -67,11 +67,15 @@ impl Clone for PipeStream { } impl Reader for PipeStream { - fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { self.obj.read(buf) } + fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { + self.obj.read(buf).map_err(IoError::from_rtio_error) + } } impl Writer for PipeStream { - fn write(&mut self, buf: &[u8]) -> IoResult<()> { self.obj.write(buf) } + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + self.obj.write(buf).map_err(IoError::from_rtio_error) + } } #[cfg(test)] diff --git a/src/libstd/io/process.rs b/src/libstd/io/process.rs index 1ed46593562..059286339a6 100644 --- a/src/libstd/io/process.rs +++ b/src/libstd/io/process.rs @@ -16,12 +16,13 @@ use prelude::*; use str; use fmt; -use io::IoResult; +use io::{IoResult, IoError}; use io; use libc; use mem; use owned::Box; use rt::rtio::{RtioProcess, ProcessConfig, IoFactory, LocalIo}; +use rt::rtio; use c_str::CString; /// Signal a process to exit, without forcibly killing it. Corresponds to @@ -232,16 +233,25 @@ impl Command { /// Executes the command as a child process, which is returned. pub fn spawn(&self) -> IoResult<Process> { + fn to_rtio(p: StdioContainer) -> rtio::StdioContainer { + match p { + Ignored => rtio::Ignored, + InheritFd(fd) => rtio::InheritFd(fd), + CreatePipe(a, b) => rtio::CreatePipe(a, b), + } + } + let extra_io: Vec<rtio::StdioContainer> = + self.extra_io.iter().map(|x| to_rtio(*x)).collect(); LocalIo::maybe_raise(|io| { let cfg = ProcessConfig { program: &self.program, args: self.args.as_slice(), env: self.env.as_ref().map(|env| env.as_slice()), cwd: self.cwd.as_ref(), - stdin: self.stdin, - stdout: self.stdout, - stderr: self.stderr, - extra_io: self.extra_io.as_slice(), + stdin: to_rtio(self.stdin), + stdout: to_rtio(self.stdout), + stderr: to_rtio(self.stderr), + extra_io: extra_io.as_slice(), uid: self.uid, gid: self.gid, detach: self.detach, @@ -258,7 +268,7 @@ impl Command { extra_io: io.collect(), } }) - }) + }).map_err(IoError::from_rtio_error) } /// Executes the command as a child process, waiting for it to finish and @@ -393,7 +403,9 @@ impl Process { /// be successfully delivered if the child has exited, but not yet been /// reaped. pub fn kill(id: libc::pid_t, signal: int) -> IoResult<()> { - LocalIo::maybe_raise(|io| io.kill(id, signal)) + LocalIo::maybe_raise(|io| { + io.kill(id, signal) + }).map_err(IoError::from_rtio_error) } /// Returns the process id of this child process @@ -415,7 +427,7 @@ impl Process { /// /// If the signal delivery fails, the corresponding error is returned. pub fn signal(&mut self, signal: int) -> IoResult<()> { - self.handle.kill(signal) + self.handle.kill(signal).map_err(IoError::from_rtio_error) } /// Sends a signal to this child requesting that it exits. This is @@ -442,7 +454,11 @@ impl Process { /// `set_timeout` and the timeout expires before the child exits. pub fn wait(&mut self) -> IoResult<ProcessExit> { drop(self.stdin.take()); - self.handle.wait() + match self.handle.wait() { + Ok(rtio::ExitSignal(s)) => Ok(ExitSignal(s)), + Ok(rtio::ExitStatus(s)) => Ok(ExitStatus(s)), + Err(e) => Err(IoError::from_rtio_error(e)), + } } /// Sets a timeout, in milliseconds, for future calls to wait(). diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs index 05392baff04..598a8667d41 100644 --- a/src/libstd/io/signal.rs +++ b/src/libstd/io/signal.rs @@ -28,7 +28,7 @@ use mem::drop; use option::{Some, None}; use owned::Box; use result::{Ok, Err}; -use rt::rtio::{IoFactory, LocalIo, RtioSignal}; +use rt::rtio::{IoFactory, LocalIo, RtioSignal, Callback}; use slice::ImmutableVector; use vec::Vec; @@ -122,17 +122,28 @@ impl Listener { /// If this function fails to register a signal handler, then an error will /// be returned. pub fn register(&mut self, signum: Signum) -> io::IoResult<()> { + struct SignalCallback { + signum: Signum, + tx: Sender<Signum>, + } + impl Callback for SignalCallback { + fn call(&mut self) { self.tx.send(self.signum) } + } + if self.handles.iter().any(|&(sig, _)| sig == signum) { return Ok(()); // self is already listening to signum, so succeed } match LocalIo::maybe_raise(|io| { - io.signal(signum, self.tx.clone()) + io.signal(signum as int, box SignalCallback { + signum: signum, + tx: self.tx.clone(), + }) }) { Ok(handle) => { self.handles.push((signum, handle)); Ok(()) } - Err(e) => Err(e) + Err(e) => Err(io::IoError::from_rtio_error(e)) } } diff --git a/src/libstd/io/stdio.rs b/src/libstd/io/stdio.rs index 6de4c6316d1..5db09076c98 100644 --- a/src/libstd/io/stdio.rs +++ b/src/libstd/io/stdio.rs @@ -27,20 +27,19 @@ out.write(bytes!("Hello, world!")); */ +use failure::local_stderr; use fmt; use io::{Reader, Writer, IoResult, IoError, OtherIoError, standard_error, EndOfFile, LineBufferedWriter, BufferedReader}; -use libc; use kinds::Send; -use mem::replace; +use libc; use option::{Option, Some, None}; use owned::Box; -use prelude::drop; use result::{Ok, Err}; use rt; use rt::local::Local; -use rt::rtio::{DontClose, IoFactory, LocalIo, RtioFileStream, RtioTTY}; use rt::task::Task; +use rt::rtio::{DontClose, IoFactory, LocalIo, RtioFileStream, RtioTTY}; use str::StrSlice; // And so begins the tale of acquiring a uv handle to a stdio stream on all @@ -82,9 +81,11 @@ fn src<T>(fd: libc::c_int, readable: bool, f: |StdSource| -> T) -> T { Ok(tty) => f(TTY(tty)), Err(_) => f(File(io.fs_from_raw_fd(fd, DontClose))), }) - }).unwrap() + }).map_err(IoError::from_rtio_error).unwrap() } +local_data_key!(local_stdout: Box<Writer:Send>) + /// Creates a new non-blocking handle to the stdin of the current process. /// /// The returned handled is buffered by default with a `BufferedReader`. If @@ -154,22 +155,6 @@ pub fn stderr_raw() -> StdWriter { src(libc::STDERR_FILENO, false, |src| StdWriter { inner: src }) } -fn reset_helper(w: Box<Writer:Send>, - f: |&mut Task, Box<Writer:Send>| -> Option<Box<Writer:Send>>) - -> Option<Box<Writer:Send>> { - let mut t = Local::borrow(None::<Task>); - // Be sure to flush any pending output from the writer - match f(&mut *t, w) { - Some(mut w) => { - drop(t); - // FIXME: is failing right here? - w.flush().unwrap(); - Some(w) - } - None => None - } -} - /// Resets the task-local stdout handle to the specified writer /// /// This will replace the current task's stdout handle, returning the old @@ -179,7 +164,10 @@ fn reset_helper(w: Box<Writer:Send>, /// Note that this does not need to be called for all new tasks; the default /// output handle is to the process's stdout stream. pub fn set_stdout(stdout: Box<Writer:Send>) -> Option<Box<Writer:Send>> { - reset_helper(stdout, |t, w| replace(&mut t.stdout, Some(w))) + local_stdout.replace(Some(stdout)).and_then(|mut s| { + let _ = s.flush(); + Some(s) + }) } /// Resets the task-local stderr handle to the specified writer @@ -191,7 +179,10 @@ pub fn set_stdout(stdout: Box<Writer:Send>) -> Option<Box<Writer:Send>> { /// Note that this does not need to be called for all new tasks; the default /// output handle is to the process's stderr stream. pub fn set_stderr(stderr: Box<Writer:Send>) -> Option<Box<Writer:Send>> { - reset_helper(stderr, |t, w| replace(&mut t.stderr, Some(w))) + local_stderr.replace(Some(stderr)).and_then(|mut s| { + let _ = s.flush(); + Some(s) + }) } // Helper to access the local task's stdout handle @@ -204,42 +195,18 @@ pub fn set_stderr(stderr: Box<Writer:Send>) -> Option<Box<Writer:Send>> { // // io1 aliases io2 // }) // }) -fn with_task_stdout(f: |&mut Writer| -> IoResult<()> ) { - let task: Option<Box<Task>> = Local::try_take(); - let result = match task { - Some(mut task) => { - // Printing may run arbitrary code, so ensure that the task is in - // TLS to allow all std services. Note that this means a print while - // printing won't use the task's normal stdout handle, but this is - // necessary to ensure safety (no aliasing). - let mut my_stdout = task.stdout.take(); - Local::put(task); - - if my_stdout.is_none() { - my_stdout = Some(box stdout() as Box<Writer:Send>); - } - let ret = f(*my_stdout.get_mut_ref()); - - // Note that we need to be careful when putting the stdout handle - // back into the task. If the handle was set to `Some` while - // printing, then we can run aribitrary code when destroying the - // previous handle. This means that the local task needs to be in - // TLS while we do this. - // - // To protect against this, we do a little dance in which we - // temporarily take the task, swap the handles, put the task in TLS, - // and only then drop the previous handle. - let prev = replace(&mut Local::borrow(None::<Task>).stdout, my_stdout); - drop(prev); - ret - } - - None => { - let mut io = rt::Stdout; - f(&mut io as &mut Writer) - } +fn with_task_stdout(f: |&mut Writer| -> IoResult<()>) { + let result = if Local::exists(None::<Task>) { + let mut my_stdout = local_stdout.replace(None).unwrap_or_else(|| { + box stdout() as Box<Writer:Send> + }); + let result = f(my_stdout); + local_stdout.replace(Some(my_stdout)); + result + } else { + let mut io = rt::Stdout; + f(&mut io as &mut Writer) }; - match result { Ok(()) => {} Err(e) => fail!("failed printing to stdout: {}", e), @@ -311,7 +278,7 @@ impl Reader for StdReader { tty.read(buf) }, File(ref mut file) => file.read(buf).map(|i| i as uint), - }; + }.map_err(IoError::from_rtio_error); match ret { // When reading a piped stdin, libuv will return 0-length reads when // stdin reaches EOF. For pretty much all other streams it will @@ -342,7 +309,9 @@ impl StdWriter { /// connected to a TTY instance, or if querying the TTY instance fails. pub fn winsize(&mut self) -> IoResult<(int, int)> { match self.inner { - TTY(ref mut tty) => tty.get_winsize(), + TTY(ref mut tty) => { + tty.get_winsize().map_err(IoError::from_rtio_error) + } File(..) => { Err(IoError { kind: OtherIoError, @@ -362,7 +331,9 @@ impl StdWriter { /// connected to a TTY instance, or if querying the TTY instance fails. pub fn set_raw(&mut self, raw: bool) -> IoResult<()> { match self.inner { - TTY(ref mut tty) => tty.set_raw(raw), + TTY(ref mut tty) => { + tty.set_raw(raw).map_err(IoError::from_rtio_error) + } File(..) => { Err(IoError { kind: OtherIoError, @@ -387,7 +358,7 @@ impl Writer for StdWriter { match self.inner { TTY(ref mut tty) => tty.write(buf), File(ref mut file) => file.write(buf), - } + }.map_err(IoError::from_rtio_error) } } @@ -413,12 +384,13 @@ mod tests { }) iotest!(fn capture_stderr() { - use io::{ChanReader, ChanWriter}; + use realstd::comm::channel; + use realstd::io::{Writer, ChanReader, ChanWriter, Reader}; let (tx, rx) = channel(); let (mut r, w) = (ChanReader::new(rx), ChanWriter::new(tx)); spawn(proc() { - set_stderr(box w); + ::realstd::io::stdio::set_stderr(box w); fail!("my special message"); }); let s = r.read_to_str().unwrap(); diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs index d7476dd2de8..78b8e55c651 100644 --- a/src/libstd/io/timer.rs +++ b/src/libstd/io/timer.rs @@ -17,11 +17,11 @@ and create receivers which will receive notifications after a period of time. */ -use comm::Receiver; -use io::IoResult; +use comm::{Receiver, Sender, channel}; +use io::{IoResult, IoError}; use kinds::Send; use owned::Box; -use rt::rtio::{IoFactory, LocalIo, RtioTimer}; +use rt::rtio::{IoFactory, LocalIo, RtioTimer, Callback}; /// A synchronous timer object /// @@ -67,6 +67,8 @@ pub struct Timer { obj: Box<RtioTimer:Send>, } +struct TimerCallback { tx: Sender<()> } + /// Sleep the current task for `msecs` milliseconds. pub fn sleep(msecs: u64) { let timer = Timer::new(); @@ -80,7 +82,9 @@ impl Timer { /// for a number of milliseconds, or to possibly create channels which will /// get notified after an amount of time has passed. pub fn new() -> IoResult<Timer> { - LocalIo::maybe_raise(|io| io.timer_init().map(|t| Timer { obj: t })) + LocalIo::maybe_raise(|io| { + io.timer_init().map(|t| Timer { obj: t }) + }).map_err(IoError::from_rtio_error) } /// Blocks the current task for `msecs` milliseconds. @@ -99,7 +103,9 @@ impl Timer { /// by this timer, and that the returned receiver will be invalidated once /// the timer is destroyed (when it falls out of scope). pub fn oneshot(&mut self, msecs: u64) -> Receiver<()> { - self.obj.oneshot(msecs) + let (tx, rx) = channel(); + self.obj.oneshot(msecs, box TimerCallback { tx: tx }); + return rx } /// Creates a receiver which will have a continuous stream of notifications @@ -112,7 +118,15 @@ impl Timer { /// by this timer, and that the returned receiver will be invalidated once /// the timer is destroyed (when it falls out of scope). pub fn periodic(&mut self, msecs: u64) -> Receiver<()> { - self.obj.period(msecs) + let (tx, rx) = channel(); + self.obj.period(msecs, box TimerCallback { tx: tx }); + return rx + } +} + +impl Callback for TimerCallback { + fn call(&mut self) { + let _ = self.tx.send_opt(()); } } diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs index 109832b7c47..fe5fabef9d9 100644 --- a/src/libstd/lib.rs +++ b/src/libstd/lib.rs @@ -103,8 +103,8 @@ html_favicon_url = "http://www.rust-lang.org/favicon.ico", html_root_url = "http://doc.rust-lang.org/", html_playground_url = "http://play.rust-lang.org/")] -#![feature(macro_rules, globs, asm, managed_boxes, thread_local, link_args, - linkage, default_type_params, phase, concat_idents, quad_precision_float)] +#![feature(macro_rules, globs, managed_boxes, + linkage, default_type_params, phase)] // Don't link to std. We are std. #![no_std] @@ -123,9 +123,10 @@ extern crate alloc; extern crate core; -extern crate libc; -extern crate core_rand = "rand"; extern crate core_collections = "collections"; +extern crate core_rand = "rand"; +extern crate libc; +extern crate rustrt; // Make std testable by not duplicating lang items. See #2912 #[cfg(test)] extern crate realstd = "std"; @@ -168,6 +169,9 @@ pub use core_collections::str; pub use core_collections::string; pub use core_collections::vec; +pub use rustrt::c_str; +pub use rustrt::local_data; + // Run tests with libgreen instead of libnative. // // FIXME: This egregiously hacks around starting the test runner in a different @@ -231,19 +235,16 @@ pub mod collections; pub mod task; pub mod comm; -pub mod local_data; pub mod sync; /* Runtime and platform support */ -pub mod c_str; pub mod c_vec; pub mod os; pub mod io; pub mod path; pub mod fmt; -pub mod cleanup; // Private APIs #[unstable] @@ -253,6 +254,7 @@ pub mod unstable; // but name resolution doesn't work without it being pub. #[unstable] pub mod rt; +mod failure; #[doc(hidden)] pub fn issue_14344_workaround() { // FIXME #14344 force linkage to happen correctly diff --git a/src/libstd/os.rs b/src/libstd/os.rs index 381ebc08200..9a7e061c472 100644 --- a/src/libstd/os.rs +++ b/src/libstd/os.rs @@ -189,7 +189,7 @@ Accessing environment variables is not generally threadsafe. Serialize access through a global lock. */ fn with_env_lock<T>(f: || -> T) -> T { - use unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; + use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT; diff --git a/src/libstd/rtdeps.rs b/src/libstd/rtdeps.rs index f954bcabe5a..c804918ae4b 100644 --- a/src/libstd/rtdeps.rs +++ b/src/libstd/rtdeps.rs @@ -13,7 +13,7 @@ //! necessary for running libstd. // All platforms need to link to rustrt -#[link(name = "rustrt", kind = "static")] +#[link(name = "rust_builtin", kind = "static")] extern {} // LLVM implements the `frem` instruction as a call to `fmod`, which lives in diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs index ea4c12f4401..39e420685ab 100644 --- a/src/libstd/sync/deque.rs +++ b/src/libstd/sync/deque.rs @@ -63,7 +63,7 @@ use ptr; use rt::heap::{allocate, deallocate}; use slice::ImmutableVector; use sync::atomics::{AtomicInt, AtomicPtr, SeqCst}; -use unstable::sync::Exclusive; +use rt::exclusive::Exclusive; use vec::Vec; // Once the queue is less than 1/K full, then it will be downsized. Note that @@ -121,7 +121,7 @@ pub enum Stolen<T> { /// will only use this structure when allocating a new buffer or deallocating a /// previous one. pub struct BufferPool<T> { - pool: Exclusive<Vec<Box<Buffer<T>>>>, + pool: Arc<Exclusive<Vec<Box<Buffer<T>>>>>, } /// An internal buffer used by the chase-lev deque. This structure is actually @@ -148,7 +148,7 @@ impl<T: Send> BufferPool<T> { /// Allocates a new buffer pool which in turn can be used to allocate new /// deques. pub fn new() -> BufferPool<T> { - BufferPool { pool: Exclusive::new(vec!()) } + BufferPool { pool: Arc::new(Exclusive::new(vec!())) } } /// Allocates a new work-stealing deque which will send/receiving memory to @@ -162,25 +162,21 @@ impl<T: Send> BufferPool<T> { fn alloc(&self, bits: int) -> Box<Buffer<T>> { unsafe { - self.pool.with(|pool| { - match pool.iter().position(|x| x.size() >= (1 << bits)) { - Some(i) => pool.remove(i).unwrap(), - None => box Buffer::new(bits) - } - }) + let mut pool = self.pool.lock(); + match pool.iter().position(|x| x.size() >= (1 << bits)) { + Some(i) => pool.remove(i).unwrap(), + None => box Buffer::new(bits) + } } } fn free(&self, buf: Box<Buffer<T>>) { unsafe { - let mut buf = Some(buf); - self.pool.with(|pool| { - let buf = buf.take_unwrap(); - match pool.iter().position(|v| v.size() > buf.size()) { - Some(i) => pool.insert(i, buf), - None => pool.push(buf), - } - }) + let mut pool = self.pool.lock(); + match pool.iter().position(|v| v.size() > buf.size()) { + Some(i) => pool.insert(i, buf), + None => pool.push(buf), + } } } } diff --git a/src/libstd/task.rs b/src/libstd/task.rs index 3b573b87574..55ebba69d90 100644 --- a/src/libstd/task.rs +++ b/src/libstd/task.rs @@ -38,12 +38,13 @@ use any::Any; use comm::{Sender, Receiver, channel}; -use io::Writer; +use io::{Writer, stdio}; use kinds::{Send, marker}; use option::{None, Some, Option}; use owned::Box; use result::{Result, Ok, Err}; use rt::local::Local; +use rt::task; use rt::task::Task; use str::{Str, SendStr, IntoMaybeOwned}; @@ -53,18 +54,10 @@ use str::{Str, SendStr, IntoMaybeOwned}; #[cfg(test)] use str::StrAllocating; #[cfg(test)] use string::String; -/// Indicates the manner in which a task exited. -/// -/// A task that completes without failing is considered to exit successfully. -/// -/// If you wish for this result's delivery to block until all -/// children tasks complete, recommend using a result future. -pub type TaskResult = Result<(), Box<Any:Send>>; - /// Task configuration options pub struct TaskOpts { /// Enable lifecycle notifications on the given channel - pub notify_chan: Option<Sender<TaskResult>>, + pub notify_chan: Option<Sender<task::Result>>, /// A name for the task-to-be, for identification in failure messages pub name: Option<SendStr>, /// The size of the stack for the spawned task @@ -114,7 +107,7 @@ impl TaskBuilder { /// /// # Failure /// Fails if a future_result was already set for this task. - pub fn future_result(&mut self) -> Receiver<TaskResult> { + pub fn future_result(&mut self) -> Receiver<task::Result> { // FIXME (#3725): Once linked failure and notification are // handled in the library, I can imagine implementing this by just // registering an arbitrary number of task::on_exit handlers and @@ -180,7 +173,28 @@ impl TaskBuilder { Some(t) => t, None => fail!("need a local task to spawn a new task"), }; - t.spawn_sibling(self.opts, f); + let TaskOpts { notify_chan, name, stack_size, stdout, stderr } = self.opts; + + let opts = task::TaskOpts { + on_exit: notify_chan.map(|c| proc(r) c.send(r)), + name: name, + stack_size: stack_size, + }; + if stdout.is_some() || stderr.is_some() { + t.spawn_sibling(opts, proc() { + match stdout { + Some(handle) => { let _ = stdio::set_stdout(handle); } + None => {} + } + match stderr { + Some(handle) => { let _ = stdio::set_stderr(handle); } + None => {} + } + f(); + }); + } else { + t.spawn_sibling(opts, f); + } } /** diff --git a/src/libstd/unstable/dynamic_lib.rs b/src/libstd/unstable/dynamic_lib.rs index 6c406a7c847..c05cdc85cc5 100644 --- a/src/libstd/unstable/dynamic_lib.rs +++ b/src/libstd/unstable/dynamic_lib.rs @@ -224,7 +224,7 @@ pub mod dl { } pub fn check_for_errors_in<T>(f: || -> T) -> Result<T, String> { - use unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; + use rt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT; unsafe { // dlerror isn't thread safe, so we need to lock around this entire diff --git a/src/libstd/unstable/mod.rs b/src/libstd/unstable/mod.rs index d8de6463fab..985ef2e142c 100644 --- a/src/libstd/unstable/mod.rs +++ b/src/libstd/unstable/mod.rs @@ -11,7 +11,3 @@ #![doc(hidden)] pub mod dynamic_lib; - -pub mod sync; -pub mod mutex; - |
