diff options
| author | bors <bors@rust-lang.org> | 2013-10-24 14:26:15 -0700 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2013-10-24 14:26:15 -0700 |
| commit | 3f5b2219cc893b30863f9136703166f306fcc684 (patch) | |
| tree | d7267619b1909f2deaf319c560a64d667d141d35 /src/libstd/rt | |
| parent | 61f8c059c4c6082683d78b2ee3d963f65fa1eb98 (diff) | |
| parent | 188e471339dfe652b8ff9f3bbe4cc262a040c584 (diff) | |
| download | rust-3f5b2219cc893b30863f9136703166f306fcc684.tar.gz rust-3f5b2219cc893b30863f9136703166f306fcc684.zip | |
auto merge of #9901 : alexcrichton/rust/unix-sockets, r=brson
Large topics: * Implemented `rt::io::net::unix`. We've got an implementation backed by "named pipes" for windows for free from libuv, so I'm not sure if these should be `cfg(unix)` or whether they'd be better placed in `rt::io::pipe` (which is currently kinda useless), or to leave in `unix`. Regardless, we probably shouldn't deny windows of functionality which it certainly has. * Fully implemented `net::addrinfo`, or at least fully implemented in the sense of making the best attempt to wrap libuv's `getaddrinfo` api * Moved standard I/O to a libuv TTY instead of just a plain old file descriptor. I found that this interacted better when closing stdin, and it has the added bonus of getting things like terminal dimentions (someone should make a progress bar now!) * Migrate to `~Trait` instead of a typedef'd object where possible. There are only two more types which are blocked on this, and those are traits which have a method which takes by-value self (there's an open issue on this) * Drop `rt::io::support::PathLike` in favor of just `ToCStr`. We recently had a lot of Path work done, but it still wasn't getting passed down to libuv (there was an intermediate string conversion), and this allows true paths to work all the way down to libuv (and anything else that can become a C string). * Removes `extra::fileinput` and `extra::io_util` Closes #9895 Closes #9975 Closes #8330 Closes #6850 (ported lots of libraries away from std::io) cc #4248 (implemented unix/dns) cc #9128 (made everything truly trait objects)
Diffstat (limited to 'src/libstd/rt')
38 files changed, 2415 insertions, 1064 deletions
diff --git a/src/libstd/rt/io/extensions.rs b/src/libstd/rt/io/extensions.rs index 99634b532b0..4b16f0bc0e1 100644 --- a/src/libstd/rt/io/extensions.rs +++ b/src/libstd/rt/io/extensions.rs @@ -18,11 +18,10 @@ use int; use iter::Iterator; use vec; use rt::io::{Reader, Writer, Decorator}; -use rt::io::{read_error, standard_error, EndOfFile, DEFAULT_BUF_SIZE}; +use rt::io::{io_error, standard_error, EndOfFile, DEFAULT_BUF_SIZE}; use option::{Option, Some, None}; use unstable::finally::Finally; use cast; -use io::{u64_to_le_bytes, u64_to_be_bytes}; pub trait ReaderUtil { @@ -41,8 +40,8 @@ pub trait ReaderUtil { /// /// # Failure /// - /// Raises the same conditions as `read`. Additionally raises `read_error` - /// on EOF. If `read_error` is handled then `push_bytes` may push less + /// Raises the same conditions as `read`. Additionally raises `io_error` + /// on EOF. If `io_error` is handled then `push_bytes` may push less /// than the requested number of bytes. fn push_bytes(&mut self, buf: &mut ~[u8], len: uint); @@ -50,8 +49,8 @@ pub trait ReaderUtil { /// /// # Failure /// - /// Raises the same conditions as `read`. Additionally raises `read_error` - /// on EOF. If `read_error` is handled then the returned vector may + /// Raises the same conditions as `read`. Additionally raises `io_error` + /// on EOF. If `io_error` is handled then the returned vector may /// contain less than the requested number of bytes. fn read_bytes(&mut self, len: uint) -> ~[u8]; @@ -314,7 +313,7 @@ impl<T: Reader> ReaderUtil for T { total_read += nread; } None => { - read_error::cond.raise(standard_error(EndOfFile)); + io_error::cond.raise(standard_error(EndOfFile)); break; } } @@ -334,11 +333,11 @@ impl<T: Reader> ReaderUtil for T { fn read_to_end(&mut self) -> ~[u8] { let mut buf = vec::with_capacity(DEFAULT_BUF_SIZE); let mut keep_reading = true; - do read_error::cond.trap(|e| { + do io_error::cond.trap(|e| { if e.kind == EndOfFile { keep_reading = false; } else { - read_error::cond.raise(e) + io_error::cond.raise(e) } }).inside { while keep_reading { @@ -634,6 +633,88 @@ fn extend_sign(val: u64, nbytes: uint) -> i64 { (val << shift) as i64 >> shift } +pub fn u64_to_le_bytes<T>(n: u64, size: uint, + f: &fn(v: &[u8]) -> T) -> T { + assert!(size <= 8u); + match size { + 1u => f(&[n as u8]), + 2u => f(&[n as u8, + (n >> 8) as u8]), + 4u => f(&[n as u8, + (n >> 8) as u8, + (n >> 16) as u8, + (n >> 24) as u8]), + 8u => f(&[n as u8, + (n >> 8) as u8, + (n >> 16) as u8, + (n >> 24) as u8, + (n >> 32) as u8, + (n >> 40) as u8, + (n >> 48) as u8, + (n >> 56) as u8]), + _ => { + + let mut bytes: ~[u8] = ~[]; + let mut i = size; + let mut n = n; + while i > 0u { + bytes.push((n & 255_u64) as u8); + n >>= 8_u64; + i -= 1u; + } + f(bytes) + } + } +} + +pub fn u64_to_be_bytes<T>(n: u64, size: uint, + f: &fn(v: &[u8]) -> T) -> T { + assert!(size <= 8u); + match size { + 1u => f(&[n as u8]), + 2u => f(&[(n >> 8) as u8, + n as u8]), + 4u => f(&[(n >> 24) as u8, + (n >> 16) as u8, + (n >> 8) as u8, + n as u8]), + 8u => f(&[(n >> 56) as u8, + (n >> 48) as u8, + (n >> 40) as u8, + (n >> 32) as u8, + (n >> 24) as u8, + (n >> 16) as u8, + (n >> 8) as u8, + n as u8]), + _ => { + let mut bytes: ~[u8] = ~[]; + let mut i = size; + while i > 0u { + let shift = ((i - 1u) * 8u) as u64; + bytes.push((n >> shift) as u8); + i -= 1u; + } + f(bytes) + } + } +} + +pub fn u64_from_be_bytes(data: &[u8], + start: uint, + size: uint) + -> u64 { + let mut sz = size; + assert!((sz <= 8u)); + let mut val = 0_u64; + let mut pos = start; + while sz > 0u { + sz -= 1u; + val += (data[pos] as u64) << ((sz * 8u) as u64); + pos += 1u; + } + return val; +} + #[cfg(test)] mod test { use super::ReaderUtil; @@ -641,7 +722,7 @@ mod test { use cell::Cell; use rt::io::mem::{MemReader, MemWriter}; use rt::io::mock::MockReader; - use rt::io::{read_error, placeholder_error}; + use rt::io::{io_error, placeholder_error}; #[test] fn read_byte() { @@ -681,10 +762,10 @@ mod test { fn read_byte_error() { let mut reader = MockReader::new(); reader.read = |_| { - read_error::cond.raise(placeholder_error()); + io_error::cond.raise(placeholder_error()); None }; - do read_error::cond.trap(|_| { + do io_error::cond.trap(|_| { }).inside { let byte = reader.read_byte(); assert!(byte == None); @@ -722,11 +803,11 @@ mod test { fn bytes_error() { let mut reader = MockReader::new(); reader.read = |_| { - read_error::cond.raise(placeholder_error()); + io_error::cond.raise(placeholder_error()); None }; let mut it = reader.bytes(); - do read_error::cond.trap(|_| ()).inside { + do io_error::cond.trap(|_| ()).inside { let byte = it.next(); assert!(byte == None); } @@ -765,7 +846,7 @@ mod test { #[test] fn read_bytes_eof() { let mut reader = MemReader::new(~[10, 11]); - do read_error::cond.trap(|_| { + do io_error::cond.trap(|_| { }).inside { assert!(reader.read_bytes(4) == ~[10, 11]); } @@ -806,7 +887,7 @@ mod test { fn push_bytes_eof() { let mut reader = MemReader::new(~[10, 11]); let mut buf = ~[8, 9]; - do read_error::cond.trap(|_| { + do io_error::cond.trap(|_| { }).inside { reader.push_bytes(&mut buf, 4); assert!(buf == ~[8, 9, 10, 11]); @@ -824,13 +905,13 @@ mod test { buf[0] = 10; Some(1) } else { - read_error::cond.raise(placeholder_error()); + io_error::cond.raise(placeholder_error()); None } } }; let mut buf = ~[8, 9]; - do read_error::cond.trap(|_| { } ).inside { + do io_error::cond.trap(|_| { } ).inside { reader.push_bytes(&mut buf, 4); } assert!(buf == ~[8, 9, 10]); @@ -850,7 +931,7 @@ mod test { buf[0] = 10; Some(1) } else { - read_error::cond.raise(placeholder_error()); + io_error::cond.raise(placeholder_error()); None } } @@ -903,7 +984,7 @@ mod test { buf[1] = 11; Some(2) } else { - read_error::cond.raise(placeholder_error()); + io_error::cond.raise(placeholder_error()); None } } diff --git a/src/libstd/rt/io/file.rs b/src/libstd/rt/io/file.rs index a43bcd8142e..d035e2f457c 100644 --- a/src/libstd/rt/io/file.rs +++ b/src/libstd/rt/io/file.rs @@ -15,10 +15,11 @@ with regular files & directories on a filesystem. At the top-level of the module are a set of freestanding functions, associated with various filesystem operations. They all operate -on a `PathLike` object. +on a `ToCStr` object. This trait is already defined for common +objects such as strings and `Path` instances. All operations in this module, including those as part of `FileStream` et al -block the task during execution. Most will raise `std::rt::io::{io_error,read_error}` +block the task during execution. Most will raise `std::rt::io::io_error` conditions in the event of failure. Also included in this module are the `FileInfo` and `DirectoryInfo` traits. When @@ -30,15 +31,14 @@ free function counterparts. */ use prelude::*; -use super::support::PathLike; +use c_str::ToCStr; use super::{Reader, Writer, Seek}; use super::{SeekStyle, Read, Write}; -use rt::rtio::{RtioFileStream, IoFactory, IoFactoryObject}; -use rt::io::{io_error, read_error, EndOfFile, +use rt::rtio::{RtioFileStream, IoFactory, with_local_io}; +use rt::io::{io_error, EndOfFile, FileMode, FileAccess, FileStat, IoError, PathAlreadyExists, PathDoesntExist, MismatchedFileTypeForOperation, ignore_io_error}; -use rt::local::Local; use option::{Some, None}; use path::Path; @@ -48,7 +48,6 @@ use path::Path; /// /// use std; /// use std::path::Path; -/// use std::rt::io::support::PathLike; /// use std::rt::io::file::open; /// use std::rt::io::{FileMode, FileAccess}; /// @@ -87,22 +86,20 @@ use path::Path; /// * Attempting to open a file with a `FileAccess` that the user lacks permissions /// for /// * Filesystem-level errors (full disk, etc) -pub fn open<P: PathLike>(path: &P, - mode: FileMode, - access: FileAccess - ) -> Option<FileStream> { - let open_result = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_open(path, mode, access) - }; - match open_result { - Ok(fd) => Some(FileStream { - fd: fd, - last_nread: -1 - }), - Err(ioerr) => { - io_error::cond.raise(ioerr); - None +pub fn open<P: ToCStr>(path: &P, + mode: FileMode, + access: FileAccess + ) -> Option<FileStream> { + do with_local_io |io| { + match io.fs_open(&path.to_c_str(), mode, access) { + Ok(fd) => Some(FileStream { + fd: fd, + last_nread: -1 + }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } @@ -113,7 +110,6 @@ pub fn open<P: PathLike>(path: &P, /// /// use std; /// use std::path::Path; -/// use std::rt::io::support::PathLike; /// use std::rt::io::file::unlink; /// /// let p = &Path("/some/file/path.txt"); @@ -129,17 +125,16 @@ pub fn open<P: PathLike>(path: &P, /// /// This function will raise an `io_error` condition if the user lacks permissions to /// remove the file or if some other filesystem-level error occurs -pub fn unlink<P: PathLike>(path: &P) { - let unlink_result = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_unlink(path) - }; - match unlink_result { - Ok(_) => (), - Err(ioerr) => { - io_error::cond.raise(ioerr); +pub fn unlink<P: ToCStr>(path: &P) { + do with_local_io |io| { + match io.fs_unlink(&path.to_c_str()) { + Ok(_) => Some(()), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } - } + }; } /// Create a new, empty directory at the provided path @@ -148,7 +143,6 @@ pub fn unlink<P: PathLike>(path: &P) { /// /// use std; /// use std::path::Path; -/// use std::rt::io::support::PathLike; /// use std::rt::io::file::mkdir; /// /// let p = &Path("/some/dir"); @@ -159,17 +153,16 @@ pub fn unlink<P: PathLike>(path: &P) { /// /// This call will raise an `io_error` condition if the user lacks permissions to make a /// new directory at the provided path, or if the directory already exists -pub fn mkdir<P: PathLike>(path: &P) { - let mkdir_result = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_mkdir(path) - }; - match mkdir_result { - Ok(_) => (), - Err(ioerr) => { - io_error::cond.raise(ioerr); +pub fn mkdir<P: ToCStr>(path: &P) { + do with_local_io |io| { + match io.fs_mkdir(&path.to_c_str()) { + Ok(_) => Some(()), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } - } + }; } /// Remove an existing, empty directory @@ -178,7 +171,6 @@ pub fn mkdir<P: PathLike>(path: &P) { /// /// use std; /// use std::path::Path; -/// use std::rt::io::support::PathLike; /// use std::rt::io::file::rmdir; /// /// let p = &Path("/some/dir"); @@ -189,23 +181,22 @@ pub fn mkdir<P: PathLike>(path: &P) { /// /// This call will raise an `io_error` condition if the user lacks permissions to remove the /// directory at the provided path, or if the directory isn't empty -pub fn rmdir<P: PathLike>(path: &P) { - let rmdir_result = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_rmdir(path) - }; - match rmdir_result { - Ok(_) => (), - Err(ioerr) => { - io_error::cond.raise(ioerr); +pub fn rmdir<P: ToCStr>(path: &P) { + do with_local_io |io| { + match io.fs_rmdir(&path.to_c_str()) { + Ok(_) => Some(()), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } - } + }; } /// Get information on the file, directory, etc at the provided path /// -/// Given a `rt::io::support::PathLike`, query the file system to get -/// information about a file, directory, etc. +/// Given a path, query the file system to get information about a file, +/// directory, etc. /// /// Returns a `Some(std::rt::io::PathInfo)` on success /// @@ -213,7 +204,6 @@ pub fn rmdir<P: PathLike>(path: &P) { /// /// use std; /// use std::path::Path; -/// use std::rt::io::support::PathLike; /// use std::rt::io::file::stat; /// /// let p = &Path("/some/file/path.txt"); @@ -238,18 +228,14 @@ pub fn rmdir<P: PathLike>(path: &P) { /// This call will raise an `io_error` condition if the user lacks the requisite /// permissions to perform a `stat` call on the given path or if there is no /// entry in the filesystem at the provided path. -pub fn stat<P: PathLike>(path: &P) -> Option<FileStat> { - let open_result = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_stat(path) - }; - match open_result { - Ok(p) => { - Some(p) - }, - Err(ioerr) => { - io_error::cond.raise(ioerr); - None +pub fn stat<P: ToCStr>(path: &P) -> Option<FileStat> { + do with_local_io |io| { + match io.fs_stat(&path.to_c_str()) { + Ok(p) => Some(p), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } @@ -260,7 +246,6 @@ pub fn stat<P: PathLike>(path: &P) -> Option<FileStat> { /// /// use std; /// use std::path::Path; -/// use std::rt::io::support::PathLike; /// use std::rt::io::file::readdir; /// /// fn visit_dirs(dir: &Path, cb: &fn(&Path)) { @@ -279,18 +264,14 @@ pub fn stat<P: PathLike>(path: &P) -> Option<FileStat> { /// Will raise an `io_error` condition if the provided `path` doesn't exist, /// the process lacks permissions to view the contents or if the `path` points /// at a non-directory file -pub fn readdir<P: PathLike>(path: &P) -> Option<~[Path]> { - let readdir_result = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_readdir(path, 0) - }; - match readdir_result { - Ok(p) => { - Some(p) - }, - Err(ioerr) => { - io_error::cond.raise(ioerr); - None +pub fn readdir<P: ToCStr>(path: &P) -> Option<~[Path]> { + do with_local_io |io| { + match io.fs_readdir(&path.to_c_str(), 0) { + Ok(p) => Some(p), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } @@ -380,7 +361,7 @@ impl Reader for FileStream { Err(ioerr) => { // EOF is indicated by returning None if ioerr.kind != EndOfFile { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); } return None; } @@ -407,7 +388,7 @@ impl Writer for FileStream { match self.fd.flush() { Ok(_) => (), Err(ioerr) => { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); } } } @@ -420,7 +401,7 @@ impl Seek for FileStream { match res { Ok(cursor) => cursor, Err(ioerr) => { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); return -1; } } @@ -434,7 +415,7 @@ impl Seek for FileStream { () }, Err(ioerr) => { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); } } } diff --git a/src/libstd/rt/io/mem.rs b/src/libstd/rt/io/mem.rs index 5f6b4398c22..0ec37cd3c07 100644 --- a/src/libstd/rt/io/mem.rs +++ b/src/libstd/rt/io/mem.rs @@ -22,46 +22,66 @@ use vec; /// Writes to an owned, growable byte vector pub struct MemWriter { - priv buf: ~[u8] + priv buf: ~[u8], + priv pos: uint, } impl MemWriter { - pub fn new() -> MemWriter { MemWriter { buf: vec::with_capacity(128) } } + pub fn new() -> MemWriter { + MemWriter { buf: vec::with_capacity(128), pos: 0 } + } } impl Writer for MemWriter { fn write(&mut self, buf: &[u8]) { - self.buf.push_all(buf) + // Make sure the internal buffer is as least as big as where we + // currently are + let difference = self.pos as i64 - self.buf.len() as i64; + if difference > 0 { + self.buf.grow(difference as uint, &0); + } + + // Figure out what bytes will be used to overwrite what's currently + // there (left), and what will be appended on the end (right) + let cap = self.buf.len() - self.pos; + let (left, right) = if cap <= buf.len() { + (buf.slice_to(cap), buf.slice_from(cap)) + } else { + (buf, &[]) + }; + + // Do the necessary writes + if left.len() > 0 { + vec::bytes::copy_memory(self.buf.mut_slice_from(self.pos), + left, left.len()); + } + if right.len() > 0 { + self.buf.push_all(right); + } + + // Bump us forward + self.pos += buf.len(); } fn flush(&mut self) { /* no-op */ } } impl Seek for MemWriter { - fn tell(&self) -> u64 { self.buf.len() as u64 } - - fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() } -} - -impl Decorator<~[u8]> for MemWriter { - - fn inner(self) -> ~[u8] { - match self { - MemWriter { buf: buf } => buf - } - } + fn tell(&self) -> u64 { self.pos as u64 } - fn inner_ref<'a>(&'a self) -> &'a ~[u8] { - match *self { - MemWriter { buf: ref buf } => buf + fn seek(&mut self, pos: i64, style: SeekStyle) { + match style { + SeekSet => { self.pos = pos as uint; } + SeekEnd => { self.pos = self.buf.len() + pos as uint; } + SeekCur => { self.pos += pos as uint; } } } +} - fn inner_mut_ref<'a>(&'a mut self) -> &'a mut ~[u8] { - match *self { - MemWriter { buf: ref mut buf } => buf - } - } +impl Decorator<~[u8]> for MemWriter { + fn inner(self) -> ~[u8] { self.buf } + fn inner_ref<'a>(&'a self) -> &'a ~[u8] { &self.buf } + fn inner_mut_ref<'a>(&'a mut self) -> &'a mut ~[u8] { &mut self.buf } } /// Reads from an owned byte vector @@ -208,6 +228,7 @@ pub fn with_mem_writer(writeFn:&fn(&mut MemWriter)) -> ~[u8] { mod test { use prelude::*; use super::*; + use rt::io::*; #[test] fn test_mem_writer() { @@ -218,7 +239,24 @@ mod test { writer.write([1, 2, 3]); writer.write([4, 5, 6, 7]); assert_eq!(writer.tell(), 8); - assert_eq!(writer.inner(), ~[0, 1, 2, 3, 4, 5 , 6, 7]); + assert_eq!(*writer.inner_ref(), ~[0, 1, 2, 3, 4, 5, 6, 7]); + + writer.seek(0, SeekSet); + assert_eq!(writer.tell(), 0); + writer.write([3, 4]); + assert_eq!(*writer.inner_ref(), ~[3, 4, 2, 3, 4, 5, 6, 7]); + + writer.seek(1, SeekCur); + writer.write([0, 1]); + assert_eq!(*writer.inner_ref(), ~[3, 4, 2, 0, 1, 5, 6, 7]); + + writer.seek(-1, SeekEnd); + writer.write([1, 2]); + assert_eq!(*writer.inner_ref(), ~[3, 4, 2, 0, 1, 5, 6, 1, 2]); + + writer.seek(1, SeekEnd); + writer.write([1]); + assert_eq!(*writer.inner_ref(), ~[3, 4, 2, 0, 1, 5, 6, 1, 2, 0, 1]); } #[test] diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs index a80c1aab398..758c9779165 100644 --- a/src/libstd/rt/io/mod.rs +++ b/src/libstd/rt/io/mod.rs @@ -261,7 +261,6 @@ pub use self::net::tcp::TcpListener; pub use self::net::tcp::TcpStream; pub use self::net::udp::UdpStream; pub use self::pipe::PipeStream; -pub use self::pipe::UnboundPipeStream; pub use self::process::Process; // Some extension traits that all Readers and Writers get. @@ -299,10 +298,6 @@ pub mod comm_adapters; /// Extension traits pub mod extensions; -/// Non-I/O things needed by the I/O module -// XXX: shouldn this really be pub? -pub mod support; - /// Basic Timer pub mod timer; @@ -331,9 +326,11 @@ pub mod native { /// Mock implementations for testing mod mock; +/// Signal handling +pub mod signal; + /// The default buffer size for various I/O operations -/// XXX: Not pub -pub static DEFAULT_BUF_SIZE: uint = 1024 * 64; +static DEFAULT_BUF_SIZE: uint = 1024 * 64; /// The type passed to I/O condition handlers to indicate error /// @@ -375,7 +372,9 @@ pub enum IoErrorKind { BrokenPipe, PathAlreadyExists, PathDoesntExist, - MismatchedFileTypeForOperation + MismatchedFileTypeForOperation, + ResourceUnavailable, + IoUnavailable, } // FIXME: #8242 implementing manually because deriving doesn't work for some reason @@ -395,7 +394,9 @@ impl ToStr for IoErrorKind { BrokenPipe => ~"BrokenPipe", PathAlreadyExists => ~"PathAlreadyExists", PathDoesntExist => ~"PathDoesntExist", - MismatchedFileTypeForOperation => ~"MismatchedFileTypeForOperation" + MismatchedFileTypeForOperation => ~"MismatchedFileTypeForOperation", + IoUnavailable => ~"IoUnavailable", + ResourceUnavailable => ~"ResourceUnavailable", } } } @@ -406,12 +407,6 @@ condition! { pub io_error: IoError -> (); } -// XXX: Can't put doc comments on macros -// Raised by `read` on error -condition! { - pub read_error: IoError -> (); -} - /// Helper for wrapper calls where you want to /// ignore any io_errors that might be raised pub fn ignore_io_error<T>(cb: &fn() -> T) -> T { @@ -431,7 +426,7 @@ pub trait Reader { /// /// # Failure /// - /// Raises the `read_error` condition on error. If the condition + /// Raises the `io_error` condition on error. If the condition /// is handled then no guarantee is made about the number of bytes /// read and the contents of `buf`. If the condition is handled /// returns `None` (XXX see below). diff --git a/src/libstd/rt/io/native/file.rs b/src/libstd/rt/io/native/file.rs index d6820981181..ba819df071a 100644 --- a/src/libstd/rt/io/native/file.rs +++ b/src/libstd/rt/io/native/file.rs @@ -17,13 +17,31 @@ use os; use prelude::*; use super::super::*; -fn raise_error() { +#[cfg(windows)] +fn get_err(errno: i32) -> (IoErrorKind, &'static str) { + match errno { + libc::EOF => (EndOfFile, "end of file"), + _ => (OtherIoError, "unknown error"), + } +} + +#[cfg(not(windows))] +fn get_err(errno: i32) -> (IoErrorKind, &'static str) { // XXX: this should probably be a bit more descriptive... - let (kind, desc) = match os::errno() as i32 { + match errno { libc::EOF => (EndOfFile, "end of file"), + + // These two constants can have the same value on some systems, but + // different values on others, so we can't use a match clause + x if x == libc::EAGAIN || x == libc::EWOULDBLOCK => + (ResourceUnavailable, "resource temporarily unavailable"), + _ => (OtherIoError, "unknown error"), - }; + } +} +fn raise_error() { + let (kind, desc) = get_err(os::errno() as i32); io_error::cond.raise(IoError { kind: kind, desc: desc, diff --git a/src/libstd/rt/io/net/addrinfo.rs b/src/libstd/rt/io/net/addrinfo.rs new file mode 100644 index 00000000000..27cf9781c9c --- /dev/null +++ b/src/libstd/rt/io/net/addrinfo.rs @@ -0,0 +1,126 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! + +Synchronous DNS Resolution + +Contains the functionality to perform DNS resolution in a style related to +getaddrinfo() + +*/ + +use option::{Option, Some, None}; +use result::{Ok, Err}; +use rt::io::{io_error}; +use rt::io::net::ip::{SocketAddr, IpAddr}; +use rt::rtio::{IoFactory, with_local_io}; + +/// Hints to the types of sockets that are desired when looking up hosts +pub enum SocketType { + Stream, Datagram, Raw +} + +/// Flags which can be or'd into the `flags` field of a `Hint`. These are used +/// to manipulate how a query is performed. +/// +/// The meaning of each of these flags can be found with `man -s 3 getaddrinfo` +pub enum Flag { + AddrConfig, + All, + CanonName, + NumericHost, + NumericServ, + Passive, + V4Mapped, +} + +/// A transport protocol associated with either a hint or a return value of +/// `lookup` +pub enum Protocol { + TCP, UDP +} + +/// This structure is used to provide hints when fetching addresses for a +/// remote host to control how the lookup is performed. +/// +/// For details on these fields, see their corresponding definitions via +/// `man -s 3 getaddrinfo` +pub struct Hint { + family: uint, + socktype: Option<SocketType>, + protocol: Option<Protocol>, + flags: uint, +} + +pub struct Info { + address: SocketAddr, + family: uint, + socktype: Option<SocketType>, + protocol: Option<Protocol>, + flags: uint, +} + +/// Easy name resolution. Given a hostname, returns the list of IP addresses for +/// that hostname. +/// +/// # Failure +/// +/// On failure, this will raise on the `io_error` condition. +pub fn get_host_addresses(host: &str) -> Option<~[IpAddr]> { + lookup(Some(host), None, None).map(|a| a.map(|i| i.address.ip)) +} + +/// Full-fleged resolution. This function will perform a synchronous call to +/// getaddrinfo, controlled by the parameters +/// +/// # Arguments +/// +/// * hostname - an optional hostname to lookup against +/// * servname - an optional service name, listed in the system services +/// * hint - see the hint structure, and "man -s 3 getaddrinfo", for how this +/// controls lookup +/// +/// # Failure +/// +/// On failure, this will raise on the `io_error` condition. +/// +/// XXX: this is not public because the `Hint` structure is not ready for public +/// consumption just yet. +fn lookup(hostname: Option<&str>, servname: Option<&str>, + hint: Option<Hint>) -> Option<~[Info]> { + do with_local_io |io| { + match io.get_host_addresses(hostname, servname, hint) { + Ok(i) => Some(i), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } +} + +#[cfg(test)] +mod test { + use option::Some; + use rt::io::net::ip::Ipv4Addr; + use super::*; + + #[test] + fn dns_smoke_test() { + let ipaddrs = get_host_addresses("localhost").unwrap(); + let mut found_local = false; + let local_addr = &Ipv4Addr(127, 0, 0, 1); + for addr in ipaddrs.iter() { + found_local = found_local || addr == local_addr; + } + assert!(found_local); + } +} diff --git a/src/libstd/rt/io/net/mod.rs b/src/libstd/rt/io/net/mod.rs index f44e879a63a..cf109167089 100644 --- a/src/libstd/rt/io/net/mod.rs +++ b/src/libstd/rt/io/net/mod.rs @@ -8,55 +8,11 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use option::{Option, Some, None}; -use result::{Ok, Err}; -use rt::io::io_error; -use rt::io::net::ip::IpAddr; -use rt::rtio::{IoFactory, IoFactoryObject}; -use rt::local::Local; +pub use self::addrinfo::get_host_addresses; +pub mod addrinfo; pub mod tcp; pub mod udp; pub mod ip; #[cfg(unix)] pub mod unix; - -/// Simplistic name resolution -pub fn get_host_addresses(host: &str) -> Option<~[IpAddr]> { - /*! - * Get the IP addresses for a given host name. - * - * Raises io_error on failure. - */ - - let ipaddrs = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).get_host_addresses(host) - }; - - match ipaddrs { - Ok(i) => Some(i), - Err(ioerr) => { - io_error::cond.raise(ioerr); - None - } - } -} - -#[cfg(test)] -mod test { - use option::Some; - use rt::io::net::ip::Ipv4Addr; - use super::*; - - #[test] - fn dns_smoke_test() { - let ipaddrs = get_host_addresses("localhost").unwrap(); - let mut found_local = false; - let local_addr = &Ipv4Addr(127, 0, 0, 1); - for addr in ipaddrs.iter() { - found_local = found_local || addr == local_addr; - } - assert!(found_local); - } -} diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index f29e17cfc2f..4e841b36a5d 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -12,37 +12,27 @@ use option::{Option, Some, None}; use result::{Ok, Err}; use rt::io::net::ip::SocketAddr; use rt::io::{Reader, Writer, Listener, Acceptor}; -use rt::io::{io_error, read_error, EndOfFile}; -use rt::rtio::{IoFactory, IoFactoryObject, - RtioSocket, - RtioTcpListener, RtioTcpListenerObject, - RtioTcpAcceptor, RtioTcpAcceptorObject, - RtioTcpStream, RtioTcpStreamObject}; -use rt::local::Local; +use rt::io::{io_error, EndOfFile}; +use rt::rtio::{IoFactory, with_local_io, + RtioSocket, RtioTcpListener, RtioTcpAcceptor, RtioTcpStream}; pub struct TcpStream { - priv obj: ~RtioTcpStreamObject + priv obj: ~RtioTcpStream } impl TcpStream { - fn new(s: ~RtioTcpStreamObject) -> TcpStream { + fn new(s: ~RtioTcpStream) -> TcpStream { TcpStream { obj: s } } pub fn connect(addr: SocketAddr) -> Option<TcpStream> { - let stream = unsafe { - rtdebug!("borrowing io to connect"); - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - rtdebug!("about to connect"); - (*io).tcp_connect(addr) - }; - - match stream { - Ok(s) => Some(TcpStream::new(s)), - Err(ioerr) => { - rtdebug!("failed to connect: {:?}", ioerr); - io_error::cond.raise(ioerr); - None + do with_local_io |io| { + match io.tcp_connect(addr) { + Ok(s) => Some(TcpStream::new(s)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } @@ -77,7 +67,7 @@ impl Reader for TcpStream { Err(ioerr) => { // EOF is indicated by returning None if ioerr.kind != EndOfFile { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); } return None; } @@ -99,20 +89,18 @@ impl Writer for TcpStream { } pub struct TcpListener { - priv obj: ~RtioTcpListenerObject + priv obj: ~RtioTcpListener } impl TcpListener { pub fn bind(addr: SocketAddr) -> Option<TcpListener> { - let listener = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).tcp_bind(addr) - }; - match listener { - Ok(l) => Some(TcpListener { obj: l }), - Err(ioerr) => { - io_error::cond.raise(ioerr); - return None; + do with_local_io |io| { + match io.tcp_bind(addr) { + Ok(l) => Some(TcpListener { obj: l }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } @@ -142,7 +130,7 @@ impl Listener<TcpStream, TcpAcceptor> for TcpListener { } pub struct TcpAcceptor { - priv obj: ~RtioTcpAcceptorObject + priv obj: ~RtioTcpAcceptor } impl Acceptor<TcpStream> for TcpAcceptor { @@ -320,7 +308,7 @@ mod test { let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); - do read_error::cond.trap(|e| { + do io_error::cond.trap(|e| { if cfg!(windows) { assert_eq!(e.kind, NotConnected); } else { @@ -355,7 +343,7 @@ mod test { let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); - do read_error::cond.trap(|e| { + do io_error::cond.trap(|e| { if cfg!(windows) { assert_eq!(e.kind, NotConnected); } else { diff --git a/src/libstd/rt/io/net/udp.rs b/src/libstd/rt/io/net/udp.rs index 27faae0838b..2e4ae95d98e 100644 --- a/src/libstd/rt/io/net/udp.rs +++ b/src/libstd/rt/io/net/udp.rs @@ -12,25 +12,22 @@ use option::{Option, Some, None}; use result::{Ok, Err}; use rt::io::net::ip::SocketAddr; use rt::io::{Reader, Writer}; -use rt::io::{io_error, read_error, EndOfFile}; -use rt::rtio::{RtioSocket, RtioUdpSocketObject, RtioUdpSocket, IoFactory, IoFactoryObject}; -use rt::local::Local; +use rt::io::{io_error, EndOfFile}; +use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, with_local_io}; pub struct UdpSocket { - priv obj: ~RtioUdpSocketObject + priv obj: ~RtioUdpSocket } impl UdpSocket { pub fn bind(addr: SocketAddr) -> Option<UdpSocket> { - let socket = unsafe { - let factory: *mut IoFactoryObject = Local::unsafe_borrow(); - (*factory).udp_bind(addr) - }; - match socket { - Ok(s) => Some(UdpSocket { obj: s }), - Err(ioerr) => { - io_error::cond.raise(ioerr); - None + do with_local_io |io| { + match io.udp_bind(addr) { + Ok(s) => Some(UdpSocket { obj: s }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } @@ -41,7 +38,7 @@ impl UdpSocket { Err(ioerr) => { // EOF is indicated by returning None if ioerr.kind != EndOfFile { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); } None } diff --git a/src/libstd/rt/io/net/unix.rs b/src/libstd/rt/io/net/unix.rs index 1771a963ba7..e424956e2ff 100644 --- a/src/libstd/rt/io/net/unix.rs +++ b/src/libstd/rt/io/net/unix.rs @@ -8,44 +8,289 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +/*! + +Named pipes + +This module contains the ability to communicate over named pipes with +synchronous I/O. On windows, this corresponds to talking over a Named Pipe, +while on Unix it corresponds to UNIX domain sockets. + +These pipes are similar to TCP in the sense that you can have both a stream to a +server and a server itself. The server provided accepts other `UnixStream` +instances as clients. + +*/ + use prelude::*; -use super::super::*; -use super::super::support::PathLike; -pub struct UnixStream; +use c_str::ToCStr; +use rt::rtio::{IoFactory, RtioUnixListener, with_local_io}; +use rt::rtio::{RtioUnixAcceptor, RtioPipe}; +use rt::io::pipe::PipeStream; +use rt::io::{io_error, Listener, Acceptor, Reader, Writer}; + +/// A stream which communicates over a named pipe. +pub struct UnixStream { + priv obj: PipeStream, +} impl UnixStream { - pub fn connect<P: PathLike>(_path: &P) -> Option<UnixStream> { - fail!() + fn new(obj: ~RtioPipe) -> UnixStream { + UnixStream { obj: PipeStream::new(obj) } + } + + /// Connect to a pipe named by `path`. This will attempt to open a + /// connection to the underlying socket. + /// + /// The returned stream will be closed when the object falls out of scope. + /// + /// # Failure + /// + /// This function will raise on the `io_error` condition if the connection + /// could not be made. + /// + /// # Example + /// + /// use std::rt::io::net::unix::UnixStream; + /// + /// let server = Path("path/to/my/socket"); + /// let mut stream = UnixStream::connect(&server); + /// stream.write([1, 2, 3]); + /// + pub fn connect<P: ToCStr>(path: &P) -> Option<UnixStream> { + do with_local_io |io| { + match io.unix_connect(&path.to_c_str()) { + Ok(s) => Some(UnixStream::new(s)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } } } impl Reader for UnixStream { - fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() } - - fn eof(&mut self) -> bool { fail!() } + fn read(&mut self, buf: &mut [u8]) -> Option<uint> { self.obj.read(buf) } + fn eof(&mut self) -> bool { self.obj.eof() } } impl Writer for UnixStream { - fn write(&mut self, _v: &[u8]) { fail!() } - - fn flush(&mut self) { fail!() } + fn write(&mut self, buf: &[u8]) { self.obj.write(buf) } + fn flush(&mut self) { self.obj.flush() } } -pub struct UnixListener; +pub struct UnixListener { + priv obj: ~RtioUnixListener, +} impl UnixListener { - pub fn bind<P: PathLike>(_path: &P) -> Option<UnixListener> { - fail!() + + /// Creates a new listener, ready to receive incoming connections on the + /// specified socket. The server will be named by `path`. + /// + /// This listener will be closed when it falls out of scope. + /// + /// # Failure + /// + /// This function will raise on the `io_error` condition if the specified + /// path could not be bound. + /// + /// # Example + /// + /// use std::rt::io::net::unix::UnixListener; + /// + /// let server = Path("path/to/my/socket"); + /// let mut stream = UnixListener::bind(&server); + /// for client in stream.incoming() { + /// let mut client = client; + /// client.write([1, 2, 3, 4]); + /// } + /// + pub fn bind<P: ToCStr>(path: &P) -> Option<UnixListener> { + do with_local_io |io| { + match io.unix_bind(&path.to_c_str()) { + Ok(s) => Some(UnixListener{ obj: s }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } } } impl Listener<UnixStream, UnixAcceptor> for UnixListener { - fn listen(self) -> Option<UnixAcceptor> { fail!() } + fn listen(self) -> Option<UnixAcceptor> { + match self.obj.listen() { + Ok(acceptor) => Some(UnixAcceptor { obj: acceptor }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } } -pub struct UnixAcceptor; +pub struct UnixAcceptor { + priv obj: ~RtioUnixAcceptor, +} impl Acceptor<UnixStream> for UnixAcceptor { - fn accept(&mut self) -> Option<UnixStream> { fail!() } + fn accept(&mut self) -> Option<UnixStream> { + match self.obj.accept() { + Ok(s) => Some(UnixStream::new(s)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use super::*; + use cell::Cell; + use rt::test::*; + use rt::io::*; + use rt::comm::oneshot; + use os; + + fn smalltest(server: ~fn(UnixStream), client: ~fn(UnixStream)) { + let server = Cell::new(server); + let client = Cell::new(client); + do run_in_mt_newsched_task { + let server = Cell::new(server.take()); + let client = Cell::new(client.take()); + let path1 = next_test_unix(); + let path2 = path1.clone(); + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + do spawntask { + let mut acceptor = UnixListener::bind(&path1).listen(); + chan.take().send(()); + server.take()(acceptor.accept().unwrap()); + } + + do spawntask { + port.take().recv(); + client.take()(UnixStream::connect(&path2).unwrap()); + } + } + } + + #[test] + fn bind_error() { + do run_in_mt_newsched_task { + let mut called = false; + do io_error::cond.trap(|e| { + assert!(e.kind == PermissionDenied); + called = true; + }).inside { + let listener = UnixListener::bind(&("path/to/nowhere")); + assert!(listener.is_none()); + } + assert!(called); + } + } + + #[test] + fn connect_error() { + do run_in_mt_newsched_task { + let mut called = false; + do io_error::cond.trap(|e| { + assert_eq!(e.kind, OtherIoError); + called = true; + }).inside { + let stream = UnixStream::connect(&("path/to/nowhere")); + assert!(stream.is_none()); + } + assert!(called); + } + } + + #[test] + fn smoke() { + smalltest(|mut server| { + let mut buf = [0]; + server.read(buf); + assert!(buf[0] == 99); + }, |mut client| { + client.write([99]); + }) + } + + #[test] + fn read_eof() { + smalltest(|mut server| { + let mut buf = [0]; + assert!(server.read(buf).is_none()); + assert!(server.read(buf).is_none()); + }, |_client| { + // drop the client + }) + } + + #[test] + fn write_begone() { + smalltest(|mut server| { + let buf = [0]; + let mut stop = false; + while !stop{ + do io_error::cond.trap(|e| { + assert_eq!(e.kind, BrokenPipe); + stop = true; + }).inside { + server.write(buf); + } + } + }, |_client| { + // drop the client + }) + } + + #[test] + fn accept_lots() { + do run_in_mt_newsched_task { + let times = 10; + let path1 = next_test_unix(); + let path2 = path1.clone(); + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + do spawntask { + let mut acceptor = UnixListener::bind(&path1).listen(); + chan.take().send(()); + do times.times { + let mut client = acceptor.accept(); + let mut buf = [0]; + client.read(buf); + assert_eq!(buf[0], 100); + } + } + + do spawntask { + port.take().recv(); + do times.times { + let mut stream = UnixStream::connect(&path2); + stream.write([100]); + } + } + } + } + + #[test] + fn path_exists() { + do run_in_mt_newsched_task { + let path = next_test_unix(); + let _acceptor = UnixListener::bind(&path).listen(); + assert!(os::path_exists(&path)); + } + } } diff --git a/src/libstd/rt/io/option.rs b/src/libstd/rt/io/option.rs index 2ea1b615483..52699964b62 100644 --- a/src/libstd/rt/io/option.rs +++ b/src/libstd/rt/io/option.rs @@ -13,12 +13,10 @@ //! I/O constructors return option types to allow errors to be handled. //! These implementations allow e.g. `Option<FileStream>` to be used //! as a `Reader` without unwrapping the option first. -//! -//! # XXX Seek and Close use option::*; -use super::{Reader, Writer, Listener, Acceptor}; -use super::{standard_error, PreviousIoError, io_error, read_error, IoError}; +use super::{Reader, Writer, Listener, Acceptor, Seek, SeekStyle}; +use super::{standard_error, PreviousIoError, io_error, IoError}; fn prev_io_error() -> IoError { standard_error(PreviousIoError) @@ -45,7 +43,7 @@ impl<R: Reader> Reader for Option<R> { match *self { Some(ref mut reader) => reader.read(buf), None => { - read_error::cond.raise(prev_io_error()); + io_error::cond.raise(prev_io_error()); None } } @@ -62,6 +60,24 @@ impl<R: Reader> Reader for Option<R> { } } +impl<S: Seek> Seek for Option<S> { + fn tell(&self) -> u64 { + match *self { + Some(ref seeker) => seeker.tell(), + None => { + io_error::cond.raise(prev_io_error()); + 0 + } + } + } + fn seek(&mut self, pos: i64, style: SeekStyle) { + match *self { + Some(ref mut seeker) => seeker.seek(pos, style), + None => io_error::cond.raise(prev_io_error()) + } + } +} + impl<T, A: Acceptor<T>, L: Listener<T, A>> Listener<T, A> for Option<L> { fn listen(self) -> Option<A> { match self { @@ -91,7 +107,7 @@ mod test { use option::*; use super::super::mem::*; use rt::test::*; - use super::super::{PreviousIoError, io_error, read_error}; + use super::super::{PreviousIoError, io_error, io_error}; #[test] fn test_option_writer() { @@ -145,7 +161,7 @@ mod test { let mut buf = []; let mut called = false; - do read_error::cond.trap(|err| { + do io_error::cond.trap(|err| { assert_eq!(err.kind, PreviousIoError); called = true; }).inside { diff --git a/src/libstd/rt/io/pipe.rs b/src/libstd/rt/io/pipe.rs index d2cd531ed26..ec9a4a0101f 100644 --- a/src/libstd/rt/io/pipe.rs +++ b/src/libstd/rt/io/pipe.rs @@ -15,37 +15,47 @@ use prelude::*; use super::{Reader, Writer}; -use rt::io::{io_error, read_error, EndOfFile}; -use rt::local::Local; -use rt::rtio::{RtioPipe, RtioPipeObject, IoFactoryObject, IoFactory}; -use rt::rtio::RtioUnboundPipeObject; +use rt::io::{io_error, EndOfFile}; +use rt::io::native::file; +use rt::rtio::{RtioPipe, with_local_io}; pub struct PipeStream { - priv obj: RtioPipeObject + priv obj: ~RtioPipe, } -// This should not be a newtype, but rt::uv::process::set_stdio needs to reach -// into the internals of this :( -pub struct UnboundPipeStream(~RtioUnboundPipeObject); - impl PipeStream { - /// Creates a new pipe initialized, but not bound to any particular - /// source/destination - pub fn new() -> Option<UnboundPipeStream> { - let pipe = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).pipe_init(false) - }; - match pipe { - Ok(p) => Some(UnboundPipeStream(p)), - Err(ioerr) => { - io_error::cond.raise(ioerr); - None + /// Consumes a file descriptor to return a pipe stream that will have + /// synchronous, but non-blocking reads/writes. This is useful if the file + /// descriptor is acquired via means other than the standard methods. + /// + /// This operation consumes ownership of the file descriptor and it will be + /// closed once the object is deallocated. + /// + /// # Example + /// + /// use std::libc; + /// use std::rt::io::pipe; + /// + /// let mut pipe = PipeStream::open(libc::STDERR_FILENO); + /// pipe.write(bytes!("Hello, stderr!")); + /// + /// # Failure + /// + /// If the pipe cannot be created, an error will be raised on the + /// `io_error` condition. + pub fn open(fd: file::fd_t) -> Option<PipeStream> { + do with_local_io |io| { + match io.pipe_open(fd) { + Ok(obj) => Some(PipeStream { obj: obj }), + Err(e) => { + io_error::cond.raise(e); + None + } } } } - pub fn bind(inner: RtioPipeObject) -> PipeStream { + pub fn new(inner: ~RtioPipe) -> PipeStream { PipeStream { obj: inner } } } @@ -57,14 +67,14 @@ impl Reader for PipeStream { Err(ioerr) => { // EOF is indicated by returning None if ioerr.kind != EndOfFile { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); } return None; } } } - fn eof(&mut self) -> bool { fail!() } + fn eof(&mut self) -> bool { false } } impl Writer for PipeStream { @@ -77,5 +87,5 @@ impl Writer for PipeStream { } } - fn flush(&mut self) { fail!() } + fn flush(&mut self) {} } diff --git a/src/libstd/rt/io/process.rs b/src/libstd/rt/io/process.rs index 5f2453852ee..ae087099d1f 100644 --- a/src/libstd/rt/io/process.rs +++ b/src/libstd/rt/io/process.rs @@ -11,12 +11,12 @@ //! Bindings for executing child processes use prelude::*; +use cell::Cell; use libc; use rt::io; use rt::io::io_error; -use rt::local::Local; -use rt::rtio::{RtioProcess, RtioProcessObject, IoFactoryObject, IoFactory}; +use rt::rtio::{RtioProcess, IoFactory, with_local_io}; // windows values don't matter as long as they're at least one of unix's // TERM/KILL/INT signals @@ -26,7 +26,7 @@ use rt::rtio::{RtioProcess, RtioProcessObject, IoFactoryObject, IoFactory}; #[cfg(not(windows))] pub static MustDieSignal: int = libc::SIGKILL as int; pub struct Process { - priv handle: ~RtioProcessObject, + priv handle: ~RtioProcess, io: ~[Option<io::PipeStream>], } @@ -57,7 +57,7 @@ pub struct ProcessConfig<'self> { /// 0 - stdin /// 1 - stdout /// 2 - stderr - io: ~[StdioContainer] + io: &'self [StdioContainer] } /// Describes what to do with a standard io stream for a child process. @@ -70,42 +70,32 @@ pub enum StdioContainer { /// specified for. InheritFd(libc::c_int), - // XXX: these two shouldn't have libuv-specific implementation details - - /// The specified libuv stream is inherited for the corresponding file - /// descriptor it is assigned to. - // XXX: this needs to be thought out more. - //InheritStream(uv::net::StreamWatcher), - - /// Creates a pipe for the specified file descriptor which will be directed - /// into the previously-initialized pipe passed in. + /// Creates a pipe for the specified file descriptor which will be created + /// when the process is spawned. /// /// The first boolean argument is whether the pipe is readable, and the /// second is whether it is writable. These properties are from the view of /// the *child* process, not the parent process. - CreatePipe(io::UnboundPipeStream, - bool /* readable */, - bool /* writable */), + CreatePipe(bool /* readable */, bool /* writable */), } impl Process { /// Creates a new pipe initialized, but not bound to any particular /// source/destination pub fn new(config: ProcessConfig) -> Option<Process> { - let process = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).spawn(config) - }; - match process { - Ok((p, io)) => Some(Process{ - handle: p, - io: io.move_iter().map(|p| - p.map(|p| io::PipeStream::bind(p)) - ).collect() - }), - Err(ioerr) => { - io_error::cond.raise(ioerr); - None + let config = Cell::new(config); + do with_local_io |io| { + match io.spawn(config.take()) { + Ok((p, io)) => Some(Process{ + handle: p, + io: io.move_iter().map(|p| + p.map(|p| io::PipeStream::new(p)) + ).collect() + }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } diff --git a/src/libstd/rt/io/signal.rs b/src/libstd/rt/io/signal.rs new file mode 100644 index 00000000000..a13fc19d000 --- /dev/null +++ b/src/libstd/rt/io/signal.rs @@ -0,0 +1,220 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! + +Signal handling + +This modules provides bindings to receive signals safely, built on top of the +local I/O factory. There are a number of defined signals which can be caught, +but not all signals will work across all platforms (windows doesn't have +definitions for a number of signals. + +*/ + +use comm::{Port, SharedChan, stream}; +use hashmap; +use option::{Some, None}; +use result::{Err, Ok}; +use rt::io::io_error; +use rt::rtio::{IoFactory, RtioSignal, with_local_io}; + +#[deriving(Eq, IterBytes)] +pub enum Signum { + /// Equivalent to SIGBREAK, delivered when the user presses Ctrl-Break. + Break = 21i, + /// Equivalent to SIGHUP, delivered when the user closes the terminal + /// window. On delivery of HangUp, the program is given approximately + /// 10 seconds to perfom any cleanup. After that, Windows will + /// unconditionally terminate it. + HangUp = 1i, + /// Equivalent to SIGINT, delivered when the user presses Ctrl-c. + Interrupt = 2i, + /// Equivalent to SIGQUIT, delivered when the user presses Ctrl-\. + Quit = 3i, + /// Equivalent to SIGTSTP, delivered when the user presses Ctrl-z. + StopTemporarily = 20i, + /// Equivalent to SIGUSR1. + User1 = 10i, + /// Equivalent to SIGUSR2. + User2 = 12i, + /// Equivalent to SIGWINCH, delivered when the console has been resized. + /// WindowSizeChange may not be delivered in a timely manner; size change + /// will only be detected when the cursor is being moved. + WindowSizeChange = 28i, +} + +/// Listener provides a port to listen for registered signals. +/// +/// Listener automatically unregisters its handles once it is out of scope. +/// However, clients can still unregister signums manually. +/// +/// # Example +/// +/// ```rust +/// use std::rt::io::signal::{Listener, Interrupt}; +/// +/// let mut listener = Listener::new(); +/// listener.register(signal::Interrupt); +/// +/// do spawn { +/// loop { +/// match listener.port.recv() { +/// Interrupt => println("Got Interrupt'ed"), +/// _ => (), +/// } +/// } +/// } +/// +/// ``` +pub struct Listener { + /// A map from signums to handles to keep the handles in memory + priv handles: hashmap::HashMap<Signum, ~RtioSignal>, + /// chan is where all the handles send signums, which are received by + /// the clients from port. + priv chan: SharedChan<Signum>, + + /// Clients of Listener can `recv()` from this port. This is exposed to + /// allow selection over this port as well as manipulation of the port + /// directly. + port: Port<Signum>, +} + +impl Listener { + /// Creates a new listener for signals. Once created, signals are bound via + /// the `register` method (otherwise nothing will ever be received) + pub fn new() -> Listener { + let (port, chan) = stream(); + Listener { + chan: SharedChan::new(chan), + port: port, + handles: hashmap::HashMap::new(), + } + } + + /// Listen for a signal, returning true when successfully registered for + /// signum. Signals can be received using `recv()`. + /// + /// Once a signal is registered, this listener will continue to receive + /// notifications of signals until it is unregistered. This occurs + /// regardless of the number of other listeners registered in other tasks + /// (or on this task). + /// + /// Signals are still received if there is no task actively waiting for + /// a signal, and a later call to `recv` will return the signal that was + /// received while no task was waiting on it. + /// + /// # Failure + /// + /// If this function fails to register a signal handler, then an error will + /// be raised on the `io_error` condition and the function will return + /// false. + pub fn register(&mut self, signum: Signum) -> bool { + if self.handles.contains_key(&signum) { + return true; // self is already listening to signum, so succeed + } + do with_local_io |io| { + match io.signal(signum, self.chan.clone()) { + Ok(w) => { + self.handles.insert(signum, w); + Some(()) + }, + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + }.is_some() + } + + /// Unregisters a signal. If this listener currently had a handler + /// registered for the signal, then it will stop receiving any more + /// notification about the signal. If the signal has already been received, + /// it may still be returned by `recv`. + pub fn unregister(&mut self, signum: Signum) { + self.handles.pop(&signum); + } +} + +#[cfg(test)] +mod test { + use libc; + use rt::io::timer; + use rt::io; + use super::*; + + // kill is only available on Unixes + #[cfg(unix)] + #[fixed_stack_segment] + fn sigint() { + unsafe { + libc::funcs::posix88::signal::kill(libc::getpid(), libc::SIGINT); + } + } + + #[test] #[cfg(unix)] + fn test_io_signal_smoketest() { + let mut signal = Listener::new(); + signal.register(Interrupt); + sigint(); + timer::sleep(10); + match signal.port.recv() { + Interrupt => (), + s => fail!("Expected Interrupt, got {:?}", s), + } + } + + #[test] #[cfg(unix)] + fn test_io_signal_two_signal_one_signum() { + let mut s1 = Listener::new(); + let mut s2 = Listener::new(); + s1.register(Interrupt); + s2.register(Interrupt); + sigint(); + timer::sleep(10); + match s1.port.recv() { + Interrupt => (), + s => fail!("Expected Interrupt, got {:?}", s), + } + match s1.port.recv() { + Interrupt => (), + s => fail!("Expected Interrupt, got {:?}", s), + } + } + + #[test] #[cfg(unix)] + fn test_io_signal_unregister() { + let mut s1 = Listener::new(); + let mut s2 = Listener::new(); + s1.register(Interrupt); + s2.register(Interrupt); + s2.unregister(Interrupt); + sigint(); + timer::sleep(10); + if s2.port.peek() { + fail!("Unexpected {:?}", s2.port.recv()); + } + } + + #[cfg(windows)] + #[test] + fn test_io_signal_invalid_signum() { + let mut s = Listener::new(); + let mut called = false; + do io::io_error::cond.trap(|_| { + called = true; + }).inside { + if s.register(User1) { + fail!("Unexpected successful registry of signum {:?}", User1); + } + } + assert!(called); + } +} diff --git a/src/libstd/rt/io/stdio.rs b/src/libstd/rt/io/stdio.rs index e6dd9a48099..b922e6400cc 100644 --- a/src/libstd/rt/io/stdio.rs +++ b/src/libstd/rt/io/stdio.rs @@ -8,23 +8,90 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +/*! + +This modules provides bindings to the local event loop's TTY interface, using it +to have synchronous, but non-blocking versions of stdio. These handles can be +inspected for information about terminal dimensions or related information +about the stream or terminal that it is attached to. + +# Example + +```rust +use std::rt::io; + +let mut out = io::stdout(); +out.write(bytes!("Hello, world!")); +``` + +*/ + use fmt; use libc; use option::{Option, Some, None}; use result::{Ok, Err}; -use rt::local::Local; -use rt::rtio::{RtioFileStream, IoFactoryObject, IoFactory}; -use super::{Reader, Writer, io_error}; +use rt::rtio::{IoFactory, RtioTTY, RtioFileStream, with_local_io, + CloseAsynchronously}; +use super::{Reader, Writer, io_error, IoError, OtherIoError}; + +// And so begins the tale of acquiring a uv handle to a stdio stream on all +// platforms in all situations. Our story begins by splitting the world into two +// categories, windows and unix. Then one day the creators of unix said let +// there be redirection! And henceforth there was redirection away from the +// console for standard I/O streams. +// +// After this day, the world split into four factions: +// +// 1. Unix with stdout on a terminal. +// 2. Unix with stdout redirected. +// 3. Windows with stdout on a terminal. +// 4. Windows with stdout redirected. +// +// Many years passed, and then one day the nation of libuv decided to unify this +// world. After months of toiling, uv created three ideas: TTY, Pipe, File. +// These three ideas propagated throughout the lands and the four great factions +// decided to settle among them. +// +// The groups of 1, 2, and 3 all worked very hard towards the idea of TTY. Upon +// doing so, they even enhanced themselves further then their Pipe/File +// brethren, becoming the dominant powers. +// +// The group of 4, however, decided to work independently. They abandoned the +// common TTY belief throughout, and even abandoned the fledgling Pipe belief. +// The members of the 4th faction decided to only align themselves with File. +// +// tl;dr; TTY works on everything but when windows stdout is redirected, in that +// case pipe also doesn't work, but magically file does! +enum StdSource { + TTY(~RtioTTY), + File(~RtioFileStream), +} + +#[fixed_stack_segment] #[inline(never)] +fn src<T>(fd: libc::c_int, readable: bool, f: &fn(StdSource) -> T) -> T { + do with_local_io |io| { + let fd = unsafe { libc::dup(fd) }; + match io.tty_open(fd, readable) { + Ok(tty) => Some(f(TTY(tty))), + Err(_) => { + // It's not really that desirable if these handles are closed + // synchronously, and because they're squirreled away in a task + // structure the destructors will be run when the task is + // attempted to get destroyed. This means that if we run a + // synchronous destructor we'll attempt to do some scheduling + // operations which will just result in sadness. + Some(f(File(io.fs_from_raw_fd(fd, CloseAsynchronously)))) + } + } + }.unwrap() +} /// Creates a new non-blocking handle to the stdin of the current process. /// /// See `stdout()` for notes about this function. +#[fixed_stack_segment] #[inline(never)] pub fn stdin() -> StdReader { - let stream = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_from_raw_fd(libc::STDIN_FILENO, false) - }; - StdReader { inner: stream } + do src(libc::STDIN_FILENO, true) |src| { StdReader { inner: src } } } /// Creates a new non-blocking handle to the stdout of the current process. @@ -34,22 +101,14 @@ pub fn stdin() -> StdReader { /// task context because the stream returned will be a non-blocking object using /// the local scheduler to perform the I/O. pub fn stdout() -> StdWriter { - let stream = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_from_raw_fd(libc::STDOUT_FILENO, false) - }; - StdWriter { inner: stream } + do src(libc::STDOUT_FILENO, false) |src| { StdWriter { inner: src } } } /// Creates a new non-blocking handle to the stderr of the current process. /// /// See `stdout()` for notes about this function. pub fn stderr() -> StdWriter { - let stream = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_from_raw_fd(libc::STDERR_FILENO, false) - }; - StdWriter { inner: stream } + do src(libc::STDERR_FILENO, false) |src| { StdWriter { inner: src } } } /// Prints a string to the stdout of the current process. No newline is emitted @@ -87,12 +146,16 @@ pub fn println_args(fmt: &fmt::Arguments) { /// Representation of a reader of a standard input stream pub struct StdReader { - priv inner: ~RtioFileStream + priv inner: StdSource } impl Reader for StdReader { fn read(&mut self, buf: &mut [u8]) -> Option<uint> { - match self.inner.read(buf) { + let ret = match self.inner { + TTY(ref mut tty) => tty.read(buf), + File(ref mut file) => file.read(buf).map_move(|i| i as uint), + }; + match ret { Ok(amt) => Some(amt as uint), Err(e) => { io_error::cond.raise(e); @@ -106,21 +169,102 @@ impl Reader for StdReader { /// Representation of a writer to a standard output stream pub struct StdWriter { - priv inner: ~RtioFileStream + priv inner: StdSource +} + +impl StdWriter { + /// Gets the size of this output window, if possible. This is typically used + /// when the writer is attached to something like a terminal, this is used + /// to fetch the dimensions of the terminal. + /// + /// If successful, returns Some((width, height)). + /// + /// # Failure + /// + /// This function will raise on the `io_error` condition if an error + /// happens. + pub fn winsize(&mut self) -> Option<(int, int)> { + match self.inner { + TTY(ref mut tty) => { + match tty.get_winsize() { + Ok(p) => Some(p), + Err(e) => { + io_error::cond.raise(e); + None + } + } + } + File(*) => { + io_error::cond.raise(IoError { + kind: OtherIoError, + desc: "stream is not a tty", + detail: None, + }); + None + } + } + } + + /// Controls whether this output stream is a "raw stream" or simply a normal + /// stream. + /// + /// # Failure + /// + /// This function will raise on the `io_error` condition if an error + /// happens. + pub fn set_raw(&mut self, raw: bool) { + match self.inner { + TTY(ref mut tty) => { + match tty.set_raw(raw) { + Ok(()) => {}, + Err(e) => io_error::cond.raise(e), + } + } + File(*) => { + io_error::cond.raise(IoError { + kind: OtherIoError, + desc: "stream is not a tty", + detail: None, + }); + } + } + } + + /// Returns whether this tream is attached to a TTY instance or not. + /// + /// This is similar to libc's isatty() function + pub fn isatty(&self) -> bool { + match self.inner { + TTY(ref tty) => tty.isatty(), + File(*) => false, + } + } } impl Writer for StdWriter { fn write(&mut self, buf: &[u8]) { - match self.inner.write(buf) { + let ret = match self.inner { + TTY(ref mut tty) => tty.write(buf), + File(ref mut file) => file.write(buf), + }; + match ret { Ok(()) => {} Err(e) => io_error::cond.raise(e) } } - fn flush(&mut self) { - match self.inner.flush() { - Ok(()) => {} - Err(e) => io_error::cond.raise(e) - } + fn flush(&mut self) { /* nothing to do */ } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn smoke() { + // Just make sure we can acquire handles + stdin(); + stdout(); + stderr(); } } diff --git a/src/libstd/rt/io/support.rs b/src/libstd/rt/io/support.rs deleted file mode 100644 index 31040bc51a1..00000000000 --- a/src/libstd/rt/io/support.rs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use path::*; - -pub trait PathLike { - fn path_as_str<T>(&self, f: &fn(&str) -> T) -> T; -} - -impl<'self> PathLike for &'self str { - fn path_as_str<T>(&self, f: &fn(&str) -> T) -> T { - f(*self) - } -} - -impl PathLike for Path { - fn path_as_str<T>(&self, f: &fn(&str) -> T) -> T { - let s = self.as_str().unwrap(); - f(s) - } -} - -#[cfg(test)] -mod test { - use path::*; - use super::PathLike; - - #[test] - fn path_like_smoke_test() { - let expected = if cfg!(unix) { "/home" } else { "C:\\" }; - let path = Path::new(expected); - path.path_as_str(|p| assert!(p == expected)); - path.path_as_str(|p| assert!(p == expected)); - } -} diff --git a/src/libstd/rt/io/timer.rs b/src/libstd/rt/io/timer.rs index b41d7541a60..fab0062ee00 100644 --- a/src/libstd/rt/io/timer.rs +++ b/src/libstd/rt/io/timer.rs @@ -10,13 +10,11 @@ use option::{Option, Some, None}; use result::{Ok, Err}; -use rt::io::{io_error}; -use rt::rtio::{IoFactory, IoFactoryObject, - RtioTimer, RtioTimerObject}; -use rt::local::Local; +use rt::io::io_error; +use rt::rtio::{IoFactory, RtioTimer, with_local_io}; pub struct Timer { - priv obj: ~RtioTimerObject + priv obj: ~RtioTimer } /// Sleep the current task for `msecs` milliseconds. @@ -28,20 +26,19 @@ pub fn sleep(msecs: u64) { impl Timer { + /// Creates a new timer which can be used to put the current task to sleep + /// for a number of milliseconds. pub fn new() -> Option<Timer> { - let timer = unsafe { - rtdebug!("Timer::init: borrowing io to init timer"); - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - rtdebug!("about to init timer"); - (*io).timer_init() - }; - match timer { - Ok(t) => Some(Timer { obj: t }), - Err(ioerr) => { - rtdebug!("Timer::init: failed to init: {:?}", ioerr); - io_error::cond.raise(ioerr); - None + do with_local_io |io| { + match io.timer_init() { + Ok(t) => Some(Timer { obj: t }), + Err(ioerr) => { + rtdebug!("Timer::init: failed to init: {:?}", ioerr); + io_error::cond.raise(ioerr); + None + } } + } } diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index d4f31879c00..1ddc2f86f4b 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -12,8 +12,6 @@ use option::{Option, Some, None}; use rt::sched::Scheduler; use rt::task::Task; use rt::local_ptr; -use rt::rtio::{EventLoop, IoFactoryObject}; -//use borrow::to_uint; use cell::Cell; pub trait Local { @@ -122,24 +120,6 @@ impl Local for Scheduler { } } -// XXX: This formulation won't work once ~IoFactoryObject is a real trait pointer -impl Local for IoFactoryObject { - fn put(_value: ~IoFactoryObject) { rtabort!("unimpl") } - fn take() -> ~IoFactoryObject { rtabort!("unimpl") } - fn exists(_: Option<IoFactoryObject>) -> bool { rtabort!("unimpl") } - fn borrow<T>(_f: &fn(&mut IoFactoryObject) -> T) -> T { rtabort!("unimpl") } - unsafe fn unsafe_take() -> ~IoFactoryObject { rtabort!("unimpl") } - unsafe fn unsafe_borrow() -> *mut IoFactoryObject { - let sched: *mut Scheduler = Local::unsafe_borrow(); - let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap(); - return io; - } - unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { - rtabort!("unimpl") - } -} - - #[cfg(test)] mod test { use option::None; diff --git a/src/libstd/rt/logging.rs b/src/libstd/rt/logging.rs index 660d1cd4359..31650ede700 100644 --- a/src/libstd/rt/logging.rs +++ b/src/libstd/rt/logging.rs @@ -12,6 +12,7 @@ use fmt; use from_str::from_str; use libc::exit; use option::{Some, None, Option}; +use rt::io; use rt::crate_map::{ModEntry, CrateMap, iter_crate_map, get_crate_map}; use str::StrSlice; use u32; @@ -166,14 +167,23 @@ pub trait Logger { fn log(&mut self, args: &fmt::Arguments); } -pub struct StdErrLogger; +/// This logger emits output to the stderr of the process, and contains a lazily +/// initialized event-loop driven handle to the stream. +pub struct StdErrLogger { + priv handle: Option<io::stdio::StdWriter>, +} + +impl StdErrLogger { + pub fn new() -> StdErrLogger { StdErrLogger { handle: None } } +} impl Logger for StdErrLogger { fn log(&mut self, args: &fmt::Arguments) { - // FIXME(#6846): this should not call the blocking version of println, - // or at least the default loggers for tasks shouldn't do - // that - ::rt::util::dumb_println(args); + // First time logging? Get a handle to the stderr of this process. + if self.handle.is_none() { + self.handle = Some(io::stderr()); + } + fmt::writeln(self.handle.get_mut_ref() as &mut io::Writer, args); } } diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 9ea7b734d24..66d7a6bf488 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -279,7 +279,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { rtdebug!("inserting a regular scheduler"); // Every scheduler is driven by an I/O event loop. - let loop_ = ~UvEventLoop::new(); + let loop_ = ~UvEventLoop::new() as ~rtio::EventLoop; let mut sched = ~Scheduler::new(loop_, work_queue.clone(), work_queues.clone(), @@ -303,7 +303,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { // set. let work_queue = WorkQueue::new(); - let main_loop = ~UvEventLoop::new(); + let main_loop = ~UvEventLoop::new() as ~rtio::EventLoop; let mut main_sched = ~Scheduler::new_special(main_loop, work_queue, work_queues.clone(), diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 501def8b060..66a0676a2f4 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -11,40 +11,29 @@ use libc; use option::*; use result::*; +use comm::SharedChan; use libc::c_int; +use c_str::CString; +use ai = rt::io::net::addrinfo; use rt::io::IoError; +use rt::io::signal::Signum; use super::io::process::ProcessConfig; use super::io::net::ip::{IpAddr, SocketAddr}; -use rt::uv::uvio; use path::Path; -use super::io::support::PathLike; use super::io::{SeekStyle}; use super::io::{FileMode, FileAccess, FileStat}; -// XXX: ~object doesn't work currently so these are some placeholder -// types to use instead -pub type EventLoopObject = uvio::UvEventLoop; -pub type RemoteCallbackObject = uvio::UvRemoteCallback; -pub type IoFactoryObject = uvio::UvIoFactory; -pub type RtioTcpStreamObject = uvio::UvTcpStream; -pub type RtioTcpAcceptorObject = uvio::UvTcpAcceptor; -pub type RtioTcpListenerObject = uvio::UvTcpListener; -pub type RtioUdpSocketObject = uvio::UvUdpSocket; -pub type RtioTimerObject = uvio::UvTimer; -pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback; -pub type RtioPipeObject = uvio::UvPipeStream; -pub type RtioUnboundPipeObject = uvio::UvUnboundPipe; -pub type RtioProcessObject = uvio::UvProcess; - pub trait EventLoop { fn run(&mut self); fn callback(&mut self, ~fn()); fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback; fn callback_ms(&mut self, ms: u64, ~fn()); - fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject; + fn remote_callback(&mut self, ~fn()) -> ~RemoteCallback; + /// The asynchronous I/O services. Not all event loops may provide one - fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>; + // FIXME(#9382) this is an awful interface + fn io<'a>(&'a mut self, f: &fn(&'a mut IoFactory)); } pub trait RemoteCallback { @@ -69,32 +58,74 @@ pub struct FileOpenConfig { priv mode: int } +/// Description of what to do when a file handle is closed +pub enum CloseBehavior { + /// Do not close this handle when the object is destroyed + DontClose, + /// Synchronously close the handle, meaning that the task will block when + /// the handle is destroyed until it has been fully closed. + CloseSynchronously, + /// Asynchronously closes a handle, meaning that the task will *not* block + /// when the handle is destroyed, but the handle will still get deallocated + /// and cleaned up (but this will happen asynchronously on the local event + /// loop). + CloseAsynchronously, +} + +pub fn with_local_io<T>(f: &fn(&mut IoFactory) -> Option<T>) -> Option<T> { + use rt::sched::Scheduler; + use rt::local::Local; + use rt::io::{io_error, standard_error, IoUnavailable}; + + unsafe { + let sched: *mut Scheduler = Local::unsafe_borrow(); + let mut io = None; + (*sched).event_loop.io(|i| io = Some(i)); + match io { + Some(io) => f(io), + None => { + io_error::cond.raise(standard_error(IoUnavailable)); + None + } + } + } +} + pub trait IoFactory { - fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError>; - fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError>; - fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError>; - fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError>; - fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream; - fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess) + fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError>; + fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError>; + fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError>; + fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, + hint: Option<ai::Hint>) -> Result<~[ai::Info], IoError>; + fn timer_init(&mut self) -> Result<~RtioTimer, IoError>; + fn fs_from_raw_fd(&mut self, fd: c_int, close: CloseBehavior) -> ~RtioFileStream; + fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess) -> Result<~RtioFileStream, IoError>; - fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError>; - fn get_host_addresses(&mut self, host: &str) -> Result<~[IpAddr], IoError>; - fn fs_stat<P: PathLike>(&mut self, path: &P) -> Result<FileStat, IoError>; - fn fs_mkdir<P: PathLike>(&mut self, path: &P) -> Result<(), IoError>; - fn fs_rmdir<P: PathLike>(&mut self, path: &P) -> Result<(), IoError>; - fn fs_readdir<P: PathLike>(&mut self, path: &P, flags: c_int) -> + fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError>; + fn fs_stat(&mut self, path: &CString) -> Result<FileStat, IoError>; + fn fs_mkdir(&mut self, path: &CString) -> Result<(), IoError>; + fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError>; + fn fs_readdir(&mut self, path: &CString, flags: c_int) -> Result<~[Path], IoError>; - fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError>; fn spawn(&mut self, config: ProcessConfig) - -> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError>; + -> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError>; + + fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError>; + fn unix_bind(&mut self, path: &CString) -> + Result<~RtioUnixListener, IoError>; + fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError>; + fn tty_open(&mut self, fd: c_int, readable: bool) + -> Result<~RtioTTY, IoError>; + fn signal(&mut self, signal: Signum, channel: SharedChan<Signum>) + -> Result<~RtioSignal, IoError>; } pub trait RtioTcpListener : RtioSocket { - fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError>; + fn listen(~self) -> Result<~RtioTcpAcceptor, IoError>; } pub trait RtioTcpAcceptor : RtioSocket { - fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>; + fn accept(&mut self) -> Result<~RtioTcpStream, IoError>; fn accept_simultaneously(&mut self) -> Result<(), IoError>; fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; } @@ -154,3 +185,30 @@ pub trait RtioPipe { fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>; fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; } + +pub trait RtioUnixListener { + fn listen(~self) -> Result<~RtioUnixAcceptor, IoError>; +} + +pub trait RtioUnixAcceptor { + fn accept(&mut self) -> Result<~RtioPipe, IoError>; + fn accept_simultaneously(&mut self) -> Result<(), IoError>; + fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; +} + +pub trait RtioTTY { + fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>; + fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; + fn set_raw(&mut self, raw: bool) -> Result<(), IoError>; + fn get_winsize(&mut self) -> Result<(int, int), IoError>; + fn isatty(&self) -> bool; +} + +pub trait PausibleIdleCallback { + fn start(&mut self, f: ~fn()); + fn pause(&mut self); + fn resume(&mut self); + fn close(&mut self); +} + +pub trait RtioSignal {} diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index ee163bab3c0..6e661884616 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -16,7 +16,7 @@ use unstable::raw; use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; use super::stack::{StackPool}; -use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; +use super::rtio::EventLoop; use super::context::Context; use super::task::{Task, AnySched, Sched}; use super::message_queue::MessageQueue; @@ -63,7 +63,7 @@ pub struct Scheduler { no_sleep: bool, stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O - event_loop: ~EventLoopObject, + event_loop: ~EventLoop, /// The scheduler runs on a special task. When it is not running /// it is stored here instead of the work queue. priv sched_task: Option<~Task>, @@ -107,7 +107,7 @@ impl Scheduler { // * Initialization Functions - pub fn new(event_loop: ~EventLoopObject, + pub fn new(event_loop: ~EventLoop, work_queue: WorkQueue<~Task>, work_queues: ~[WorkQueue<~Task>], sleeper_list: SleeperList) @@ -119,7 +119,7 @@ impl Scheduler { } - pub fn new_special(event_loop: ~EventLoopObject, + pub fn new_special(event_loop: ~EventLoop, work_queue: WorkQueue<~Task>, work_queues: ~[WorkQueue<~Task>], sleeper_list: SleeperList, @@ -227,7 +227,7 @@ impl Scheduler { // mutable reference to the event_loop to give it the "run" // command. unsafe { - let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; + let event_loop: *mut ~EventLoop = &mut self_sched.event_loop; // Our scheduler must be in the task before the event loop // is started. @@ -793,7 +793,7 @@ pub enum SchedMessage { } pub struct SchedHandle { - priv remote: ~RemoteCallbackObject, + priv remote: ~RemoteCallback, priv queue: MessageQueue<SchedMessage>, sched_id: uint } @@ -905,6 +905,7 @@ mod test { use cell::Cell; use rt::thread::Thread; use rt::task::{Task, Sched}; + use rt::rtio::EventLoop; use rt::util; use option::{Some}; @@ -1020,7 +1021,7 @@ mod test { // Our normal scheduler let mut normal_sched = ~Scheduler::new( - ~UvEventLoop::new(), + ~UvEventLoop::new() as ~EventLoop, normal_queue, queues.clone(), sleepers.clone()); @@ -1031,7 +1032,7 @@ mod test { // Our special scheduler let mut special_sched = ~Scheduler::new_special( - ~UvEventLoop::new(), + ~UvEventLoop::new() as ~EventLoop, special_queue.clone(), queues.clone(), sleepers.clone(), @@ -1202,7 +1203,7 @@ mod test { let queues = ~[queue.clone()]; let mut sched = ~Scheduler::new( - ~UvEventLoop::new(), + ~UvEventLoop::new() as ~EventLoop, queue, queues.clone(), sleepers.clone()); diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index c4f352501a0..1ea68bb52d7 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -132,7 +132,7 @@ impl Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(None), - logger: StdErrLogger, + logger: StdErrLogger::new(), unwinder: Unwinder { unwinding: false }, taskgroup: None, death: Death::new(), @@ -166,7 +166,7 @@ impl Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(None), - logger: StdErrLogger, + logger: StdErrLogger::new(), unwinder: Unwinder { unwinding: false }, taskgroup: None, death: Death::new(), @@ -188,7 +188,7 @@ impl Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(None), - logger: StdErrLogger, + logger: StdErrLogger::new(), unwinder: Unwinder { unwinding: false }, taskgroup: None, // FIXME(#7544) make watching optional @@ -479,7 +479,6 @@ pub extern "C" fn rust_stack_exhausted() { use rt::in_green_task_context; use rt::task::Task; use rt::local::Local; - use rt::logging::Logger; use unstable::intrinsics; unsafe { @@ -529,8 +528,12 @@ pub extern "C" fn rust_stack_exhausted() { do Local::borrow |task: &mut Task| { let n = task.name.as_ref().map(|n| n.as_slice()).unwrap_or("<unnamed>"); - format_args!(|args| { task.logger.log(args) }, - "task '{}' has overflowed its stack", n); + // See the message below for why this is not emitted to the + // task's logger. This has the additional conundrum of the + // logger may not be initialized just yet, meaning that an FFI + // call would happen to initialized it (calling out to libuv), + // and the FFI call needs 2MB of stack when we just ran out. + rterrln!("task '{}' has overflowed its stack", n); } } else { rterrln!("stack overflow in non-task context"); @@ -546,9 +549,9 @@ pub fn begin_unwind(msg: *c_char, file: *c_char, line: size_t) -> ! { use rt::in_green_task_context; use rt::task::Task; use rt::local::Local; - use rt::logging::Logger; use str::Str; use c_str::CString; + use unstable::intrinsics; unsafe { let msg = CString::new(msg, false); @@ -557,35 +560,35 @@ pub fn begin_unwind(msg: *c_char, file: *c_char, line: size_t) -> ! { Some(s) => s, None => rtabort!("message wasn't utf8?") }; - if in_green_task_context() { - // Be careful not to allocate in this block, if we're failing we may - // have been failing due to a lack of memory in the first place... - do Local::borrow |task: &mut Task| { - let n = task.name.as_ref().map(|n| n.as_slice()).unwrap_or("<unnamed>"); - - match file.as_str() { - Some(file) => { - format_args!(|args| { task.logger.log(args) }, - "task '{}' failed at '{}', {}:{}", - n, msg, file, line); - } - None => { - format_args!(|args| { task.logger.log(args) }, - "task '{}' failed at '{}'", n, msg); - } - } - } - } else { + if !in_green_task_context() { match file.as_str() { Some(file) => { rterrln!("failed in non-task context at '{}', {}:{}", msg, file, line as int); } - None => rterrln!("failed in non-task context at '{}'", msg), + None => rterrln!("failed in non-task context at '{}'", msg) } + intrinsics::abort(); } + // Be careful not to allocate in this block, if we're failing we may + // have been failing due to a lack of memory in the first place... let task: *mut Task = Local::unsafe_borrow(); + let n = (*task).name.as_ref().map(|n| n.as_slice()).unwrap_or("<unnamed>"); + + // XXX: this should no get forcibly printed to the console, this should + // either be sent to the parent task (ideally), or get printed to + // the task's logger. Right now the logger is actually a uvio + // instance, which uses unkillable blocks internally for various + // reasons. This will cause serious trouble if the task is failing + // due to mismanagment of its own kill flag, so calling our own + // logger in its current state is a bit of a problem. + match file.as_str() { + Some(file) => { + rterrln!("task '{}' failed at '{}', {}:{}", n, msg, file, line); + } + None => rterrln!("task '{}' failed at '{}'", n, msg), + } if (*task).unwinder.unwinding { rtabort!("unwinding again"); } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 4f7ebb4a721..c238b1dfba1 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -8,8 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use rand; +use rand::Rng; +use os; use libc; use option::{Some, None}; +use path::Path; use cell::Cell; use clone::Clone; use container::Container; @@ -18,6 +22,7 @@ use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; use vec::{OwnedVector, MutableVector, ImmutableVector}; use path::GenericPath; use rt::sched::Scheduler; +use rt::rtio::EventLoop; use unstable::{run_in_bare_thread}; use rt::thread::Thread; use rt::task::Task; @@ -32,7 +37,7 @@ pub fn new_test_uv_sched() -> Scheduler { let queue = WorkQueue::new(); let queues = ~[queue.clone()]; - let mut sched = Scheduler::new(~UvEventLoop::new(), + let mut sched = Scheduler::new(~UvEventLoop::new() as ~EventLoop, queue, queues, SleeperList::new()); @@ -191,7 +196,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { } for i in range(0u, nthreads) { - let loop_ = ~UvEventLoop::new(); + let loop_ = ~UvEventLoop::new() as ~EventLoop; let mut sched = ~Scheduler::new(loop_, work_queues[i].clone(), work_queues.clone(), @@ -327,6 +332,12 @@ pub fn next_test_port() -> u16 { } } +/// Get a temporary path which could be the location of a unix socket +#[fixed_stack_segment] #[inline(never)] +pub fn next_test_unix() -> Path { + os::tmpdir().join(rand::task_rng().gen_ascii_str(20)) +} + /// Get a unique IPv4 localhost:port pair starting at 9600 pub fn next_test_ip4() -> SocketAddr { SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: next_test_port() } diff --git a/src/libstd/rt/util.rs b/src/libstd/rt/util.rs index 647d88c26f2..070985fb0a5 100644 --- a/src/libstd/rt/util.rs +++ b/src/libstd/rt/util.rs @@ -71,9 +71,24 @@ pub fn default_sched_threads() -> uint { pub fn dumb_println(args: &fmt::Arguments) { use rt::io::native::stdio::stderr; - use rt::io::Writer; + use rt::io::{Writer, io_error, ResourceUnavailable}; + use rt::task::Task; + use rt::local::Local; + let mut out = stderr(); - fmt::writeln(&mut out as &mut Writer, args); + if Local::exists(None::<Task>) { + let mut again = true; + do io_error::cond.trap(|e| { + again = e.kind == ResourceUnavailable; + }).inside { + while again { + again = false; + fmt::writeln(&mut out as &mut Writer, args); + } + } + } else { + fmt::writeln(&mut out as &mut Writer, args); + } } pub fn abort(msg: &str) -> ! { diff --git a/src/libstd/rt/uv/addrinfo.rs b/src/libstd/rt/uv/addrinfo.rs index f2abcd3aca7..a1593d5c8db 100644 --- a/src/libstd/rt/uv/addrinfo.rs +++ b/src/libstd/rt/uv/addrinfo.rs @@ -18,9 +18,10 @@ use rt::uv::uvll; use rt::uv::uvll::UV_GETADDRINFO; use rt::uv::{Loop, UvError, NativeHandle}; use rt::uv::status_to_maybe_uv_error; -use rt::uv::net::UvAddrInfo; +use rt::uv::net; +use ai = rt::io::net::addrinfo; -type GetAddrInfoCallback = ~fn(GetAddrInfoRequest, &UvAddrInfo, Option<UvError>); +type GetAddrInfoCallback = ~fn(GetAddrInfoRequest, &net::UvAddrInfo, Option<UvError>); pub struct GetAddrInfoRequest(*uvll::uv_getaddrinfo_t); @@ -38,7 +39,7 @@ impl GetAddrInfoRequest { } pub fn getaddrinfo(&mut self, loop_: &Loop, node: Option<&str>, - service: Option<&str>, hints: Option<UvAddrInfo>, + service: Option<&str>, hints: Option<ai::Hint>, cb: GetAddrInfoCallback) { assert!(node.is_some() || service.is_some()); @@ -72,8 +73,41 @@ impl GetAddrInfoRequest { cb(req, addrinfo, err) }; - // XXX: Implement hints - assert!(hints.is_none()); + let hint = hints.map(|hint| { + let mut flags = 0; + do each_ai_flag |cval, aival| { + if hint.flags & (aival as uint) != 0 { + flags |= cval as i32; + } + } + /* XXX: do we really want to support these? + let socktype = match hint.socktype { + Some(ai::Stream) => uvll::rust_SOCK_STREAM(), + Some(ai::Datagram) => uvll::rust_SOCK_DGRAM(), + Some(ai::Raw) => uvll::rust_SOCK_RAW(), + None => 0, + }; + let protocol = match hint.protocol { + Some(ai::UDP) => uvll::rust_IPPROTO_UDP(), + Some(ai::TCP) => uvll::rust_IPPROTO_TCP(), + _ => 0, + }; + */ + let socktype = 0; + let protocol = 0; + + uvll::addrinfo { + ai_flags: flags, + ai_family: hint.family as c_int, + ai_socktype: socktype, + ai_protocol: protocol, + ai_addrlen: 0, + ai_canonname: null(), + ai_addr: null(), + ai_next: null(), + } + }); + let hint_ptr = hint.as_ref().map_default(null(), |x| x as *uvll::addrinfo); self.get_req_data().getaddrinfo_cb = Some(wrapper_cb); @@ -83,7 +117,7 @@ impl GetAddrInfoRequest { getaddrinfo_cb, c_node_ptr, c_service_ptr, - null())); + hint_ptr)); } extern "C" fn getaddrinfo_cb(req: *uvll::uv_getaddrinfo_t, @@ -91,7 +125,7 @@ impl GetAddrInfoRequest { res: *uvll::addrinfo) { let mut req: GetAddrInfoRequest = NativeHandle::from_native_handle(req); let err = status_to_maybe_uv_error(status); - let addrinfo = UvAddrInfo(res); + let addrinfo = net::UvAddrInfo(res); let data = req.get_req_data(); (*data.getaddrinfo_cb.get_ref())(req, &addrinfo, err); unsafe { @@ -137,6 +171,72 @@ impl GetAddrInfoRequest { } } +fn each_ai_flag(_f: &fn(c_int, ai::Flag)) { + /* XXX: do we really want to support these? + unsafe { + f(uvll::rust_AI_ADDRCONFIG(), ai::AddrConfig); + f(uvll::rust_AI_ALL(), ai::All); + f(uvll::rust_AI_CANONNAME(), ai::CanonName); + f(uvll::rust_AI_NUMERICHOST(), ai::NumericHost); + f(uvll::rust_AI_NUMERICSERV(), ai::NumericServ); + f(uvll::rust_AI_PASSIVE(), ai::Passive); + f(uvll::rust_AI_V4MAPPED(), ai::V4Mapped); + } + */ +} + +// Traverse the addrinfo linked list, producing a vector of Rust socket addresses +pub fn accum_addrinfo(addr: &net::UvAddrInfo) -> ~[ai::Info] { + unsafe { + let &net::UvAddrInfo(addr) = addr; + let mut addr = addr; + + let mut addrs = ~[]; + loop { + let uvaddr = net::sockaddr_to_UvSocketAddr((*addr).ai_addr); + let rustaddr = net::uv_socket_addr_to_socket_addr(uvaddr); + + let mut flags = 0; + do each_ai_flag |cval, aival| { + if (*addr).ai_flags & cval != 0 { + flags |= aival as uint; + } + } + + /* XXX: do we really want to support these + let protocol = match (*addr).ai_protocol { + p if p == uvll::rust_IPPROTO_UDP() => Some(ai::UDP), + p if p == uvll::rust_IPPROTO_TCP() => Some(ai::TCP), + _ => None, + }; + let socktype = match (*addr).ai_socktype { + p if p == uvll::rust_SOCK_STREAM() => Some(ai::Stream), + p if p == uvll::rust_SOCK_DGRAM() => Some(ai::Datagram), + p if p == uvll::rust_SOCK_RAW() => Some(ai::Raw), + _ => None, + }; + */ + let protocol = None; + let socktype = None; + + addrs.push(ai::Info { + address: rustaddr, + family: (*addr).ai_family as uint, + socktype: socktype, + protocol: protocol, + flags: flags, + }); + if (*addr).ai_next.is_not_null() { + addr = (*addr).ai_next; + } else { + break; + } + } + + return addrs; + } +} + impl NativeHandle<*uvll::uv_getaddrinfo_t> for GetAddrInfoRequest { fn from_native_handle(handle: *uvll::uv_getaddrinfo_t) -> GetAddrInfoRequest { GetAddrInfoRequest(handle) @@ -150,7 +250,6 @@ impl NativeHandle<*uvll::uv_getaddrinfo_t> for GetAddrInfoRequest { mod test { use option::{Some, None}; use rt::uv::Loop; - use rt::uv::net::accum_sockaddrs; use rt::io::net::ip::{SocketAddr, Ipv4Addr}; use super::*; @@ -159,14 +258,14 @@ mod test { let mut loop_ = Loop::new(); let mut req = GetAddrInfoRequest::new(); do req.getaddrinfo(&loop_, Some("localhost"), None, None) |_, addrinfo, _| { - let sockaddrs = accum_sockaddrs(addrinfo); + let sockaddrs = accum_addrinfo(addrinfo); let mut found_local = false; let local_addr = &SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 0 }; for addr in sockaddrs.iter() { - found_local = found_local || addr == local_addr; + found_local = found_local || addr.address == *local_addr; } assert!(found_local); } diff --git a/src/libstd/rt/uv/async.rs b/src/libstd/rt/uv/async.rs index ff7bb9dd03a..108aef43c3c 100644 --- a/src/libstd/rt/uv/async.rs +++ b/src/libstd/rt/uv/async.rs @@ -8,11 +8,11 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use libc::{c_int, c_void}; +use libc::c_int; use option::Some; use rt::uv::uvll; use rt::uv::uvll::UV_ASYNC; -use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback, NullCallback}; +use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback}; use rt::uv::WatcherInterop; use rt::uv::status_to_maybe_uv_error; @@ -47,27 +47,6 @@ impl AsyncWatcher { uvll::async_send(handle); } } - - pub fn close(self, cb: NullCallback) { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - - unsafe { - uvll::close(self.native_handle(), close_cb); - } - - extern fn close_cb(handle: *uvll::uv_stream_t) { - let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); - { - let data = watcher.get_watcher_data(); - data.close_cb.take_unwrap()(); - } - watcher.drop_watcher_data(); - unsafe { uvll::free_handle(handle as *c_void); } - } - } } impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher { diff --git a/src/libstd/rt/uv/file.rs b/src/libstd/rt/uv/file.rs index 5d64ca4d755..d2ca15959b0 100644 --- a/src/libstd/rt/uv/file.rs +++ b/src/libstd/rt/uv/file.rs @@ -10,12 +10,13 @@ use prelude::*; use ptr::null; +use c_str; +use c_str::CString; use libc::c_void; use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf, status_to_maybe_uv_error, UvError}; use rt::uv::uvll; use rt::uv::uvll::*; -use super::super::io::support::PathLike; use cast::transmute; use libc; use libc::{c_int}; @@ -36,74 +37,67 @@ impl FsRequest { fs_req } - pub fn open<P: PathLike>(self, loop_: &Loop, path: &P, flags: int, mode: int, - cb: FsCallback) { + pub fn open(self, loop_: &Loop, path: &CString, flags: int, mode: int, + cb: FsCallback) { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(Some(cb)) }; - path.path_as_str(|p| { - p.with_c_str(|p| unsafe { + let ret = path.with_ref(|p| unsafe { uvll::fs_open(loop_.native_handle(), self.native_handle(), p, flags, mode, complete_cb_ptr) - }) }); + assert_eq!(ret, 0); } - pub fn open_sync<P: PathLike>(self, loop_: &Loop, path: &P, - flags: int, mode: int) -> Result<c_int, UvError> { + pub fn open_sync(self, loop_: &Loop, path: &CString, + flags: int, mode: int) -> Result<c_int, UvError> { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(None) }; - let result = path.path_as_str(|p| { - p.with_c_str(|p| unsafe { + let result = path.with_ref(|p| unsafe { uvll::fs_open(loop_.native_handle(), self.native_handle(), p, flags, mode, complete_cb_ptr) - }) }); self.sync_cleanup(result) } - pub fn unlink<P: PathLike>(self, loop_: &Loop, path: &P, cb: FsCallback) { + pub fn unlink(self, loop_: &Loop, path: &CString, cb: FsCallback) { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(Some(cb)) }; - path.path_as_str(|p| { - p.with_c_str(|p| unsafe { - uvll::fs_unlink(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }) + let ret = path.with_ref(|p| unsafe { + uvll::fs_unlink(loop_.native_handle(), + self.native_handle(), p, complete_cb_ptr) }); + assert_eq!(ret, 0); } - pub fn unlink_sync<P: PathLike>(self, loop_: &Loop, path: &P) + pub fn unlink_sync(self, loop_: &Loop, path: &CString) -> Result<c_int, UvError> { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(None) }; - let result = path.path_as_str(|p| { - p.with_c_str(|p| unsafe { - uvll::fs_unlink(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }) + let result = path.with_ref(|p| unsafe { + uvll::fs_unlink(loop_.native_handle(), + self.native_handle(), p, complete_cb_ptr) }); self.sync_cleanup(result) } - pub fn stat<P: PathLike>(self, loop_: &Loop, path: &P, cb: FsCallback) { + pub fn stat(self, loop_: &Loop, path: &CString, cb: FsCallback) { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(Some(cb)) }; - path.path_as_str(|p| { - p.with_c_str(|p| unsafe { - uvll::fs_stat(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }) + let ret = path.with_ref(|p| unsafe { + uvll::fs_stat(loop_.native_handle(), + self.native_handle(), p, complete_cb_ptr) }); + assert_eq!(ret, 0); } pub fn write(self, loop_: &Loop, fd: c_int, buf: Buf, offset: i64, cb: FsCallback) { @@ -113,11 +107,12 @@ impl FsRequest { }; let base_ptr = buf.base as *c_void; let len = buf.len as uint; - unsafe { + let ret = unsafe { uvll::fs_write(loop_.native_handle(), self.native_handle(), fd, base_ptr, len, offset, complete_cb_ptr) }; + assert_eq!(ret, 0); } pub fn write_sync(self, loop_: &Loop, fd: c_int, buf: Buf, offset: i64) -> Result<c_int, UvError> { @@ -142,11 +137,12 @@ impl FsRequest { }; let buf_ptr = buf.base as *c_void; let len = buf.len as uint; - unsafe { + let ret = unsafe { uvll::fs_read(loop_.native_handle(), self.native_handle(), fd, buf_ptr, len, offset, complete_cb_ptr) }; + assert_eq!(ret, 0); } pub fn read_sync(self, loop_: &Loop, fd: c_int, buf: Buf, offset: i64) -> Result<c_int, UvError> { @@ -169,10 +165,11 @@ impl FsRequest { let mut me = self; me.req_boilerplate(Some(cb)) }; - unsafe { + let ret = unsafe { uvll::fs_close(loop_.native_handle(), self.native_handle(), fd, complete_cb_ptr) }; + assert_eq!(ret, 0); } pub fn close_sync(self, loop_: &Loop, fd: c_int) -> Result<c_int, UvError> { let complete_cb_ptr = { @@ -186,44 +183,41 @@ impl FsRequest { self.sync_cleanup(result) } - pub fn mkdir<P: PathLike>(self, loop_: &Loop, path: &P, mode: int, cb: FsCallback) { + pub fn mkdir(self, loop_: &Loop, path: &CString, mode: int, cb: FsCallback) { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(Some(cb)) }; - path.path_as_str(|p| { - p.with_c_str(|p| unsafe { + let ret = path.with_ref(|p| unsafe { uvll::fs_mkdir(loop_.native_handle(), - self.native_handle(), p, mode, complete_cb_ptr) - }) + self.native_handle(), p, mode, complete_cb_ptr) }); + assert_eq!(ret, 0); } - pub fn rmdir<P: PathLike>(self, loop_: &Loop, path: &P, cb: FsCallback) { + pub fn rmdir(self, loop_: &Loop, path: &CString, cb: FsCallback) { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(Some(cb)) }; - path.path_as_str(|p| { - p.with_c_str(|p| unsafe { + let ret = path.with_ref(|p| unsafe { uvll::fs_rmdir(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }) + self.native_handle(), p, complete_cb_ptr) }); + assert_eq!(ret, 0); } - pub fn readdir<P: PathLike>(self, loop_: &Loop, path: &P, - flags: c_int, cb: FsCallback) { + pub fn readdir(self, loop_: &Loop, path: &CString, + flags: c_int, cb: FsCallback) { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(Some(cb)) }; - path.path_as_str(|p| { - p.with_c_str(|p| unsafe { + let ret = path.with_ref(|p| unsafe { uvll::fs_readdir(loop_.native_handle(), - self.native_handle(), p, flags, complete_cb_ptr) - }) + self.native_handle(), p, flags, complete_cb_ptr) }); + assert_eq!(ret, 0); } // accessors/utility funcs @@ -286,13 +280,10 @@ impl FsRequest { } } - pub fn get_paths(&mut self) -> ~[~str] { - use str; + pub fn each_path(&mut self, f: &fn(&CString)) { let ptr = self.get_ptr(); match self.get_result() { - n if (n <= 0) => { - ~[] - }, + n if (n <= 0) => {} n => { let n_len = n as uint; // we pass in the len that uv tells us is there @@ -301,11 +292,10 @@ impl FsRequest { // correctly delimited and we stray into garbage memory? // in any case, passing Some(n_len) fixes it and ensures // good results - let raw_path_strs = unsafe { - str::raw::from_c_multistring(ptr as *libc::c_char, Some(n_len)) }; - let raw_len = raw_path_strs.len(); - assert_eq!(raw_len, n_len); - raw_path_strs + unsafe { + c_str::from_c_multistring(ptr as *libc::c_char, + Some(n_len), f); + } } } } @@ -368,7 +358,6 @@ mod test { use vec; use str; use unstable::run_in_bare_thread; - use path::Path; use rt::uv::{Loop, Buf, slice_to_uv_buf}; use libc::{O_CREAT, O_RDWR, O_RDONLY, S_IWUSR, S_IRUSR}; @@ -391,10 +380,9 @@ mod test { let read_mem = vec::from_elem(read_buf_len, 0u8); let read_buf = slice_to_uv_buf(read_mem); let read_buf_ptr: *Buf = &read_buf; - let p = Path::new(path_str); let open_req = FsRequest::new(); - do open_req.open(&loop_, &p, create_flags as int, mode as int) - |req, uverr| { + do open_req.open(&loop_, &path_str.to_c_str(), create_flags as int, + mode as int) |req, uverr| { assert!(uverr.is_none()); let fd = req.get_result(); let buf = unsafe { *write_buf_ptr }; @@ -405,8 +393,8 @@ mod test { assert!(uverr.is_none()); let loop_ = req.get_loop(); let open_req = FsRequest::new(); - do open_req.open(&loop_, &Path::new(path_str), read_flags as int,0) - |req, uverr| { + do open_req.open(&loop_, &path_str.to_c_str(), + read_flags as int,0) |req, uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let fd = req.get_result(); @@ -431,7 +419,8 @@ mod test { assert!(uverr.is_none()); let loop_ = &req.get_loop(); let unlink_req = FsRequest::new(); - do unlink_req.unlink(loop_, &Path::new(path_str)) + do unlink_req.unlink(loop_, + &path_str.to_c_str()) |_,uverr| { assert!(uverr.is_none()); }; @@ -465,8 +454,8 @@ mod test { let write_buf = slice_to_uv_buf(write_val); // open/create let open_req = FsRequest::new(); - let result = open_req.open_sync(&loop_, &Path::new(path_str), - create_flags as int, mode as int); + let result = open_req.open_sync(&loop_, &path_str.to_c_str(), + create_flags as int, mode as int); assert!(result.is_ok()); let fd = result.unwrap(); // write @@ -479,7 +468,7 @@ mod test { assert!(result.is_ok()); // re-open let open_req = FsRequest::new(); - let result = open_req.open_sync(&loop_, &Path::new(path_str), + let result = open_req.open_sync(&loop_, &path_str.to_c_str(), read_flags as int,0); assert!(result.is_ok()); let len = 1028; @@ -503,7 +492,7 @@ mod test { assert!(result.is_ok()); // unlink let unlink_req = FsRequest::new(); - let result = unlink_req.unlink_sync(&loop_, &Path::new(path_str)); + let result = unlink_req.unlink_sync(&loop_, &path_str.to_c_str()); assert!(result.is_ok()); } else { fail!("nread was 0.. wudn't expectin' that."); } loop_.close(); @@ -539,8 +528,8 @@ mod test { let write_buf = slice_to_uv_buf(write_val); let write_buf_ptr: *Buf = &write_buf; let open_req = FsRequest::new(); - do open_req.open(&loop_, &path, create_flags as int, mode as int) - |req, uverr| { + do open_req.open(&loop_, &path.to_c_str(), create_flags as int, + mode as int) |req, uverr| { assert!(uverr.is_none()); let fd = req.get_result(); let buf = unsafe { *write_buf_ptr }; @@ -549,7 +538,7 @@ mod test { assert!(uverr.is_none()); let loop_ = req.get_loop(); let stat_req = FsRequest::new(); - do stat_req.stat(&loop_, &path) |req, uverr| { + do stat_req.stat(&loop_, &path.to_c_str()) |req, uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let stat = req.get_stat(); @@ -560,11 +549,13 @@ mod test { assert!(uverr.is_none()); let loop_ = req.get_loop(); let unlink_req = FsRequest::new(); - do unlink_req.unlink(&loop_, &path) |req,uverr| { + do unlink_req.unlink(&loop_, + &path.to_c_str()) |req,uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let stat_req = FsRequest::new(); - do stat_req.stat(&loop_, &path) |_, uverr| { + do stat_req.stat(&loop_, + &path.to_c_str()) |_, uverr| { // should cause an error because the // file doesn't exist anymore assert!(uverr.is_some()); @@ -587,22 +578,23 @@ mod test { let mode = S_IWUSR | S_IRUSR; let mkdir_req = FsRequest::new(); - do mkdir_req.mkdir(&loop_, &path, mode as int) |req,uverr| { + do mkdir_req.mkdir(&loop_, &path.to_c_str(), + mode as int) |req,uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let stat_req = FsRequest::new(); - do stat_req.stat(&loop_, &path) |req, uverr| { + do stat_req.stat(&loop_, &path.to_c_str()) |req, uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let stat = req.get_stat(); naive_print(&loop_, format!("{:?}", stat)); assert!(stat.is_dir()); let rmdir_req = FsRequest::new(); - do rmdir_req.rmdir(&loop_, &path) |req,uverr| { + do rmdir_req.rmdir(&loop_, &path.to_c_str()) |req,uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let stat_req = FsRequest::new(); - do stat_req.stat(&loop_, &path) |_req, uverr| { + do stat_req.stat(&loop_, &path.to_c_str()) |_req, uverr| { assert!(uverr.is_some()); } } @@ -620,16 +612,17 @@ mod test { let mode = S_IWUSR | S_IRUSR; let mkdir_req = FsRequest::new(); - do mkdir_req.mkdir(&loop_, &path, mode as int) |req,uverr| { + do mkdir_req.mkdir(&loop_, &path.to_c_str(), mode as int) |req,uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let mkdir_req = FsRequest::new(); - do mkdir_req.mkdir(&loop_, &path, mode as int) |req,uverr| { + do mkdir_req.mkdir(&loop_, &path.to_c_str(), + mode as int) |req,uverr| { assert!(uverr.is_some()); let loop_ = req.get_loop(); let _stat = req.get_stat(); let rmdir_req = FsRequest::new(); - do rmdir_req.rmdir(&loop_, &path) |req,uverr| { + do rmdir_req.rmdir(&loop_, &path.to_c_str()) |req,uverr| { assert!(uverr.is_none()); let _loop = req.get_loop(); } @@ -645,7 +638,7 @@ mod test { let mut loop_ = Loop::new(); let path = "./tmp/never_existed_dir"; let rmdir_req = FsRequest::new(); - do rmdir_req.rmdir(&loop_, &path) |_req, uverr| { + do rmdir_req.rmdir(&loop_, &path.to_c_str()) |_req, uverr| { assert!(uverr.is_some()); } loop_.run(); diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs index 8cbcd7b77c0..40f0750b2d0 100644 --- a/src/libstd/rt/uv/idle.rs +++ b/src/libstd/rt/uv/idle.rs @@ -11,7 +11,7 @@ use libc::c_int; use option::Some; use rt::uv::uvll; -use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback}; +use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback}; use rt::uv::status_to_maybe_uv_error; pub struct IdleWatcher(*uvll::uv_idle_t); @@ -20,9 +20,9 @@ impl Watcher for IdleWatcher { } impl IdleWatcher { pub fn new(loop_: &mut Loop) -> IdleWatcher { unsafe { - let handle = uvll::idle_new(); + let handle = uvll::malloc_handle(uvll::UV_IDLE); assert!(handle.is_not_null()); - assert!(0 == uvll::idle_init(loop_.native_handle(), handle)); + assert_eq!(uvll::idle_init(loop_.native_handle(), handle), 0); let mut watcher: IdleWatcher = NativeHandle::from_native_handle(handle); watcher.install_watcher_data(); return watcher @@ -36,29 +36,14 @@ impl IdleWatcher { } unsafe { - assert!(0 == uvll::idle_start(self.native_handle(), idle_cb)) - }; - - extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { - let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); - let data = idle_watcher.get_watcher_data(); - let cb: &IdleCallback = data.idle_cb.get_ref(); - let status = status_to_maybe_uv_error(status); - (*cb)(idle_watcher, status); + assert_eq!(uvll::idle_start(self.native_handle(), idle_cb), 0) } } pub fn restart(&mut self) { unsafe { - assert!(0 == uvll::idle_start(self.native_handle(), idle_cb)) - }; - - extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { - let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); - let data = idle_watcher.get_watcher_data(); - let cb: &IdleCallback = data.idle_cb.get_ref(); - let status = status_to_maybe_uv_error(status); - (*cb)(idle_watcher, status); + assert!(self.get_watcher_data().idle_cb.is_some()); + assert_eq!(uvll::idle_start(self.native_handle(), idle_cb), 0) } } @@ -68,30 +53,7 @@ impl IdleWatcher { // free unsafe { - assert!(0 == uvll::idle_stop(self.native_handle())); - } - } - - pub fn close(self, cb: NullCallback) { - { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { uvll::close(self.native_handle(), close_cb) }; - - extern fn close_cb(handle: *uvll::uv_idle_t) { - unsafe { - let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); - { - let data = idle_watcher.get_watcher_data(); - data.close_cb.take_unwrap()(); - } - idle_watcher.drop_watcher_data(); - uvll::idle_delete(handle); - } + assert_eq!(uvll::idle_stop(self.native_handle()), 0); } } } @@ -105,6 +67,14 @@ impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher { } } +extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { + let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); + let data = idle_watcher.get_watcher_data(); + let cb: &IdleCallback = data.idle_cb.get_ref(); + let status = status_to_maybe_uv_error(status); + (*cb)(idle_watcher, status); +} + #[cfg(test)] mod test { diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs index 3a6a3acbc53..c92a54425bf 100644 --- a/src/libstd/rt/uv/mod.rs +++ b/src/libstd/rt/uv/mod.rs @@ -48,6 +48,7 @@ use cast::transmute; use ptr::null; use unstable::finally::Finally; use rt::io::net::ip::SocketAddr; +use rt::io::signal::Signum; use rt::io::IoError; @@ -60,6 +61,7 @@ pub use self::timer::TimerWatcher; pub use self::async::AsyncWatcher; pub use self::process::Process; pub use self::pipe::Pipe; +pub use self::signal::SignalWatcher; /// The implementation of `rtio` for libuv pub mod uvio; @@ -75,6 +77,8 @@ pub mod async; pub mod addrinfo; pub mod process; pub mod pipe; +pub mod tty; +pub mod signal; /// XXX: Loop(*handle) is buggy with destructors. Normal structs /// with dtors may not be destructured, but tuple structs can, @@ -83,6 +87,14 @@ pub struct Loop { priv handle: *uvll::uv_loop_t } +pub struct Handle(*uvll::uv_handle_t); + +impl Watcher for Handle {} +impl NativeHandle<*uvll::uv_handle_t> for Handle { + fn from_native_handle(h: *uvll::uv_handle_t) -> Handle { Handle(h) } + fn native_handle(&self) -> *uvll::uv_handle_t { **self } +} + /// The trait implemented by uv 'watchers' (handles). Watchers are /// non-owning wrappers around the uv handles and are not completely /// safe - there may be multiple instances for a single underlying @@ -137,6 +149,7 @@ pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>); pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>); pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>); pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>); +pub type SignalCallback = ~fn(SignalWatcher, Signum); /// Callbacks used by StreamWatchers, set as custom data on the foreign handle. @@ -153,6 +166,7 @@ struct WatcherData { udp_recv_cb: Option<UdpReceiveCallback>, udp_send_cb: Option<UdpSendCallback>, exit_cb: Option<ExitCallback>, + signal_cb: Option<SignalCallback>, } pub trait WatcherInterop { @@ -160,6 +174,8 @@ pub trait WatcherInterop { fn install_watcher_data(&mut self); fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData; fn drop_watcher_data(&mut self); + fn close(self, cb: NullCallback); + fn close_async(self); } impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W { @@ -186,6 +202,7 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W { udp_recv_cb: None, udp_send_cb: None, exit_cb: None, + signal_cb: None, }; let data = transmute::<~WatcherData, *c_void>(data); uvll::set_data_for_uv_handle(self.native_handle(), data); @@ -207,6 +224,34 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W { uvll::set_data_for_uv_handle(self.native_handle(), null::<()>()); } } + + fn close(self, cb: NullCallback) { + let mut this = self; + { + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + } + + unsafe { uvll::close(this.native_handle(), close_cb); } + + extern fn close_cb(handle: *uvll::uv_handle_t) { + let mut h: Handle = NativeHandle::from_native_handle(handle); + h.get_watcher_data().close_cb.take_unwrap()(); + h.drop_watcher_data(); + unsafe { uvll::free_handle(handle as *c_void) } + } + } + + fn close_async(self) { + unsafe { uvll::close(self.native_handle(), close_cb); } + + extern fn close_cb(handle: *uvll::uv_handle_t) { + let mut h: Handle = NativeHandle::from_native_handle(handle); + h.drop_watcher_data(); + unsafe { uvll::free_handle(handle as *c_void) } + } + } } // XXX: Need to define the error constants like EOF so they can be @@ -297,6 +342,13 @@ pub fn status_to_maybe_uv_error(status: c_int) -> Option<UvError> /// The uv buffer type pub type Buf = uvll::uv_buf_t; +pub fn empty_buf() -> Buf { + uvll::uv_buf_t { + base: null(), + len: 0, + } +} + /// Borrow a slice to a Buf pub fn slice_to_uv_buf(v: &[u8]) -> Buf { let data = vec::raw::to_ptr(v); diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs index a2608bf6b24..77de8348c14 100644 --- a/src/libstd/rt/uv/net.rs +++ b/src/libstd/rt/uv/net.rs @@ -13,8 +13,8 @@ use libc::{size_t, ssize_t, c_int, c_void, c_uint}; use rt::uv::uvll; use rt::uv::uvll::*; use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback}; -use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback, - status_to_maybe_uv_error, vec_to_uv_buf}; +use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, + status_to_maybe_uv_error, empty_buf}; use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; use vec; use str; @@ -27,7 +27,7 @@ pub enum UvSocketAddr { UvIpv6SocketAddr(*sockaddr_in6), } -fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr { +pub fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr { unsafe { assert!((is_ip4_addr(addr) || is_ip6_addr(addr))); assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr))); @@ -96,28 +96,6 @@ pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr { uv_socket_addr_as_socket_addr(addr, util::id) } -// Traverse the addrinfo linked list, producing a vector of Rust socket addresses -pub fn accum_sockaddrs(addr: &UvAddrInfo) -> ~[SocketAddr] { - unsafe { - let &UvAddrInfo(addr) = addr; - let mut addr = addr; - - let mut addrs = ~[]; - loop { - let uvaddr = sockaddr_to_UvSocketAddr((*addr).ai_addr); - let rustaddr = uv_socket_addr_to_socket_addr(uvaddr); - addrs.push(rustaddr); - if (*addr).ai_next.is_not_null() { - addr = (*addr).ai_next; - } else { - break; - } - } - - return addrs; - } -} - #[cfg(test)] #[test] fn test_ip4_conversion() { @@ -141,23 +119,17 @@ impl Watcher for StreamWatcher { } impl StreamWatcher { pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) { - { - let data = self.get_watcher_data(); - data.alloc_cb = Some(alloc); - data.read_cb = Some(cb); - } - - let ret = unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb) }; - - if ret != 0 { - // uvll::read_start failed, so read_cb will not be called. - // Call it manually for scheduling. - call_read_cb(self.native_handle(), ret as ssize_t); - } - - fn call_read_cb(stream: *uvll::uv_stream_t, errno: ssize_t) { - #[fixed_stack_segment]; #[inline(never)]; - read_cb(stream, errno, vec_to_uv_buf(~[])); + unsafe { + match uvll::read_start(self.native_handle(), alloc_cb, read_cb) { + 0 => { + let data = self.get_watcher_data(); + data.alloc_cb = Some(alloc); + data.read_cb = Some(cb); + } + n => { + cb(*self, 0, empty_buf(), Some(UvError(n))) + } + } } extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf { @@ -181,20 +153,25 @@ impl StreamWatcher { // but read_stop may be called from inside one of them and we // would end up freeing the in-use environment let handle = self.native_handle(); - unsafe { uvll::read_stop(handle); } + unsafe { assert_eq!(uvll::read_stop(handle), 0); } } pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) { - { - let data = self.get_watcher_data(); - assert!(data.write_cb.is_none()); - data.write_cb = Some(cb); - } - let req = WriteRequest::new(); - unsafe { - assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb)); - } + return unsafe { + match uvll::write(req.native_handle(), self.native_handle(), + [buf], write_cb) { + 0 => { + let data = self.get_watcher_data(); + assert!(data.write_cb.is_none()); + data.write_cb = Some(cb); + } + n => { + req.delete(); + cb(*self, Some(UvError(n))) + } + } + }; extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { let write_request: WriteRequest = NativeHandle::from_native_handle(req); @@ -206,30 +183,36 @@ impl StreamWatcher { } } - pub fn accept(&mut self, stream: StreamWatcher) { - let self_handle = self.native_handle() as *c_void; - let stream_handle = stream.native_handle() as *c_void; - assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } ); - } - pub fn close(self, cb: NullCallback) { + pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> { { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); + let data = self.get_watcher_data(); + assert!(data.connect_cb.is_none()); + data.connect_cb = Some(cb); } - unsafe { uvll::close(self.native_handle(), close_cb); } + return unsafe { + static BACKLOG: c_int = 128; // XXX should be configurable + match uvll::listen(self.native_handle(), BACKLOG, connection_cb) { + 0 => Ok(()), + n => Err(UvError(n)) + } + }; - extern fn close_cb(handle: *uvll::uv_stream_t) { + extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) { + rtdebug!("connection_cb"); let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); - let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap(); - stream_watcher.drop_watcher_data(); - unsafe { free_handle(handle as *c_void) } - cb(); + let cb = stream_watcher.get_watcher_data().connect_cb.get_ref(); + let status = status_to_maybe_uv_error(status); + (*cb)(stream_watcher, status); } } + + pub fn accept(&mut self, stream: StreamWatcher) { + let self_handle = self.native_handle() as *c_void; + let stream_handle = stream.native_handle() as *c_void; + assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } ); + } } impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher { @@ -300,28 +283,6 @@ impl TcpWatcher { } } - pub fn listen(&mut self, cb: ConnectionCallback) { - { - let data = self.get_watcher_data(); - assert!(data.connect_cb.is_none()); - data.connect_cb = Some(cb); - } - - unsafe { - static BACKLOG: c_int = 128; // XXX should be configurable - // XXX: This can probably fail - assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb)); - } - - extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) { - rtdebug!("connection_cb"); - let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); - let cb = stream_watcher.get_watcher_data().connect_cb.get_ref(); - let status = status_to_maybe_uv_error(status); - (*cb)(stream_watcher, status); - } - } - pub fn as_stream(&self) -> StreamWatcher { NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t) } @@ -433,25 +394,6 @@ impl UdpWatcher { cb(udp_watcher, status); } } - - pub fn close(self, cb: NullCallback) { - { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { uvll::close(self.native_handle(), close_cb); } - - extern fn close_cb(handle: *uvll::uv_udp_t) { - let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); - let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap(); - udp_watcher.drop_watcher_data(); - unsafe { free_handle(handle as *c_void) } - cb(); - } - } } impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher { @@ -464,12 +406,12 @@ impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher { } // uv_connect_t is a subclass of uv_req_t -struct ConnectRequest(*uvll::uv_connect_t); +pub struct ConnectRequest(*uvll::uv_connect_t); impl Request for ConnectRequest { } impl ConnectRequest { - fn new() -> ConnectRequest { + pub fn new() -> ConnectRequest { let connect_handle = unsafe { malloc_req(UV_CONNECT) }; assert!(connect_handle.is_not_null()); ConnectRequest(connect_handle as *uvll::uv_connect_t) @@ -644,7 +586,8 @@ mod test { server_tcp_watcher.bind(addr); let loop_ = loop_; rtdebug!("listening"); - do server_tcp_watcher.listen |mut server_stream_watcher, status| { + let mut stream = server_tcp_watcher.as_stream(); + let res = do stream.listen |mut server_stream_watcher, status| { rtdebug!("listened!"); assert!(status.is_none()); let mut loop_ = loop_; @@ -678,7 +621,9 @@ mod test { } count_cell.put_back(count); } - } + }; + + assert!(res.is_ok()); let client_thread = do Thread::start { rtdebug!("starting client thread"); @@ -705,7 +650,7 @@ mod test { loop_.run(); loop_.close(); client_thread.join(); - } + }; } #[test] @@ -718,7 +663,8 @@ mod test { server_tcp_watcher.bind(addr); let loop_ = loop_; rtdebug!("listening"); - do server_tcp_watcher.listen |mut server_stream_watcher, status| { + let mut stream = server_tcp_watcher.as_stream(); + let res = do stream.listen |mut server_stream_watcher, status| { rtdebug!("listened!"); assert!(status.is_none()); let mut loop_ = loop_; @@ -754,7 +700,8 @@ mod test { } count_cell.put_back(count); } - } + }; + assert!(res.is_ok()); let client_thread = do Thread::start { rtdebug!("starting client thread"); diff --git a/src/libstd/rt/uv/pipe.rs b/src/libstd/rt/uv/pipe.rs index 1147c731a60..74b9312954c 100644 --- a/src/libstd/rt/uv/pipe.rs +++ b/src/libstd/rt/uv/pipe.rs @@ -10,6 +10,7 @@ use prelude::*; use libc; +use c_str::CString; use rt::uv; use rt::uv::net; @@ -37,23 +38,54 @@ impl Pipe { net::StreamWatcher(**self as *uvll::uv_stream_t) } - pub fn close(self, cb: uv::NullCallback) { + #[fixed_stack_segment] #[inline(never)] + pub fn open(&mut self, file: libc::c_int) -> Result<(), uv::UvError> { + match unsafe { uvll::pipe_open(self.native_handle(), file) } { + 0 => Ok(()), + n => Err(uv::UvError(n)) + } + } + + #[fixed_stack_segment] #[inline(never)] + pub fn bind(&mut self, name: &CString) -> Result<(), uv::UvError> { + do name.with_ref |name| { + match unsafe { uvll::pipe_bind(self.native_handle(), name) } { + 0 => Ok(()), + n => Err(uv::UvError(n)) + } + } + } + + #[fixed_stack_segment] #[inline(never)] + pub fn connect(&mut self, name: &CString, cb: uv::ConnectionCallback) { { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); + let data = self.get_watcher_data(); + assert!(data.connect_cb.is_none()); + data.connect_cb = Some(cb); } - unsafe { uvll::close(self.native_handle(), close_cb); } + let connect = net::ConnectRequest::new(); + let name = do name.with_ref |p| { p }; - extern fn close_cb(handle: *uvll::uv_pipe_t) { - let mut process: Pipe = uv::NativeHandle::from_native_handle(handle); - process.get_watcher_data().close_cb.take_unwrap()(); - process.drop_watcher_data(); - unsafe { uvll::free_handle(handle as *libc::c_void) } + unsafe { + uvll::pipe_connect(connect.native_handle(), + self.native_handle(), + name, + connect_cb) + } + + extern "C" fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) { + let connect_request: net::ConnectRequest = + uv::NativeHandle::from_native_handle(req); + let mut stream_watcher = connect_request.stream(); + connect_request.delete(); + + let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap(); + let status = uv::status_to_maybe_uv_error(status); + cb(stream_watcher, status); } } + } impl uv::NativeHandle<*uvll::uv_pipe_t> for Pipe { diff --git a/src/libstd/rt/uv/process.rs b/src/libstd/rt/uv/process.rs index 176754de8f7..f0d0afeb6aa 100644 --- a/src/libstd/rt/uv/process.rs +++ b/src/libstd/rt/uv/process.rs @@ -12,12 +12,11 @@ use prelude::*; use cell::Cell; use libc; use ptr; -use util; use vec; use rt::io::process::*; use rt::uv; -use rt::uv::uvio::UvPipeStream; +use rt::uv::uvio::{UvPipeStream, UvUnboundPipe}; use rt::uv::uvll; /// A process wraps the handle of the underlying uv_process_t. @@ -42,9 +41,9 @@ impl Process { /// /// Returns either the corresponding process object or an error which /// occurred. - pub fn spawn(&mut self, loop_: &uv::Loop, mut config: ProcessConfig, + pub fn spawn(&mut self, loop_: &uv::Loop, config: ProcessConfig, exit_cb: uv::ExitCallback) - -> Result<~[Option<UvPipeStream>], uv::UvError> + -> Result<~[Option<~UvPipeStream>], uv::UvError> { let cwd = config.cwd.map(|s| s.to_c_str()); @@ -62,13 +61,14 @@ impl Process { err); } - let io = util::replace(&mut config.io, ~[]); + let io = config.io; let mut stdio = vec::with_capacity::<uvll::uv_stdio_container_t>(io.len()); let mut ret_io = vec::with_capacity(io.len()); unsafe { vec::raw::set_len(&mut stdio, io.len()); - for (slot, other) in stdio.iter().zip(io.move_iter()) { - let io = set_stdio(slot as *uvll::uv_stdio_container_t, other); + for (slot, other) in stdio.iter().zip(io.iter()) { + let io = set_stdio(slot as *uvll::uv_stdio_container_t, other, + loop_); ret_io.push(io); } } @@ -122,30 +122,12 @@ impl Process { pub fn pid(&self) -> libc::pid_t { unsafe { uvll::process_pid(**self) as libc::pid_t } } - - /// Closes this handle, invoking the specified callback once closed - pub fn close(self, cb: uv::NullCallback) { - { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { uvll::close(self.native_handle(), close_cb); } - - extern fn close_cb(handle: *uvll::uv_process_t) { - let mut process: Process = uv::NativeHandle::from_native_handle(handle); - process.get_watcher_data().close_cb.take_unwrap()(); - process.drop_watcher_data(); - unsafe { uvll::free_handle(handle as *libc::c_void) } - } - } } unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, - io: StdioContainer) -> Option<UvPipeStream> { - match io { + io: &StdioContainer, + loop_: &uv::Loop) -> Option<~UvPipeStream> { + match *io { Ignored => { uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE); None @@ -155,7 +137,7 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, uvll::set_stdio_container_fd(dst, fd); None } - CreatePipe(pipe, readable, writable) => { + CreatePipe(readable, writable) => { let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int; if readable { flags |= uvll::STDIO_READABLE_PIPE as libc::c_int; @@ -163,10 +145,11 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, if writable { flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int; } + let pipe = UvUnboundPipe::new(loop_); let handle = pipe.pipe.as_stream().native_handle(); uvll::set_stdio_container_flags(dst, flags); uvll::set_stdio_container_stream(dst, handle); - Some(pipe.bind()) + Some(~UvPipeStream::new(pipe)) } } } diff --git a/src/libstd/rt/uv/signal.rs b/src/libstd/rt/uv/signal.rs new file mode 100644 index 00000000000..3252c89673d --- /dev/null +++ b/src/libstd/rt/uv/signal.rs @@ -0,0 +1,73 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use cast; +use option::Some; +use libc::c_int; +use result::{Err, Ok, Result}; +use rt::io::signal::Signum; +use rt::uv::{Loop, NativeHandle, SignalCallback, UvError, Watcher}; +use rt::uv::uvll; + +pub struct SignalWatcher(*uvll::uv_signal_t); + +impl Watcher for SignalWatcher { } + +impl SignalWatcher { + pub fn new(loop_: &mut Loop) -> SignalWatcher { + unsafe { + let handle = uvll::malloc_handle(uvll::UV_SIGNAL); + assert!(handle.is_not_null()); + assert!(0 == uvll::signal_init(loop_.native_handle(), handle)); + let mut watcher: SignalWatcher = NativeHandle::from_native_handle(handle); + watcher.install_watcher_data(); + return watcher; + } + } + + pub fn start(&mut self, signum: Signum, callback: SignalCallback) + -> Result<(), UvError> + { + return unsafe { + match uvll::signal_start(self.native_handle(), signal_cb, + signum as c_int) { + 0 => { + let data = self.get_watcher_data(); + data.signal_cb = Some(callback); + Ok(()) + } + n => Err(UvError(n)), + } + }; + + extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) { + let mut watcher: SignalWatcher = NativeHandle::from_native_handle(handle); + let data = watcher.get_watcher_data(); + let cb = data.signal_cb.get_ref(); + (*cb)(watcher, unsafe { cast::transmute(signum as int) }); + } + } + + pub fn stop(&mut self) { + unsafe { + uvll::signal_stop(self.native_handle()); + } + } +} + +impl NativeHandle<*uvll::uv_signal_t> for SignalWatcher { + fn from_native_handle(handle: *uvll::uv_signal_t) -> SignalWatcher { + SignalWatcher(handle) + } + + fn native_handle(&self) -> *uvll::uv_signal_t { + match self { &SignalWatcher(ptr) => ptr } + } +} diff --git a/src/libstd/rt/uv/timer.rs b/src/libstd/rt/uv/timer.rs index 7b09cf2eb0e..fb3c84df39f 100644 --- a/src/libstd/rt/uv/timer.rs +++ b/src/libstd/rt/uv/timer.rs @@ -8,10 +8,10 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use libc::{c_void, c_int}; +use libc::c_int; use option::Some; use rt::uv::uvll; -use rt::uv::{Watcher, Loop, NativeHandle, TimerCallback, NullCallback}; +use rt::uv::{Watcher, Loop, NativeHandle, TimerCallback}; use rt::uv::status_to_maybe_uv_error; pub struct TimerWatcher(*uvll::uv_timer_t); @@ -53,31 +53,6 @@ impl TimerWatcher { uvll::timer_stop(self.native_handle()); } } - - pub fn close(self, cb: NullCallback) { - let mut watcher = self; - { - let data = watcher.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { - uvll::close(watcher.native_handle(), close_cb); - } - - extern fn close_cb(handle: *uvll::uv_timer_t) { - let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle); - { - let data = watcher.get_watcher_data(); - data.close_cb.take_unwrap()(); - } - watcher.drop_watcher_data(); - unsafe { - uvll::free_handle(handle as *c_void); - } - } - } } impl NativeHandle<*uvll::uv_timer_t> for TimerWatcher { diff --git a/src/libstd/rt/uv/tty.rs b/src/libstd/rt/uv/tty.rs new file mode 100644 index 00000000000..f44c5ae8eff --- /dev/null +++ b/src/libstd/rt/uv/tty.rs @@ -0,0 +1,84 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use prelude::*; +use libc; + +use rt::uv; +use rt::uv::net; +use rt::uv::uvll; + +/// A process wraps the handle of the underlying uv_process_t. +pub struct TTY(*uvll::uv_tty_t); + +impl uv::Watcher for TTY {} + +impl TTY { + #[fixed_stack_segment] #[inline(never)] + pub fn new(loop_: &uv::Loop, fd: libc::c_int, readable: bool) -> + Result<TTY, uv::UvError> + { + let handle = unsafe { uvll::malloc_handle(uvll::UV_TTY) }; + assert!(handle.is_not_null()); + + let ret = unsafe { + uvll::tty_init(loop_.native_handle(), handle, fd as libc::c_int, + readable as libc::c_int) + }; + match ret { + 0 => { + let mut ret: TTY = uv::NativeHandle::from_native_handle(handle); + ret.install_watcher_data(); + Ok(ret) + } + n => { + unsafe { uvll::free_handle(handle); } + Err(uv::UvError(n)) + } + } + } + + pub fn as_stream(&self) -> net::StreamWatcher { + net::StreamWatcher(**self as *uvll::uv_stream_t) + } + + #[fixed_stack_segment] #[inline(never)] + pub fn set_mode(&self, raw: bool) -> Result<(), uv::UvError> { + let raw = raw as libc::c_int; + match unsafe { uvll::tty_set_mode(self.native_handle(), raw) } { + 0 => Ok(()), + n => Err(uv::UvError(n)) + } + } + + #[fixed_stack_segment] #[inline(never)] #[allow(unused_mut)] + pub fn get_winsize(&self) -> Result<(int, int), uv::UvError> { + let mut width: libc::c_int = 0; + let mut height: libc::c_int = 0; + let widthptr: *libc::c_int = &width; + let heightptr: *libc::c_int = &width; + + match unsafe { uvll::tty_get_winsize(self.native_handle(), + widthptr, heightptr) } { + 0 => Ok((width as int, height as int)), + n => Err(uv::UvError(n)) + } + } +} + +impl uv::NativeHandle<*uvll::uv_tty_t> for TTY { + fn from_native_handle(handle: *uvll::uv_tty_t) -> TTY { + TTY(handle) + } + fn native_handle(&self) -> *uvll::uv_tty_t { + match self { &TTY(ptr) => ptr } + } +} + diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 8dd0f8a6b10..29370c484eb 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -8,17 +8,17 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use c_str::ToCStr; +use c_str::{ToCStr, CString}; use cast::transmute; use cast; use cell::Cell; use clone::Clone; +use comm::{SendDeferred, SharedChan}; use libc::{c_int, c_uint, c_void, pid_t}; use ops::Drop; use option::*; use ptr; use str; -use str::Str; use result::*; use rt::io::IoError; use rt::io::net::ip::{SocketAddr, IpAddr}; @@ -32,17 +32,18 @@ use rt::tube::Tube; use rt::task::SchedHome; use rt::uv::*; use rt::uv::idle::IdleWatcher; -use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr, accum_sockaddrs}; -use rt::uv::addrinfo::GetAddrInfoRequest; +use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr}; +use rt::uv::addrinfo::{GetAddrInfoRequest, accum_addrinfo}; use unstable::sync::Exclusive; use path::{GenericPath, Path}; -use super::super::io::support::PathLike; use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR, S_IWUSR, S_IRWXU}; use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create, CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite, FileStat}; +use rt::io::signal::Signum; use task; +use ai = rt::io::net::addrinfo; #[cfg(test)] use container::Container; #[cfg(test)] use unstable::run_in_bare_thread; @@ -214,11 +215,11 @@ impl EventLoop for UvEventLoop { fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback { let idle_watcher = IdleWatcher::new(self.uvio.uv_loop()); - return ~UvPausibleIdleCallback { + ~UvPausibleIdleCallback { watcher: idle_watcher, idle_flag: false, closed: false - }; + } as ~PausibleIdleCallback } fn callback_ms(&mut self, ms: u64, f: ~fn()) { @@ -230,12 +231,12 @@ impl EventLoop for UvEventLoop { } } - fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject { - ~UvRemoteCallback::new(self.uvio.uv_loop(), f) + fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback { + ~UvRemoteCallback::new(self.uvio.uv_loop(), f) as ~RemoteCallback } - fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> { - Some(&mut self.uvio) + fn io<'a>(&'a mut self, f: &fn(&'a mut IoFactory)) { + f(&mut self.uvio as &mut IoFactory) } } @@ -245,30 +246,30 @@ pub struct UvPausibleIdleCallback { priv closed: bool } -impl UvPausibleIdleCallback { +impl PausibleIdleCallback for UvPausibleIdleCallback { #[inline] - pub fn start(&mut self, f: ~fn()) { + fn start(&mut self, f: ~fn()) { do self.watcher.start |_idle_watcher, _status| { f(); }; self.idle_flag = true; } #[inline] - pub fn pause(&mut self) { + fn pause(&mut self) { if self.idle_flag == true { self.watcher.stop(); self.idle_flag = false; } } #[inline] - pub fn resume(&mut self) { + fn resume(&mut self) { if self.idle_flag == false { self.watcher.restart(); self.idle_flag = true; } } #[inline] - pub fn close(&mut self) { + fn close(&mut self) { self.pause(); if !self.closed { self.closed = true; @@ -414,9 +415,9 @@ impl UvIoFactory { /// Helper for a variety of simple uv_fs_* functions that /// have no ret val -fn uv_fs_helper<P: PathLike>(loop_: &mut Loop, path: &P, - cb: ~fn(&mut FsRequest, &mut Loop, &P, - ~fn(&FsRequest, Option<UvError>))) +fn uv_fs_helper(loop_: &mut Loop, path: &CString, + cb: ~fn(&mut FsRequest, &mut Loop, &CString, + ~fn(&FsRequest, Option<UvError>))) -> Result<(), IoError> { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell; @@ -446,11 +447,11 @@ impl IoFactory for UvIoFactory { // Connect to an address and return a new stream // NB: This blocks the task waiting on the connection. // It would probably be better to return a future - fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> { + fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError> { // Create a cell in the task to hold the result. We will fill // the cell before resuming the task. let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell; + let result_cell_ptr: *Cell<Result<~RtioTcpStream, IoError>> = &result_cell; // Block this task and take ownership, switch to scheduler context do task::unkillable { // FIXME(#8674) @@ -466,7 +467,8 @@ impl IoFactory for UvIoFactory { None => { let tcp = NativeHandle::from_native_handle(stream.native_handle()); let home = get_handle_to_current_scheduler!(); - let res = Ok(~UvTcpStream { watcher: tcp, home: home }); + let res = Ok(~UvTcpStream { watcher: tcp, home: home } + as ~RtioTcpStream); // Store the stream in the task's stack unsafe { (*result_cell_ptr).put_back(res); } @@ -493,12 +495,12 @@ impl IoFactory for UvIoFactory { return result_cell.take(); } - fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> { + fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError> { let mut watcher = TcpWatcher::new(self.uv_loop()); match watcher.bind(addr) { Ok(_) => { let home = get_handle_to_current_scheduler!(); - Ok(~UvTcpListener::new(watcher, home)) + Ok(~UvTcpListener::new(watcher, home) as ~RtioTcpListener) } Err(uverr) => { do task::unkillable { // FIXME(#8674) @@ -516,12 +518,12 @@ impl IoFactory for UvIoFactory { } } - fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> { + fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> { let mut watcher = UdpWatcher::new(self.uv_loop()); match watcher.bind(addr) { Ok(_) => { let home = get_handle_to_current_scheduler!(); - Ok(~UvUdpSocket { watcher: watcher, home: home }) + Ok(~UvUdpSocket { watcher: watcher, home: home } as ~RtioUdpSocket) } Err(uverr) => { do task::unkillable { // FIXME(#8674) @@ -539,19 +541,19 @@ impl IoFactory for UvIoFactory { } } - fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> { + fn timer_init(&mut self) -> Result<~RtioTimer, IoError> { let watcher = TimerWatcher::new(self.uv_loop()); let home = get_handle_to_current_scheduler!(); - Ok(~UvTimer::new(watcher, home)) + Ok(~UvTimer::new(watcher, home) as ~RtioTimer) } - fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream { + fn fs_from_raw_fd(&mut self, fd: c_int, close: CloseBehavior) -> ~RtioFileStream { let loop_ = Loop {handle: self.uv_loop().native_handle()}; let home = get_handle_to_current_scheduler!(); - ~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream + ~UvFileStream::new(loop_, fd, close, home) as ~RtioFileStream } - fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess) + fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess) -> Result<~RtioFileStream, IoError> { let mut flags = match fm { Open => 0, @@ -588,7 +590,7 @@ impl IoFactory for UvIoFactory { let home = get_handle_to_current_scheduler!(); let fd = req.get_result() as c_int; let fs = ~UvFileStream::new( - loop_, fd, true, home) as ~RtioFileStream; + loop_, fd, CloseSynchronously, home) as ~RtioFileStream; let res = Ok(fs); unsafe { (*result_cell_ptr).put_back(res); } let scheduler: ~Scheduler = Local::take(); @@ -606,14 +608,14 @@ impl IoFactory for UvIoFactory { return result_cell.take(); } - fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> { + fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError> { do uv_fs_helper(self.uv_loop(), path) |unlink_req, l, p, cb| { do unlink_req.unlink(l, p) |req, err| { cb(req, err) }; } } - fn fs_stat<P: PathLike>(&mut self, path: &P) -> Result<FileStat, IoError> { + fn fs_stat(&mut self, path: &CString) -> Result<FileStat, IoError> { use str::StrSlice; let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell<Result<FileStat, @@ -625,14 +627,15 @@ impl IoFactory for UvIoFactory { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); let path = path_cell.take(); - let path_str = path.path_as_str(|p| p.to_owned()); - do stat_req.stat(self.uv_loop(), path) - |req,err| { + // Don't pick up the null byte + let slice = path.as_bytes().slice(0, path.len()); + let path_instance = Cell::new(Path::new(slice)); + do stat_req.stat(self.uv_loop(), path) |req,err| { let res = match err { None => { let stat = req.get_stat(); Ok(FileStat { - path: Path::new(path_str.as_slice()), + path: path_instance.take(), is_file: stat.is_file(), is_dir: stat.is_dir(), device: stat.st_dev, @@ -658,12 +661,16 @@ impl IoFactory for UvIoFactory { return result_cell.take(); } - fn get_host_addresses(&mut self, host: &str) -> Result<~[IpAddr], IoError> { + fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, + hint: Option<ai::Hint>) -> Result<~[ai::Info], IoError> { let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell<Result<~[IpAddr], IoError>> = &result_cell; - let host_ptr: *&str = &host; + let result_cell_ptr: *Cell<Result<~[ai::Info], IoError>> = &result_cell; + let host_ptr: *Option<&str> = &host; + let servname_ptr: *Option<&str> = &servname; + let hint_ptr: *Option<ai::Hint> = &hint; let addrinfo_req = GetAddrInfoRequest::new(); let addrinfo_req_cell = Cell::new(addrinfo_req); + do task::unkillable { // FIXME(#8674) let scheduler: ~Scheduler = Local::take(); do scheduler.deschedule_running_task_and_then |_, task| { @@ -671,10 +678,10 @@ impl IoFactory for UvIoFactory { let mut addrinfo_req = addrinfo_req_cell.take(); unsafe { do addrinfo_req.getaddrinfo(self.uv_loop(), - Some(*host_ptr), - None, None) |_, addrinfo, err| { + *host_ptr, *servname_ptr, + *hint_ptr) |_, addrinfo, err| { let res = match err { - None => Ok(accum_sockaddrs(addrinfo).map(|addr| addr.ip.clone())), + None => Ok(accum_addrinfo(addrinfo)), Some(err) => Err(uv_error_to_io_error(err)) }; (*result_cell_ptr).put_back(res); @@ -688,7 +695,7 @@ impl IoFactory for UvIoFactory { assert!(!result_cell.is_empty()); return result_cell.take(); } - fn fs_mkdir<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> { + fn fs_mkdir(&mut self, path: &CString) -> Result<(), IoError> { let mode = S_IRWXU as int; do uv_fs_helper(self.uv_loop(), path) |mkdir_req, l, p, cb| { do mkdir_req.mkdir(l, p, mode as int) |req, err| { @@ -696,14 +703,14 @@ impl IoFactory for UvIoFactory { }; } } - fn fs_rmdir<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> { + fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError> { do uv_fs_helper(self.uv_loop(), path) |rmdir_req, l, p, cb| { do rmdir_req.rmdir(l, p) |req, err| { cb(req, err) }; } } - fn fs_readdir<P: PathLike>(&mut self, path: &P, flags: c_int) -> + fn fs_readdir(&mut self, path: &CString, flags: c_int) -> Result<~[Path], IoError> { use str::StrSlice; let result_cell = Cell::new_empty(); @@ -716,17 +723,17 @@ impl IoFactory for UvIoFactory { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); let path = path_cell.take(); - let path_str = path.path_as_str(|p| p.to_owned()); - do stat_req.readdir(self.uv_loop(), path, flags) - |req,err| { + // Don't pick up the null byte + let slice = path.as_bytes().slice(0, path.len()); + let path_parent = Cell::new(Path::new(slice)); + do stat_req.readdir(self.uv_loop(), path, flags) |req,err| { + let parent = path_parent.take(); let res = match err { None => { - let rel_paths = req.get_paths(); let mut paths = ~[]; - for r in rel_paths.iter() { - let mut p = Path::new(path_str.as_slice()); - p.push(r.as_slice()); - paths.push(p); + do req.each_path |rel_path| { + let p = rel_path.as_bytes(); + paths.push(parent.join(p.slice_to(rel_path.len()))); } Ok(paths) }, @@ -744,13 +751,8 @@ impl IoFactory for UvIoFactory { return result_cell.take(); } - fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError> { - let home = get_handle_to_current_scheduler!(); - Ok(~UvUnboundPipe { pipe: Pipe::new(self.uv_loop(), ipc), home: home }) - } - fn spawn(&mut self, config: ProcessConfig) - -> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError> + -> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError> { // Sadly, we must create the UvProcess before we actually call uv_spawn // so that the exit_cb can close over it and notify it when the process @@ -792,7 +794,8 @@ impl IoFactory for UvIoFactory { Ok(io) => { // Only now do we actually get a handle to this scheduler. ret.home = Some(get_handle_to_current_scheduler!()); - Ok((ret, io)) + Ok((ret as ~RtioProcess, + io.move_iter().map(|p| p.map(|p| p as ~RtioPipe)).collect())) } Err(uverr) => { // We still need to close the process handle we created, but @@ -801,6 +804,76 @@ impl IoFactory for UvIoFactory { } } } + + fn unix_bind(&mut self, path: &CString) -> + Result<~RtioUnixListener, IoError> { + let mut pipe = UvUnboundPipe::new(self.uv_loop()); + match pipe.pipe.bind(path) { + Ok(()) => Ok(~UvUnixListener::new(pipe) as ~RtioUnixListener), + Err(e) => Err(uv_error_to_io_error(e)), + } + } + + fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> { + let pipe = UvUnboundPipe::new(self.uv_loop()); + let mut rawpipe = pipe.pipe; + + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell<Result<~RtioPipe, IoError>> = &result_cell; + let pipe_cell = Cell::new(pipe); + let pipe_cell_ptr: *Cell<UvUnboundPipe> = &pipe_cell; + + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do rawpipe.connect(path) |_stream, err| { + let res = match err { + None => { + let pipe = unsafe { (*pipe_cell_ptr).take() }; + Ok(~UvPipeStream::new(pipe) as ~RtioPipe) + } + Some(e) => Err(uv_error_to_io_error(e)), + }; + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + + assert!(!result_cell.is_empty()); + return result_cell.take(); + } + + fn tty_open(&mut self, fd: c_int, readable: bool) + -> Result<~RtioTTY, IoError> { + match tty::TTY::new(self.uv_loop(), fd, readable) { + Ok(tty) => Ok(~UvTTY { + home: get_handle_to_current_scheduler!(), + tty: tty, + fd: fd, + } as ~RtioTTY), + Err(e) => Err(uv_error_to_io_error(e)) + } + } + + fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError> { + let mut pipe = UvUnboundPipe::new(self.uv_loop()); + match pipe.pipe.open(fd) { + Ok(()) => Ok(~UvPipeStream::new(pipe) as ~RtioPipe), + Err(e) => Err(uv_error_to_io_error(e)) + } + } + + fn signal(&mut self, signum: Signum, channel: SharedChan<Signum>) + -> Result<~RtioSignal, IoError> { + let watcher = SignalWatcher::new(self.uv_loop()); + let home = get_handle_to_current_scheduler!(); + let mut signal = ~UvSignal::new(watcher, home); + match signal.watcher.start(signum, |_, _| channel.send_deferred(signum)) { + Ok(()) => Ok(signal as ~RtioSignal), + Err(e) => Err(uv_error_to_io_error(e)), + } + } } pub struct UvTcpListener { @@ -841,11 +914,12 @@ impl RtioSocket for UvTcpListener { } impl RtioTcpListener for UvTcpListener { - fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> { + fn listen(~self) -> Result<~RtioTcpAcceptor, IoError> { do self.home_for_io_consume |self_| { - let mut acceptor = ~UvTcpAcceptor::new(self_); + let acceptor = ~UvTcpAcceptor::new(self_); let incoming = Cell::new(acceptor.incoming.clone()); - do acceptor.listener.watcher.listen |mut server, status| { + let mut stream = acceptor.listener.watcher.as_stream(); + let res = do stream.listen |mut server, status| { do incoming.with_mut_ref |incoming| { let inc = match status { Some(_) => Err(standard_error(OtherIoError)), @@ -854,20 +928,24 @@ impl RtioTcpListener for UvTcpListener { // first accept call in the callback guarenteed to succeed server.accept(inc.as_stream()); let home = get_handle_to_current_scheduler!(); - Ok(~UvTcpStream { watcher: inc, home: home }) + Ok(~UvTcpStream { watcher: inc, home: home } + as ~RtioTcpStream) } }; incoming.send(inc); } }; - Ok(acceptor) + match res { + Ok(()) => Ok(acceptor as ~RtioTcpAcceptor), + Err(e) => Err(uv_error_to_io_error(e)), + } } } } pub struct UvTcpAcceptor { priv listener: UvTcpListener, - priv incoming: Tube<Result<~RtioTcpStreamObject, IoError>>, + priv incoming: Tube<Result<~RtioTcpStream, IoError>>, } impl HomingIO for UvTcpAcceptor { @@ -888,8 +966,19 @@ impl RtioSocket for UvTcpAcceptor { } } +fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> { + let r = unsafe { + uvll::tcp_simultaneous_accepts(stream.native_handle(), a as c_int) + }; + + match status_to_maybe_uv_error(r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } +} + impl RtioTcpAcceptor for UvTcpAcceptor { - fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { + fn accept(&mut self) -> Result<~RtioTcpStream, IoError> { do self.home_for_io |self_| { self_.incoming.recv() } @@ -897,27 +986,13 @@ impl RtioTcpAcceptor for UvTcpAcceptor { fn accept_simultaneously(&mut self) -> Result<(), IoError> { do self.home_for_io |self_| { - let r = unsafe { - uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int) - }; - - match status_to_maybe_uv_error(r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) - } + accept_simultaneously(self_.listener.watcher.as_stream(), 1) } } fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { do self.home_for_io |self_| { - let r = unsafe { - uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int) - }; - - match status_to_maybe_uv_error(r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) - } + accept_simultaneously(self_.listener.watcher.as_stream(), 0) } } } @@ -994,6 +1069,17 @@ pub struct UvUnboundPipe { priv home: SchedHandle, } +impl UvUnboundPipe { + /// Creates a new unbound pipe homed to the current scheduler, placed on the + /// specified event loop + pub fn new(loop_: &Loop) -> UvUnboundPipe { + UvUnboundPipe { + pipe: Pipe::new(loop_, false), + home: get_handle_to_current_scheduler!(), + } + } +} + impl HomingIO for UvUnboundPipe { fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } } @@ -1013,18 +1099,12 @@ impl Drop for UvUnboundPipe { } } -impl UvUnboundPipe { - pub unsafe fn bind(~self) -> UvPipeStream { - UvPipeStream { inner: self } - } -} - pub struct UvPipeStream { - priv inner: ~UvUnboundPipe, + priv inner: UvUnboundPipe, } impl UvPipeStream { - pub fn new(inner: ~UvUnboundPipe) -> UvPipeStream { + pub fn new(inner: UvUnboundPipe) -> UvPipeStream { UvPipeStream { inner: inner } } } @@ -1402,8 +1482,8 @@ impl RtioTimer for UvTimer { pub struct UvFileStream { priv loop_: Loop, priv fd: c_int, - priv close_on_drop: bool, - priv home: SchedHandle + priv close: CloseBehavior, + priv home: SchedHandle, } impl HomingIO for UvFileStream { @@ -1411,13 +1491,13 @@ impl HomingIO for UvFileStream { } impl UvFileStream { - fn new(loop_: Loop, fd: c_int, close_on_drop: bool, + fn new(loop_: Loop, fd: c_int, close: CloseBehavior, home: SchedHandle) -> UvFileStream { UvFileStream { loop_: loop_, fd: fd, - close_on_drop: close_on_drop, - home: home + close: close, + home: home, } } fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> { @@ -1437,9 +1517,9 @@ impl UvFileStream { unsafe { (*result_cell_ptr).put_back(res); } let scheduler: ~Scheduler = Local::take(); scheduler.resume_blocked_task_immediately(task_cell.take()); - }; - }; - }; + } + } + } result_cell.take() } fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> { @@ -1459,9 +1539,9 @@ impl UvFileStream { unsafe { (*result_cell_ptr).put_back(res); } let scheduler: ~Scheduler = Local::take(); scheduler.resume_blocked_task_immediately(task_cell.take()); - }; - }; - }; + } + } + } result_cell.take() } fn seek_common(&mut self, pos: i64, whence: c_int) -> @@ -1484,16 +1564,23 @@ impl UvFileStream { impl Drop for UvFileStream { fn drop(&mut self) { - if self.close_on_drop { - do self.home_for_io_with_sched |self_, scheduler| { - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let close_req = file::FsRequest::new(); - do close_req.close(&self_.loop_, self_.fd) |_,_| { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - }; - }; + match self.close { + DontClose => {} + CloseAsynchronously => { + let close_req = file::FsRequest::new(); + do close_req.close(&self.loop_, self.fd) |_,_| {} + } + CloseSynchronously => { + do self.home_for_io_with_sched |self_, scheduler| { + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let close_req = file::FsRequest::new(); + do close_req.close(&self_.loop_, self_.fd) |_,_| { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } } } } @@ -1612,13 +1699,185 @@ impl RtioProcess for UvProcess { } } +pub struct UvUnixListener { + priv inner: UvUnboundPipe +} + +impl HomingIO for UvUnixListener { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.inner.home() } +} + +impl UvUnixListener { + fn new(pipe: UvUnboundPipe) -> UvUnixListener { + UvUnixListener { inner: pipe } + } +} + +impl RtioUnixListener for UvUnixListener { + fn listen(~self) -> Result<~RtioUnixAcceptor, IoError> { + do self.home_for_io_consume |self_| { + let acceptor = ~UvUnixAcceptor::new(self_); + let incoming = Cell::new(acceptor.incoming.clone()); + let mut stream = acceptor.listener.inner.pipe.as_stream(); + let res = do stream.listen |mut server, status| { + do incoming.with_mut_ref |incoming| { + let inc = match status { + Some(e) => Err(uv_error_to_io_error(e)), + None => { + let pipe = UvUnboundPipe::new(&server.event_loop()); + server.accept(pipe.pipe.as_stream()); + Ok(~UvPipeStream::new(pipe) as ~RtioPipe) + } + }; + incoming.send(inc); + } + }; + match res { + Ok(()) => Ok(acceptor as ~RtioUnixAcceptor), + Err(e) => Err(uv_error_to_io_error(e)), + } + } + } +} + +pub struct UvTTY { + tty: tty::TTY, + home: SchedHandle, + fd: c_int, +} + +impl HomingIO for UvTTY { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl Drop for UvTTY { + fn drop(&mut self) { + // TTY handles are used for the logger in a task, so this destructor is + // run when a task is destroyed. When a task is being destroyed, a local + // scheduler isn't available, so we can't do the normal "take the + // scheduler and resume once close is done". Instead close operations on + // a TTY are asynchronous. + self.tty.close_async(); + } +} + +impl RtioTTY for UvTTY { + fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> { + do self.home_for_io_with_sched |self_, scheduler| { + read_stream(self_.tty.as_stream(), scheduler, buf) + } + } + + fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + do self.home_for_io_with_sched |self_, scheduler| { + write_stream(self_.tty.as_stream(), scheduler, buf) + } + } + + fn set_raw(&mut self, raw: bool) -> Result<(), IoError> { + do self.home_for_io |self_| { + match self_.tty.set_mode(raw) { + Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e)) + } + } + } + + fn get_winsize(&mut self) -> Result<(int, int), IoError> { + do self.home_for_io |self_| { + match self_.tty.get_winsize() { + Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e)) + } + } + } + + fn isatty(&self) -> bool { + unsafe { uvll::guess_handle(self.fd) == uvll::UV_TTY } + } +} + +pub struct UvUnixAcceptor { + listener: UvUnixListener, + incoming: Tube<Result<~RtioPipe, IoError>>, +} + +impl HomingIO for UvUnixAcceptor { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() } +} + +impl UvUnixAcceptor { + fn new(listener: UvUnixListener) -> UvUnixAcceptor { + UvUnixAcceptor { listener: listener, incoming: Tube::new() } + } +} + +impl RtioUnixAcceptor for UvUnixAcceptor { + fn accept(&mut self) -> Result<~RtioPipe, IoError> { + do self.home_for_io |self_| { + self_.incoming.recv() + } + } + + fn accept_simultaneously(&mut self) -> Result<(), IoError> { + do self.home_for_io |self_| { + accept_simultaneously(self_.listener.inner.pipe.as_stream(), 1) + } + } + + fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { + do self.home_for_io |self_| { + accept_simultaneously(self_.listener.inner.pipe.as_stream(), 0) + } + } +} + +pub struct UvSignal { + watcher: signal::SignalWatcher, + home: SchedHandle, +} + +impl HomingIO for UvSignal { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl UvSignal { + fn new(w: signal::SignalWatcher, home: SchedHandle) -> UvSignal { + UvSignal { watcher: w, home: home } + } +} + +impl RtioSignal for UvSignal {} + +impl Drop for UvSignal { + fn drop(&mut self) { + do self.home_for_io_with_sched |self_, scheduler| { + rtdebug!("closing UvSignal"); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.watcher.close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } + } +} + +// this function is full of lies +unsafe fn local_io() -> &'static mut IoFactory { + do Local::borrow |sched: &mut Scheduler| { + let mut io = None; + sched.event_loop.io(|i| io = Some(i)); + cast::transmute(io.unwrap()) + } +} + #[test] fn test_simple_io_no_connect() { do run_in_mt_newsched_task { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); + let io = local_io(); let addr = next_test_ip4(); - let maybe_chan = (*io).tcp_connect(addr); + let maybe_chan = io.tcp_connect(addr); assert!(maybe_chan.is_err()); } } @@ -1628,9 +1887,9 @@ fn test_simple_io_no_connect() { fn test_simple_udp_io_bind_only() { do run_in_mt_newsched_task { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); + let io = local_io(); let addr = next_test_ip4(); - let maybe_socket = (*io).udp_bind(addr); + let maybe_socket = io.udp_bind(addr); assert!(maybe_socket.is_ok()); } } @@ -1649,9 +1908,11 @@ fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() { let work_queue2 = WorkQueue::new(); let queues = ~[work_queue1.clone(), work_queue2.clone()]; - let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(), + let loop1 = ~UvEventLoop::new() as ~EventLoop; + let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(), sleepers.clone()); - let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(), + let loop2 = ~UvEventLoop::new() as ~EventLoop; + let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(), sleepers.clone()); let handle1 = Cell::new(sched1.make_handle()); @@ -1665,11 +1926,9 @@ fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() { }; let test_function: ~fn() = || { - let io: *mut IoFactoryObject = unsafe { - Local::unsafe_borrow() - }; + let io = unsafe { local_io() }; let addr = next_test_ip4(); - let maybe_socket = unsafe { (*io).udp_bind(addr) }; + let maybe_socket = io.udp_bind(addr); // this socket is bound to this event loop assert!(maybe_socket.is_ok()); @@ -1728,9 +1987,11 @@ fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() { let work_queue2 = WorkQueue::new(); let queues = ~[work_queue1.clone(), work_queue2.clone()]; - let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(), + let loop1 = ~UvEventLoop::new() as ~EventLoop; + let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(), sleepers.clone()); - let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(), + let loop2 = ~UvEventLoop::new() as ~EventLoop; + let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(), sleepers.clone()); let handle1 = Cell::new(sched1.make_handle()); @@ -1741,11 +2002,9 @@ fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() { let chan = Cell::new(chan); let body1: ~fn() = || { - let io: *mut IoFactoryObject = unsafe { - Local::unsafe_borrow() - }; + let io = unsafe { local_io() }; let addr = next_test_ip4(); - let socket = unsafe { (*io).udp_bind(addr) }; + let socket = io.udp_bind(addr); assert!(socket.is_ok()); chan.take().send(socket); }; @@ -1799,8 +2058,8 @@ fn test_simple_tcp_server_and_client() { // Start the server first so it's listening when we connect do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let listener = (*io).tcp_bind(addr).unwrap(); + let io = local_io(); + let listener = io.tcp_bind(addr).unwrap(); let mut acceptor = listener.listen().unwrap(); chan.take().send(()); let mut stream = acceptor.accept().unwrap(); @@ -1817,8 +2076,8 @@ fn test_simple_tcp_server_and_client() { do spawntask { unsafe { port.take().recv(); - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut stream = (*io).tcp_connect(addr).unwrap(); + let io = local_io(); + let mut stream = io.tcp_connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); } } @@ -1842,9 +2101,11 @@ fn test_simple_tcp_server_and_client_on_diff_threads() { let client_work_queue = WorkQueue::new(); let queues = ~[server_work_queue.clone(), client_work_queue.clone()]; - let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue, + let sloop = ~UvEventLoop::new() as ~EventLoop; + let mut server_sched = ~Scheduler::new(sloop, server_work_queue, queues.clone(), sleepers.clone()); - let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue, + let cloop = ~UvEventLoop::new() as ~EventLoop; + let mut client_sched = ~Scheduler::new(cloop, client_work_queue, queues.clone(), sleepers.clone()); let server_handle = Cell::new(server_sched.make_handle()); @@ -1861,8 +2122,8 @@ fn test_simple_tcp_server_and_client_on_diff_threads() { }; let server_fn: ~fn() = || { - let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() }; - let listener = unsafe { (*io).tcp_bind(server_addr).unwrap() }; + let io = unsafe { local_io() }; + let listener = io.tcp_bind(server_addr).unwrap(); let mut acceptor = listener.listen().unwrap(); let mut stream = acceptor.accept().unwrap(); let mut buf = [0, .. 2048]; @@ -1874,12 +2135,10 @@ fn test_simple_tcp_server_and_client_on_diff_threads() { }; let client_fn: ~fn() = || { - let io: *mut IoFactoryObject = unsafe { - Local::unsafe_borrow() - }; - let mut stream = unsafe { (*io).tcp_connect(client_addr) }; + let io = unsafe { local_io() }; + let mut stream = io.tcp_connect(client_addr); while stream.is_err() { - stream = unsafe { (*io).tcp_connect(client_addr) }; + stream = io.tcp_connect(client_addr); } stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]); }; @@ -1918,8 +2177,8 @@ fn test_simple_udp_server_and_client() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut server_socket = (*io).udp_bind(server_addr).unwrap(); + let io = local_io(); + let mut server_socket = io.udp_bind(server_addr).unwrap(); chan.take().send(()); let mut buf = [0, .. 2048]; let (nread,src) = server_socket.recvfrom(buf).unwrap(); @@ -1934,8 +2193,8 @@ fn test_simple_udp_server_and_client() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut client_socket = (*io).udp_bind(client_addr).unwrap(); + let io = local_io(); + let mut client_socket = io.udp_bind(client_addr).unwrap(); port.take().recv(); client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr); } @@ -1952,8 +2211,8 @@ fn test_read_and_block() { let chan = Cell::new(chan); do spawntask { - let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() }; - let listener = unsafe { (*io).tcp_bind(addr).unwrap() }; + let io = unsafe { local_io() }; + let listener = io.tcp_bind(addr).unwrap(); let mut acceptor = listener.listen().unwrap(); chan.take().send(()); let mut stream = acceptor.accept().unwrap(); @@ -1991,8 +2250,8 @@ fn test_read_and_block() { do spawntask { unsafe { port.take().recv(); - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut stream = (*io).tcp_connect(addr).unwrap(); + let io = local_io(); + let mut stream = io.tcp_connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); @@ -2014,8 +2273,8 @@ fn test_read_read_read() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let listener = (*io).tcp_bind(addr).unwrap(); + let io = local_io(); + let listener = io.tcp_bind(addr).unwrap(); let mut acceptor = listener.listen().unwrap(); chan.take().send(()); let mut stream = acceptor.accept().unwrap(); @@ -2031,8 +2290,8 @@ fn test_read_read_read() { do spawntask { unsafe { port.take().recv(); - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut stream = (*io).tcp_connect(addr).unwrap(); + let io = local_io(); + let mut stream = io.tcp_connect(addr).unwrap(); let mut buf = [0, .. 2048]; let mut total_bytes_read = 0; while total_bytes_read < MAX { @@ -2060,8 +2319,8 @@ fn test_udp_twice() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut client = (*io).udp_bind(client_addr).unwrap(); + let io = local_io(); + let mut client = io.udp_bind(client_addr).unwrap(); port.take().recv(); assert!(client.sendto([1], server_addr).is_ok()); assert!(client.sendto([2], server_addr).is_ok()); @@ -2070,8 +2329,8 @@ fn test_udp_twice() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut server = (*io).udp_bind(server_addr).unwrap(); + let io = local_io(); + let mut server = io.udp_bind(server_addr).unwrap(); chan.take().send(()); let mut buf1 = [0]; let mut buf2 = [0]; @@ -2105,9 +2364,9 @@ fn test_udp_many_read() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut server_out = (*io).udp_bind(server_out_addr).unwrap(); - let mut server_in = (*io).udp_bind(server_in_addr).unwrap(); + let io = local_io(); + let mut server_out = io.udp_bind(server_out_addr).unwrap(); + let mut server_in = io.udp_bind(server_in_addr).unwrap(); let (port, chan) = first.take(); chan.send(()); port.recv(); @@ -2131,9 +2390,9 @@ fn test_udp_many_read() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut client_out = (*io).udp_bind(client_out_addr).unwrap(); - let mut client_in = (*io).udp_bind(client_in_addr).unwrap(); + let io = local_io(); + let mut client_out = io.udp_bind(client_out_addr).unwrap(); + let mut client_in = io.udp_bind(client_in_addr).unwrap(); let (port, chan) = second.take(); port.recv(); chan.send(()); @@ -2163,8 +2422,8 @@ fn test_udp_many_read() { fn test_timer_sleep_simple() { do run_in_mt_newsched_task { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let timer = (*io).timer_init(); + let io = local_io(); + let timer = io.timer_init(); do timer.map_move |mut t| { t.sleep(1) }; } } @@ -2174,29 +2433,28 @@ fn file_test_uvio_full_simple_impl() { use str::StrSlice; // why does this have to be explicitly imported to work? // compiler was complaining about no trait for str that // does .as_bytes() .. - use path::Path; use rt::io::{Open, Create, ReadWrite, Read}; unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); + let io = local_io(); let write_val = "hello uvio!"; let path = "./tmp/file_test_uvio_full.txt"; { let create_fm = Create; let create_fa = ReadWrite; - let mut fd = (*io).fs_open(&Path::new(path), create_fm, create_fa).unwrap(); + let mut fd = io.fs_open(&path.to_c_str(), create_fm, create_fa).unwrap(); let write_buf = write_val.as_bytes(); fd.write(write_buf); } { let ro_fm = Open; let ro_fa = Read; - let mut fd = (*io).fs_open(&Path::new(path), ro_fm, ro_fa).unwrap(); + let mut fd = io.fs_open(&path.to_c_str(), ro_fm, ro_fa).unwrap(); let mut read_vec = [0, .. 1028]; let nread = fd.read(read_vec).unwrap(); let read_val = str::from_utf8(read_vec.slice(0, nread as uint)); assert!(read_val == write_val.to_owned()); } - (*io).fs_unlink(&Path::new(path)); + io.fs_unlink(&path.to_c_str()); } } @@ -2211,9 +2469,9 @@ fn uvio_naive_print(input: &str) { use str::StrSlice; unsafe { use libc::{STDOUT_FILENO}; - let io: *mut IoFactoryObject = Local::unsafe_borrow(); + let io = local_io(); { - let mut fd = (*io).fs_from_raw_fd(STDOUT_FILENO, false); + let mut fd = io.fs_from_raw_fd(STDOUT_FILENO, DontClose); let write_buf = input.as_bytes(); fd.write(write_buf); } diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs index 367585b0f0e..75e6a0c6ca5 100644 --- a/src/libstd/rt/uv/uvll.rs +++ b/src/libstd/rt/uv/uvll.rs @@ -131,6 +131,8 @@ pub type uv_udp_send_t = c_void; pub type uv_getaddrinfo_t = c_void; pub type uv_process_t = c_void; pub type uv_pipe_t = c_void; +pub type uv_tty_t = c_void; +pub type uv_signal_t = c_void; pub struct uv_timespec_t { tv_sec: libc::c_long, @@ -218,6 +220,8 @@ pub type uv_getaddrinfo_cb = extern "C" fn(req: *uv_getaddrinfo_t, pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t, exit_status: c_int, term_signal: c_int); +pub type uv_signal_cb = extern "C" fn(handle: *uv_signal_t, + signum: c_int); pub type sockaddr = c_void; pub type sockaddr_in = c_void; @@ -231,37 +235,37 @@ pub type socklen_t = c_int; #[cfg(target_os = "android")] #[cfg(target_os = "linux")] pub struct addrinfo { - priv ai_flags: c_int, - priv ai_family: c_int, - priv ai_socktype: c_int, - priv ai_protocol: c_int, - priv ai_addrlen: socklen_t, + ai_flags: c_int, + ai_family: c_int, + ai_socktype: c_int, + ai_protocol: c_int, + ai_addrlen: socklen_t, ai_addr: *sockaddr, - priv ai_canonname: *char, + ai_canonname: *char, ai_next: *addrinfo } #[cfg(target_os = "macos")] #[cfg(target_os = "freebsd")] pub struct addrinfo { - priv ai_flags: c_int, - priv ai_family: c_int, - priv ai_socktype: c_int, - priv ai_protocol: c_int, - priv ai_addrlen: socklen_t, - priv ai_canonname: *char, + ai_flags: c_int, + ai_family: c_int, + ai_socktype: c_int, + ai_protocol: c_int, + ai_addrlen: socklen_t, + ai_canonname: *char, ai_addr: *sockaddr, ai_next: *addrinfo } #[cfg(windows)] pub struct addrinfo { - priv ai_flags: c_int, - priv ai_family: c_int, - priv ai_socktype: c_int, - priv ai_protocol: c_int, - priv ai_addrlen: size_t, - priv ai_canonname: *char, + ai_flags: c_int, + ai_family: c_int, + ai_socktype: c_int, + ai_protocol: c_int, + ai_addrlen: size_t, + ai_canonname: *char, ai_addr: *sockaddr, ai_next: *addrinfo } @@ -419,18 +423,6 @@ pub unsafe fn walk(loop_handle: *c_void, cb: uv_walk_cb, arg: *c_void) { rust_uv_walk(loop_handle, cb, arg); } -pub unsafe fn idle_new() -> *uv_idle_t { - #[fixed_stack_segment]; #[inline(never)]; - - rust_uv_idle_new() -} - -pub unsafe fn idle_delete(handle: *uv_idle_t) { - #[fixed_stack_segment]; #[inline(never)]; - - rust_uv_idle_delete(handle) -} - pub unsafe fn idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int { #[fixed_stack_segment]; #[inline(never)]; @@ -958,6 +950,52 @@ pub unsafe fn freeaddrinfo(ai: *addrinfo) { #[fixed_stack_segment]; #[inline(never)]; rust_uv_freeaddrinfo(ai); } +pub unsafe fn pipe_open(pipe: *uv_pipe_t, file: c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_pipe_open(pipe, file) +} +pub unsafe fn pipe_bind(pipe: *uv_pipe_t, name: *c_char) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_pipe_bind(pipe, name) +} +pub unsafe fn pipe_connect(req: *uv_connect_t, handle: *uv_pipe_t, + name: *c_char, cb: uv_connect_cb) { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_pipe_connect(req, handle, name, cb) +} +pub unsafe fn tty_init(loop_ptr: *uv_loop_t, tty: *uv_tty_t, fd: c_int, + readable: c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_tty_init(loop_ptr, tty, fd, readable) +} +pub unsafe fn tty_set_mode(tty: *uv_tty_t, mode: c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_tty_set_mode(tty, mode) +} +pub unsafe fn tty_get_winsize(tty: *uv_tty_t, width: *c_int, + height: *c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_tty_get_winsize(tty, width, height) +} +pub unsafe fn guess_handle(fd: c_int) -> uv_handle_type { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_guess_handle(fd) +} + +pub unsafe fn signal_init(loop_: *uv_loop_t, handle: *uv_signal_t) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_signal_init(loop_, handle); +} +pub unsafe fn signal_start(handle: *uv_signal_t, + signal_cb: uv_signal_cb, + signum: c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_signal_start(handle, signal_cb, signum); +} +pub unsafe fn signal_stop(handle: *uv_signal_t) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_signal_stop(handle); +} pub struct uv_err_data { priv err_name: ~str, @@ -978,8 +1016,6 @@ extern { fn rust_uv_close(handle: *c_void, cb: uv_close_cb); fn rust_uv_walk(loop_handle: *c_void, cb: uv_walk_cb, arg: *c_void); - fn rust_uv_idle_new() -> *uv_idle_t; - fn rust_uv_idle_delete(handle: *uv_idle_t); fn rust_uv_idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int; fn rust_uv_idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int; fn rust_uv_idle_stop(handle: *uv_idle_t) -> c_int; @@ -1102,4 +1138,36 @@ extern { fn rust_set_stdio_container_stream(c: *uv_stdio_container_t, stream: *uv_stream_t); fn rust_uv_pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int; + + fn rust_uv_pipe_open(pipe: *uv_pipe_t, file: c_int) -> c_int; + fn rust_uv_pipe_bind(pipe: *uv_pipe_t, name: *c_char) -> c_int; + fn rust_uv_pipe_connect(req: *uv_connect_t, handle: *uv_pipe_t, + name: *c_char, cb: uv_connect_cb); + fn rust_uv_tty_init(loop_ptr: *uv_loop_t, tty: *uv_tty_t, fd: c_int, + readable: c_int) -> c_int; + fn rust_uv_tty_set_mode(tty: *uv_tty_t, mode: c_int) -> c_int; + fn rust_uv_tty_get_winsize(tty: *uv_tty_t, width: *c_int, + height: *c_int) -> c_int; + fn rust_uv_guess_handle(fd: c_int) -> uv_handle_type; + + // XXX: see comments in addrinfo.rs + // These should all really be constants... + //#[rust_stack] pub fn rust_SOCK_STREAM() -> c_int; + //#[rust_stack] pub fn rust_SOCK_DGRAM() -> c_int; + //#[rust_stack] pub fn rust_SOCK_RAW() -> c_int; + //#[rust_stack] pub fn rust_IPPROTO_UDP() -> c_int; + //#[rust_stack] pub fn rust_IPPROTO_TCP() -> c_int; + //#[rust_stack] pub fn rust_AI_ADDRCONFIG() -> c_int; + //#[rust_stack] pub fn rust_AI_ALL() -> c_int; + //#[rust_stack] pub fn rust_AI_CANONNAME() -> c_int; + //#[rust_stack] pub fn rust_AI_NUMERICHOST() -> c_int; + //#[rust_stack] pub fn rust_AI_NUMERICSERV() -> c_int; + //#[rust_stack] pub fn rust_AI_PASSIVE() -> c_int; + //#[rust_stack] pub fn rust_AI_V4MAPPED() -> c_int; + + fn rust_uv_signal_init(loop_: *uv_loop_t, handle: *uv_signal_t) -> c_int; + fn rust_uv_signal_start(handle: *uv_signal_t, + signal_cb: uv_signal_cb, + signum: c_int) -> c_int; + fn rust_uv_signal_stop(handle: *uv_signal_t) -> c_int; } |
