diff options
| author | bors <bors@rust-lang.org> | 2013-10-24 14:26:15 -0700 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2013-10-24 14:26:15 -0700 |
| commit | 3f5b2219cc893b30863f9136703166f306fcc684 (patch) | |
| tree | d7267619b1909f2deaf319c560a64d667d141d35 /src/libstd | |
| parent | 61f8c059c4c6082683d78b2ee3d963f65fa1eb98 (diff) | |
| parent | 188e471339dfe652b8ff9f3bbe4cc262a040c584 (diff) | |
| download | rust-3f5b2219cc893b30863f9136703166f306fcc684.tar.gz rust-3f5b2219cc893b30863f9136703166f306fcc684.zip | |
auto merge of #9901 : alexcrichton/rust/unix-sockets, r=brson
Large topics: * Implemented `rt::io::net::unix`. We've got an implementation backed by "named pipes" for windows for free from libuv, so I'm not sure if these should be `cfg(unix)` or whether they'd be better placed in `rt::io::pipe` (which is currently kinda useless), or to leave in `unix`. Regardless, we probably shouldn't deny windows of functionality which it certainly has. * Fully implemented `net::addrinfo`, or at least fully implemented in the sense of making the best attempt to wrap libuv's `getaddrinfo` api * Moved standard I/O to a libuv TTY instead of just a plain old file descriptor. I found that this interacted better when closing stdin, and it has the added bonus of getting things like terminal dimentions (someone should make a progress bar now!) * Migrate to `~Trait` instead of a typedef'd object where possible. There are only two more types which are blocked on this, and those are traits which have a method which takes by-value self (there's an open issue on this) * Drop `rt::io::support::PathLike` in favor of just `ToCStr`. We recently had a lot of Path work done, but it still wasn't getting passed down to libuv (there was an intermediate string conversion), and this allows true paths to work all the way down to libuv (and anything else that can become a C string). * Removes `extra::fileinput` and `extra::io_util` Closes #9895 Closes #9975 Closes #8330 Closes #6850 (ported lots of libraries away from std::io) cc #4248 (implemented unix/dns) cc #9128 (made everything truly trait objects)
Diffstat (limited to 'src/libstd')
48 files changed, 2534 insertions, 3398 deletions
diff --git a/src/libstd/c_str.rs b/src/libstd/c_str.rs index acfa02a4def..b2e68c8d20f 100644 --- a/src/libstd/c_str.rs +++ b/src/libstd/c_str.rs @@ -348,12 +348,57 @@ impl<'self> Iterator<libc::c_char> for CStringIterator<'self> { } } +/// Parses a C "multistring", eg windows env values or +/// the req->ptr result in a uv_fs_readdir() call. +/// +/// Optionally, a `count` can be passed in, limiting the +/// parsing to only being done `count`-times. +/// +/// The specified closure is invoked with each string that +/// is found, and the number of strings found is returned. +pub unsafe fn from_c_multistring(buf: *libc::c_char, + count: Option<uint>, + f: &fn(&CString)) -> uint { + + let mut curr_ptr: uint = buf as uint; + let mut ctr = 0; + let (limited_count, limit) = match count { + Some(limit) => (true, limit), + None => (false, 0) + }; + while ((limited_count && ctr < limit) || !limited_count) + && *(curr_ptr as *libc::c_char) != 0 as libc::c_char { + let cstr = CString::new(curr_ptr as *libc::c_char, false); + f(&cstr); + curr_ptr += cstr.len() + 1; + ctr += 1; + } + return ctr; +} + #[cfg(test)] mod tests { use super::*; use libc; use ptr; use option::{Some, None}; + use vec; + + #[test] + fn test_str_multistring_parsing() { + unsafe { + let input = bytes!("zero", "\x00", "one", "\x00", "\x00"); + let ptr = vec::raw::to_ptr(input); + let expected = ["zero", "one"]; + let mut it = expected.iter(); + let result = do from_c_multistring(ptr as *libc::c_char, None) |c| { + let cbytes = c.as_bytes().slice_to(c.len()); + assert_eq!(cbytes, it.next().unwrap().as_bytes()); + }; + assert_eq!(result, 2); + assert!(it.next().is_none()); + } + } #[test] fn test_str_to_c_str() { diff --git a/src/libstd/io.rs b/src/libstd/io.rs deleted file mode 100644 index 4e55c5fe60e..00000000000 --- a/src/libstd/io.rs +++ /dev/null @@ -1,2181 +0,0 @@ -// Copyright 2012 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/*! - -The `io` module contains basic input and output routines. - -A quick summary: - -## `Reader` and `Writer` traits - -These traits define the minimal set of methods that anything that can do -input and output should implement. - -## `ReaderUtil` and `WriterUtil` traits - -Richer methods that allow you to do more. `Reader` only lets you read a certain -number of bytes into a buffer, while `ReaderUtil` allows you to read a whole -line, for example. - -Generally, these richer methods are probably the ones you want to actually -use in day-to-day Rust. - -Furthermore, because there is an implementation of `ReaderUtil` for -`<T: Reader>`, when your input or output code implements `Reader`, you get -all of these methods for free. - -## `print` and `println` - -These very useful functions are defined here. You generally don't need to -import them, though, as the prelude already does. - -## `stdin`, `stdout`, and `stderr` - -These functions return references to the classic three file descriptors. They -implement `Reader` and `Writer`, where appropriate. - -*/ - -#[allow(missing_doc)]; - -use cast; -use cast::transmute; -use clone::Clone; -use c_str::ToCStr; -use container::Container; -use int; -use iter::Iterator; -use libc::consts::os::posix88::*; -use libc::{c_int, c_void, size_t}; -use libc; -use num; -use ops::Drop; -use option::{Some, None}; -use os; -use path::{Path,GenericPath}; -use ptr; -use result::{Result, Ok, Err}; -use str::{StrSlice, OwnedStr}; -use str; -use to_str::ToStr; -use uint; -use vec::{MutableVector, ImmutableVector, OwnedVector, OwnedCopyableVector, CopyableVector}; -use vec; - -#[allow(non_camel_case_types)] // not sure what to do about this -pub type fd_t = c_int; - -pub mod rustrt { - use libc; - - #[link_name = "rustrt"] - extern { - pub fn rust_get_stdin() -> *libc::FILE; - pub fn rust_get_stdout() -> *libc::FILE; - pub fn rust_get_stderr() -> *libc::FILE; - } -} - -// Reading - -// FIXME (#2004): This is all buffered. We might need an unbuffered variant -// as well -/** -* The SeekStyle enum describes the relationship between the position -* we'd like to seek to from our current position. It's used as an argument -* to the `seek` method defined on the `Reader` trait. -* -* There are three seek styles: -* -* 1. `SeekSet` means that the new position should become our position. -* 2. `SeekCur` means that we should seek from the current position. -* 3. `SeekEnd` means that we should seek from the end. -* -* # Examples -* -* None right now. -*/ -pub enum SeekStyle { SeekSet, SeekEnd, SeekCur, } - - -/** -* The core Reader trait. All readers must implement this trait. -* -* # Examples -* -* None right now. -*/ -pub trait Reader { - // FIXME (#2004): Seekable really should be orthogonal. - - // FIXME (#2982): This should probably return an error. - /** - * Reads bytes and puts them into `bytes`, advancing the cursor. Returns the - * number of bytes read. - * - * The number of bytes to be read is `len` or the end of the file, - * whichever comes first. - * - * The buffer must be at least `len` bytes long. - * - * `read` is conceptually similar to C's `fread` function. - * - * # Examples - * - * None right now. - */ - fn read(&self, bytes: &mut [u8], len: uint) -> uint; - - /** - * Reads a single byte, advancing the cursor. - * - * In the case of an EOF or an error, returns a negative value. - * - * `read_byte` is conceptually similar to C's `getc` function. - * - * # Examples - * - * None right now. - */ - fn read_byte(&self) -> int; - - /** - * Returns a boolean value: are we currently at EOF? - * - * Note that stream position may be already at the end-of-file point, - * but `eof` returns false until an attempt to read at that position. - * - * `eof` is conceptually similar to C's `feof` function. - * - * # Examples - * - * None right now. - */ - fn eof(&self) -> bool; - - /** - * Seek to a given `position` in the stream. - * - * Takes an optional SeekStyle, which affects how we seek from the - * position. See `SeekStyle` docs for more details. - * - * `seek` is conceptually similar to C's `fseek` function. - * - * # Examples - * - * None right now. - */ - fn seek(&self, position: int, style: SeekStyle); - - /** - * Returns the current position within the stream. - * - * `tell` is conceptually similar to C's `ftell` function. - * - * # Examples - * - * None right now. - */ - fn tell(&self) -> uint; -} - -impl Reader for @Reader { - fn read(&self, bytes: &mut [u8], len: uint) -> uint { - self.read(bytes, len) - } - fn read_byte(&self) -> int { - self.read_byte() - } - fn eof(&self) -> bool { - self.eof() - } - fn seek(&self, position: int, style: SeekStyle) { - self.seek(position, style) - } - fn tell(&self) -> uint { - self.tell() - } -} - -/** -* The `ReaderUtil` trait is a home for many of the utility functions -* a particular Reader should implement. -* -* The default `Reader` trait is focused entirely on bytes. `ReaderUtil` is based -* on higher-level concepts like 'chars' and 'lines.' -* -* # Examples: -* -* None right now. -*/ -pub trait ReaderUtil { - - /** - * Reads `len` number of bytes, and gives you a new vector back. - * - * # Examples - * - * None right now. - */ - fn read_bytes(&self, len: uint) -> ~[u8]; - - /** - * Reads up until a specific byte is seen or EOF. - * - * The `include` parameter specifies if the character should be included - * in the returned string. - * - * # Examples - * - * None right now. - */ - fn read_until(&self, c: u8, include: bool) -> ~str; - - /** - * Reads up until the first '\n' or EOF. - * - * The '\n' is not included in the result. - * - * # Examples - * - * None right now. - */ - fn read_line(&self) -> ~str; - - /** - * Reads `n` chars. - * - * Assumes that those chars are UTF-8 encoded. - * - * The '\n' is not included in the result. - * - * # Examples - * - * None right now. - */ - fn read_chars(&self, n: uint) -> ~[char]; - - /** - * Reads a single UTF-8 encoded char. - * - * # Examples - * - * None right now. - */ - fn read_char(&self) -> char; - - /** - * Reads up until the first null byte or EOF. - * - * The null byte is not returned. - * - * # Examples - * - * None right now. - */ - fn read_c_str(&self) -> ~str; - - /** - * Reads all remaining data in the stream. - * - * # Examples - * - * None right now. - */ - fn read_whole_stream(&self) -> ~[u8]; - - /** - * Iterate over every byte until EOF or the iterator breaks. - * - * # Examples - * - * None right now. - */ - fn each_byte(&self, it: &fn(int) -> bool) -> bool; - - /** - * Iterate over every char until EOF or the iterator breaks. - * - * # Examples - * - * None right now. - */ - fn each_char(&self, it: &fn(char) -> bool) -> bool; - - /** - * Iterate over every line until EOF or the iterator breaks. - * - * # Examples - * - * None right now. - */ - fn each_line(&self, it: &fn(&str) -> bool) -> bool; - - /** - * Reads all of the lines in the stream. - * - * Returns a vector of those lines. - * - * # Examples - * - * None right now. - */ - fn read_lines(&self) -> ~[~str]; - - /** - * Reads `n` little-endian unsigned integer bytes. - * - * `n` must be between 1 and 8, inclusive. - * - * # Examples - * - * None right now. - */ - fn read_le_uint_n(&self, nbytes: uint) -> u64; - - /** - * Reads `n` little-endian signed integer bytes. - * - * `n` must be between 1 and 8, inclusive. - * - * # Examples - * - * None right now. - */ - fn read_le_int_n(&self, nbytes: uint) -> i64; - - /** - * Reads `n` big-endian unsigned integer bytes. - * - * `n` must be between 1 and 8, inclusive. - * - * # Examples - * - * None right now. - */ - fn read_be_uint_n(&self, nbytes: uint) -> u64; - - /** - * Reads `n` big-endian signed integer bytes. - * - * `n` must be between 1 and 8, inclusive. - * - * # Examples - * - * None right now. - */ - fn read_be_int_n(&self, nbytes: uint) -> i64; - - /** - * Reads a little-endian unsigned integer. - * - * The number of bytes returned is system-dependant. - * - * # Examples - * - * None right now. - */ - fn read_le_uint(&self) -> uint; - - /** - * Reads a little-endian integer. - * - * The number of bytes returned is system-dependant. - * - * # Examples - * - * None right now. - */ - fn read_le_int(&self) -> int; - - /** - * Reads a big-endian unsigned integer. - * - * The number of bytes returned is system-dependant. - * - * # Examples - * - * None right now. - */ - fn read_be_uint(&self) -> uint; - - /** - * Reads a big-endian integer. - * - * The number of bytes returned is system-dependant. - * - * # Examples - * - * None right now. - */ - fn read_be_int(&self) -> int; - - /** - * Reads a big-endian `u64`. - * - * `u64`s are 8 bytes long. - * - * # Examples - * - * None right now. - */ - fn read_be_u64(&self) -> u64; - - /** - * Reads a big-endian `u32`. - * - * `u32`s are 4 bytes long. - * - * # Examples - * - * None right now. - */ - fn read_be_u32(&self) -> u32; - - /** - * Reads a big-endian `u16`. - * - * `u16`s are 2 bytes long. - * - * # Examples - * - * None right now. - */ - fn read_be_u16(&self) -> u16; - - /** - * Reads a big-endian `i64`. - * - * `i64`s are 8 bytes long. - * - * # Examples - * - * None right now. - */ - fn read_be_i64(&self) -> i64; - - /** - * Reads a big-endian `i32`. - * - * `i32`s are 4 bytes long. - * - * # Examples - * - * None right now. - */ - fn read_be_i32(&self) -> i32; - - /** - * Reads a big-endian `i16`. - * - * `i16`s are 2 bytes long. - * - * # Examples - * - * None right now. - */ - fn read_be_i16(&self) -> i16; - - /** - * Reads a big-endian `f64`. - * - * `f64`s are 8 byte, IEEE754 double-precision floating point numbers. - * - * # Examples - * - * None right now. - */ - fn read_be_f64(&self) -> f64; - - /** - * Reads a big-endian `f32`. - * - * `f32`s are 4 byte, IEEE754 single-precision floating point numbers. - * - * # Examples - * - * None right now. - */ - fn read_be_f32(&self) -> f32; - - /** - * Reads a little-endian `u64`. - * - * `u64`s are 8 bytes long. - * - * # Examples - * - * None right now. - */ - fn read_le_u64(&self) -> u64; - - /** - * Reads a little-endian `u32`. - * - * `u32`s are 4 bytes long. - * - * # Examples - * - * None right now. - */ - fn read_le_u32(&self) -> u32; - - /** - * Reads a little-endian `u16`. - * - * `u16`s are 2 bytes long. - * - * # Examples - * - * None right now. - */ - fn read_le_u16(&self) -> u16; - - /** - * Reads a little-endian `i64`. - * - * `i64`s are 8 bytes long. - * - * # Examples - * - * None right now. - */ - fn read_le_i64(&self) -> i64; - - /** - * Reads a little-endian `i32`. - * - * `i32`s are 4 bytes long. - * - * # Examples - * - * None right now. - */ - fn read_le_i32(&self) -> i32; - - /** - * Reads a little-endian `i16`. - * - * `i16`s are 2 bytes long. - * - * # Examples - * - * None right now. - */ - fn read_le_i16(&self) -> i16; - - /** - * Reads a little-endian `f64`. - * - * `f64`s are 8 byte, IEEE754 double-precision floating point numbers. - * - * # Examples - * - * None right now. - */ - fn read_le_f64(&self) -> f64; - - /** - * Reads a little-endian `f32`. - * - * `f32`s are 4 byte, IEEE754 single-precision floating point numbers. - * - * # Examples - * - * None right now. - */ - fn read_le_f32(&self) -> f32; - - /** - * Read a u8. - * - * `u8`s are 1 byte. - * - * # Examples - * - * None right now. - */ - fn read_u8(&self) -> u8; - - /** - * Read an i8. - * - * `i8`s are 1 byte. - * - * # Examples - * - * None right now. - */ - fn read_i8(&self) -> i8; -} - -impl<T:Reader> ReaderUtil for T { - - fn read_bytes(&self, len: uint) -> ~[u8] { - let mut bytes = vec::with_capacity(len); - unsafe { vec::raw::set_len(&mut bytes, len); } - - let count = self.read(bytes, len); - - unsafe { vec::raw::set_len(&mut bytes, count); } - bytes - } - - fn read_until(&self, c: u8, include: bool) -> ~str { - let mut bytes = ~[]; - loop { - let ch = self.read_byte(); - if ch == -1 || ch == c as int { - if include && ch == c as int { - bytes.push(ch as u8); - } - break; - } - bytes.push(ch as u8); - } - str::from_utf8(bytes) - } - - fn read_line(&self) -> ~str { - self.read_until('\n' as u8, false) - } - - fn read_chars(&self, n: uint) -> ~[char] { - // returns the (consumed offset, n_req), appends characters to &chars - fn chars_from_utf8<T:Reader>(bytes: &~[u8], chars: &mut ~[char]) - -> (uint, uint) { - let mut i = 0; - let bytes_len = bytes.len(); - while i < bytes_len { - let b0 = bytes[i]; - let w = str::utf8_char_width(b0); - let end = i + w; - i += 1; - assert!((w > 0)); - if w == 1 { - unsafe { - chars.push(transmute(b0 as u32)); - } - continue; - } - // can't satisfy this char with the existing data - if end > bytes_len { - return (i - 1, end - bytes_len); - } - let mut val = 0; - while i < end { - let next = bytes[i] as int; - i += 1; - assert!((next > -1)); - assert_eq!(next & 192, 128); - val <<= 6; - val += (next & 63) as uint; - } - // See str::StrSlice::char_at - val += ((b0 << ((w + 1) as u8)) as uint) - << (w - 1) * 6 - w - 1u; - unsafe { - chars.push(transmute(val as u32)); - } - } - return (i, 0); - } - let mut bytes = ~[]; - let mut chars = ~[]; - // might need more bytes, but reading n will never over-read - let mut nbread = n; - while nbread > 0 { - let data = self.read_bytes(nbread); - if data.is_empty() { - // eof - FIXME (#2004): should we do something if - // we're split in a unicode char? - break; - } - bytes.push_all(data); - let (offset, nbreq) = chars_from_utf8::<T>(&bytes, &mut chars); - let ncreq = n - chars.len(); - // again we either know we need a certain number of bytes - // to complete a character, or we make sure we don't - // over-read by reading 1-byte per char needed - nbread = if ncreq > nbreq { ncreq } else { nbreq }; - if nbread > 0 { - bytes = bytes.slice(offset, bytes.len()).to_owned(); - } - } - chars - } - - fn read_char(&self) -> char { - let c = self.read_chars(1); - if c.len() == 0 { - return unsafe { transmute(-1u32) }; // FIXME: #8971: unsound - } - assert_eq!(c.len(), 1); - return c[0]; - } - - fn read_c_str(&self) -> ~str { - self.read_until(0u8, false) - } - - fn read_whole_stream(&self) -> ~[u8] { - let mut bytes: ~[u8] = ~[]; - while !self.eof() { bytes.push_all(self.read_bytes(2048u)); } - bytes - } - - fn each_byte(&self, it: &fn(int) -> bool) -> bool { - loop { - match self.read_byte() { - -1 => break, - ch => if !it(ch) { return false; } - } - } - return true; - } - - fn each_char(&self, it: &fn(char) -> bool) -> bool { - // FIXME: #8971: unsound - let eof: char = unsafe { transmute(-1u32) }; - loop { - match self.read_char() { - c if c == eof => break, - ch => if !it(ch) { return false; } - } - } - return true; - } - - fn each_line(&self, it: &fn(s: &str) -> bool) -> bool { - while !self.eof() { - // include the \n, so that we can distinguish an entirely empty - // line read after "...\n", and the trailing empty line in - // "...\n\n". - let mut line = self.read_until('\n' as u8, true); - - // blank line at the end of the reader is ignored - if self.eof() && line.is_empty() { break; } - - // trim the \n, so that each_line is consistent with read_line - let n = line.len(); - if line[n-1] == '\n' as u8 { - unsafe { str::raw::set_len(&mut line, n-1); } - } - - if !it(line) { return false; } - } - return true; - } - - fn read_lines(&self) -> ~[~str] { - do vec::build(None) |push| { - do self.each_line |line| { - push(line.to_owned()); - true - }; - } - } - - // FIXME int reading methods need to deal with eof - issue #2004 - - fn read_le_uint_n(&self, nbytes: uint) -> u64 { - assert!(nbytes > 0 && nbytes <= 8); - - let mut val = 0u64; - let mut pos = 0; - let mut i = nbytes; - while i > 0 { - val += (self.read_u8() as u64) << pos; - pos += 8; - i -= 1; - } - val - } - - fn read_le_int_n(&self, nbytes: uint) -> i64 { - extend_sign(self.read_le_uint_n(nbytes), nbytes) - } - - fn read_be_uint_n(&self, nbytes: uint) -> u64 { - assert!(nbytes > 0 && nbytes <= 8); - - let mut val = 0u64; - let mut i = nbytes; - while i > 0 { - i -= 1; - val += (self.read_u8() as u64) << i * 8; - } - val - } - - fn read_be_int_n(&self, nbytes: uint) -> i64 { - extend_sign(self.read_be_uint_n(nbytes), nbytes) - } - - fn read_le_uint(&self) -> uint { - self.read_le_uint_n(uint::bytes) as uint - } - - fn read_le_int(&self) -> int { - self.read_le_int_n(int::bytes) as int - } - - fn read_be_uint(&self) -> uint { - self.read_be_uint_n(uint::bytes) as uint - } - - fn read_be_int(&self) -> int { - self.read_be_int_n(int::bytes) as int - } - - fn read_be_u64(&self) -> u64 { - self.read_be_uint_n(8) as u64 - } - - fn read_be_u32(&self) -> u32 { - self.read_be_uint_n(4) as u32 - } - - fn read_be_u16(&self) -> u16 { - self.read_be_uint_n(2) as u16 - } - - fn read_be_i64(&self) -> i64 { - self.read_be_int_n(8) as i64 - } - - fn read_be_i32(&self) -> i32 { - self.read_be_int_n(4) as i32 - } - - fn read_be_i16(&self) -> i16 { - self.read_be_int_n(2) as i16 - } - - fn read_be_f64(&self) -> f64 { - unsafe { - cast::transmute::<u64, f64>(self.read_be_u64()) - } - } - - fn read_be_f32(&self) -> f32 { - unsafe { - cast::transmute::<u32, f32>(self.read_be_u32()) - } - } - - fn read_le_u64(&self) -> u64 { - self.read_le_uint_n(8) as u64 - } - - fn read_le_u32(&self) -> u32 { - self.read_le_uint_n(4) as u32 - } - - fn read_le_u16(&self) -> u16 { - self.read_le_uint_n(2) as u16 - } - - fn read_le_i64(&self) -> i64 { - self.read_le_int_n(8) as i64 - } - - fn read_le_i32(&self) -> i32 { - self.read_le_int_n(4) as i32 - } - - fn read_le_i16(&self) -> i16 { - self.read_le_int_n(2) as i16 - } - - fn read_le_f64(&self) -> f64 { - unsafe { - cast::transmute::<u64, f64>(self.read_le_u64()) - } - } - - fn read_le_f32(&self) -> f32 { - unsafe { - cast::transmute::<u32, f32>(self.read_le_u32()) - } - } - - fn read_u8(&self) -> u8 { - self.read_byte() as u8 - } - - fn read_i8(&self) -> i8 { - self.read_byte() as i8 - } -} - -fn extend_sign(val: u64, nbytes: uint) -> i64 { - let shift = (8 - nbytes) * 8; - (val << shift) as i64 >> shift -} - -// Reader implementations - -fn convert_whence(whence: SeekStyle) -> i32 { - return match whence { - SeekSet => 0i32, - SeekCur => 1i32, - SeekEnd => 2i32 - }; -} - -impl Reader for *libc::FILE { - fn read(&self, bytes: &mut [u8], len: uint) -> uint { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - do bytes.as_mut_buf |buf_p, buf_len| { - assert!(buf_len >= len); - - let count = libc::fread(buf_p as *mut c_void, 1u as size_t, - len as size_t, *self) as uint; - if count < len { - match libc::ferror(*self) { - 0 => (), - _ => { - error!("error reading buffer: {}", os::last_os_error()); - fail!(); - } - } - } - - count - } - } - } - fn read_byte(&self) -> int { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - libc::fgetc(*self) as int - } - } - fn eof(&self) -> bool { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - return libc::feof(*self) != 0 as c_int; - } - } - fn seek(&self, offset: int, whence: SeekStyle) { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - assert!(libc::fseek(*self, - offset as libc::c_long, - convert_whence(whence)) == 0 as c_int); - } - } - fn tell(&self) -> uint { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - return libc::ftell(*self) as uint; - } - } -} - -struct Wrapper<T, C> { - base: T, - cleanup: C, -} - -// A forwarding impl of reader that also holds on to a resource for the -// duration of its lifetime. -// FIXME there really should be a better way to do this // #2004 -impl<R:Reader,C> Reader for Wrapper<R, C> { - fn read(&self, bytes: &mut [u8], len: uint) -> uint { - self.base.read(bytes, len) - } - fn read_byte(&self) -> int { self.base.read_byte() } - fn eof(&self) -> bool { self.base.eof() } - fn seek(&self, off: int, whence: SeekStyle) { - self.base.seek(off, whence) - } - fn tell(&self) -> uint { self.base.tell() } -} - -pub struct FILERes { - priv f: *libc::FILE, -} - -impl FILERes { - pub fn new(f: *libc::FILE) -> FILERes { - FILERes { f: f } - } -} - -impl Drop for FILERes { - fn drop(&mut self) { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - libc::fclose(self.f); - } - } -} - -pub fn FILE_reader(f: *libc::FILE, cleanup: bool) -> @Reader { - if cleanup { - @Wrapper { base: f, cleanup: FILERes::new(f) } as @Reader - } else { - @f as @Reader - } -} - -// FIXME (#2004): this should either be an trait-less impl, a set of -// top-level functions that take a reader, or a set of default methods on -// reader (which can then be called reader) - -/** -* Gives a `Reader` that allows you to read values from standard input. -* -* # Example -* -* ```rust -* let stdin = std::io::stdin(); -* let line = stdin.read_line(); -* std::io::print(line); -* ``` -*/ -pub fn stdin() -> @Reader { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - @rustrt::rust_get_stdin() as @Reader - } -} - -pub fn file_reader(path: &Path) -> Result<@Reader, ~str> { - #[fixed_stack_segment]; #[inline(never)]; - - let f = do path.with_c_str |pathbuf| { - do "rb".with_c_str |modebuf| { - unsafe { libc::fopen(pathbuf, modebuf as *libc::c_char) } - } - }; - - if f as uint == 0u { - do path.display().with_str |p| { - Err(~"error opening " + p) - } - } else { - Ok(FILE_reader(f, true)) - } -} - - -// Byte readers -pub struct BytesReader { - // FIXME(#5723) see other FIXME below - // FIXME(#7268) this should also be parameterized over <'self> - bytes: &'static [u8], - pos: @mut uint -} - -impl Reader for BytesReader { - fn read(&self, bytes: &mut [u8], len: uint) -> uint { - let count = num::min(len, self.bytes.len() - *self.pos); - - let view = self.bytes.slice(*self.pos, self.bytes.len()); - vec::bytes::copy_memory(bytes, view, count); - - *self.pos += count; - - count - } - - fn read_byte(&self) -> int { - if *self.pos == self.bytes.len() { - return -1; - } - - let b = self.bytes[*self.pos]; - *self.pos += 1u; - b as int - } - - fn eof(&self) -> bool { - *self.pos == self.bytes.len() - } - - fn seek(&self, offset: int, whence: SeekStyle) { - let pos = *self.pos; - *self.pos = seek_in_buf(offset, pos, self.bytes.len(), whence); - } - - fn tell(&self) -> uint { - *self.pos - } -} - -pub fn with_bytes_reader<T>(bytes: &[u8], f: &fn(@Reader) -> T) -> T { - // XXX XXX XXX this is glaringly unsound - // FIXME(#5723) Use a &Reader for the callback's argument. Should be: - // fn with_bytes_reader<'r, T>(bytes: &'r [u8], f: &fn(&'r Reader) -> T) -> T - let bytes: &'static [u8] = unsafe { cast::transmute(bytes) }; - f(@BytesReader { - bytes: bytes, - pos: @mut 0 - } as @Reader) -} - -pub fn with_str_reader<T>(s: &str, f: &fn(@Reader) -> T) -> T { - // FIXME(#5723): As above. - with_bytes_reader(s.as_bytes(), f) -} - -// Writing -pub enum FileFlag { Append, Create, Truncate, NoFlag, } - -// What type of writer are we? -#[deriving(Eq)] -pub enum WriterType { Screen, File } - -// FIXME (#2004): Seekable really should be orthogonal. -// FIXME (#2004): eventually u64 -/// The raw underlying writer trait. All writers must implement this. -pub trait Writer { - - /// Write all of the given bytes. - fn write(&self, v: &[u8]); - - /// Move the current position within the stream. The second parameter - /// determines the position that the first parameter is relative to. - fn seek(&self, int, SeekStyle); - - /// Return the current position within the stream. - fn tell(&self) -> uint; - - /// Flush the output buffer for this stream (if there is one). - fn flush(&self) -> int; - - /// Determine if this Writer is writing to a file or not. - fn get_type(&self) -> WriterType; -} - -impl Writer for @Writer { - fn write(&self, v: &[u8]) { self.write(v) } - fn seek(&self, a: int, b: SeekStyle) { self.seek(a, b) } - fn tell(&self) -> uint { self.tell() } - fn flush(&self) -> int { self.flush() } - fn get_type(&self) -> WriterType { self.get_type() } -} - -impl<W:Writer,C> Writer for Wrapper<W, C> { - fn write(&self, bs: &[u8]) { self.base.write(bs); } - fn seek(&self, off: int, style: SeekStyle) { self.base.seek(off, style); } - fn tell(&self) -> uint { self.base.tell() } - fn flush(&self) -> int { self.base.flush() } - fn get_type(&self) -> WriterType { File } -} - -impl Writer for *libc::FILE { - fn write(&self, v: &[u8]) { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - do v.as_imm_buf |vbuf, len| { - let nout = libc::fwrite(vbuf as *c_void, - 1, - len as size_t, - *self); - if nout != len as size_t { - error!("error writing buffer: {}", os::last_os_error()); - fail!(); - } - } - } - } - fn seek(&self, offset: int, whence: SeekStyle) { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - assert!(libc::fseek(*self, - offset as libc::c_long, - convert_whence(whence)) == 0 as c_int); - } - } - fn tell(&self) -> uint { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - libc::ftell(*self) as uint - } - } - fn flush(&self) -> int { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - libc::fflush(*self) as int - } - } - fn get_type(&self) -> WriterType { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - let fd = libc::fileno(*self); - if libc::isatty(fd) == 0 { File } - else { Screen } - } - } -} - -impl Writer for fd_t { - fn write(&self, v: &[u8]) { - #[fixed_stack_segment]; #[inline(never)]; - - #[cfg(windows)] - type IoSize = libc::c_uint; - #[cfg(windows)] - type IoRet = c_int; - - #[cfg(unix)] - type IoSize = size_t; - #[cfg(unix)] - type IoRet = libc::ssize_t; - - unsafe { - let mut count = 0u; - do v.as_imm_buf |vbuf, len| { - while count < len { - let vb = ptr::offset(vbuf, count as int) as *c_void; - let nout = libc::write(*self, vb, len as IoSize); - if nout < 0 as IoRet { - error!("error writing buffer: {}", os::last_os_error()); - fail!(); - } - count += nout as uint; - } - } - } - } - fn seek(&self, _offset: int, _whence: SeekStyle) { - error!("need 64-bit foreign calls for seek, sorry"); - fail!(); - } - fn tell(&self) -> uint { - error!("need 64-bit foreign calls for tell, sorry"); - fail!(); - } - fn flush(&self) -> int { 0 } - fn get_type(&self) -> WriterType { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - if libc::isatty(*self) == 0 { File } else { Screen } - } - } -} - -pub struct FdRes { - priv fd: fd_t, -} - -impl FdRes { - pub fn new(fd: fd_t) -> FdRes { - FdRes { fd: fd } - } -} - -impl Drop for FdRes { - fn drop(&mut self) { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - libc::close(self.fd); - } - } -} - -pub fn fd_writer(fd: fd_t, cleanup: bool) -> @Writer { - if cleanup { - @Wrapper { base: fd, cleanup: FdRes::new(fd) } as @Writer - } else { - @fd as @Writer - } -} - - -pub fn mk_file_writer(path: &Path, flags: &[FileFlag]) - -> Result<@Writer, ~str> { - #[fixed_stack_segment]; #[inline(never)]; - - #[cfg(windows)] - fn wb() -> c_int { - (O_WRONLY | libc::consts::os::extra::O_BINARY) as c_int - } - - #[cfg(unix)] - fn wb() -> c_int { O_WRONLY as c_int } - - let mut fflags: c_int = wb(); - for f in flags.iter() { - match *f { - Append => fflags |= O_APPEND as c_int, - Create => fflags |= O_CREAT as c_int, - Truncate => fflags |= O_TRUNC as c_int, - NoFlag => () - } - } - let fd = unsafe { - do path.with_c_str |pathbuf| { - libc::open(pathbuf, fflags, (S_IRUSR | S_IWUSR) as c_int) - } - }; - if fd < (0 as c_int) { - Err(format!("error opening {}: {}", path.display(), os::last_os_error())) - } else { - Ok(fd_writer(fd, true)) - } -} - -pub fn u64_to_le_bytes<T>(n: u64, size: uint, - f: &fn(v: &[u8]) -> T) -> T { - assert!(size <= 8u); - match size { - 1u => f(&[n as u8]), - 2u => f(&[n as u8, - (n >> 8) as u8]), - 4u => f(&[n as u8, - (n >> 8) as u8, - (n >> 16) as u8, - (n >> 24) as u8]), - 8u => f(&[n as u8, - (n >> 8) as u8, - (n >> 16) as u8, - (n >> 24) as u8, - (n >> 32) as u8, - (n >> 40) as u8, - (n >> 48) as u8, - (n >> 56) as u8]), - _ => { - - let mut bytes: ~[u8] = ~[]; - let mut i = size; - let mut n = n; - while i > 0u { - bytes.push((n & 255_u64) as u8); - n >>= 8_u64; - i -= 1u; - } - f(bytes) - } - } -} - -pub fn u64_to_be_bytes<T>(n: u64, size: uint, - f: &fn(v: &[u8]) -> T) -> T { - assert!(size <= 8u); - match size { - 1u => f(&[n as u8]), - 2u => f(&[(n >> 8) as u8, - n as u8]), - 4u => f(&[(n >> 24) as u8, - (n >> 16) as u8, - (n >> 8) as u8, - n as u8]), - 8u => f(&[(n >> 56) as u8, - (n >> 48) as u8, - (n >> 40) as u8, - (n >> 32) as u8, - (n >> 24) as u8, - (n >> 16) as u8, - (n >> 8) as u8, - n as u8]), - _ => { - let mut bytes: ~[u8] = ~[]; - let mut i = size; - while i > 0u { - let shift = ((i - 1u) * 8u) as u64; - bytes.push((n >> shift) as u8); - i -= 1u; - } - f(bytes) - } - } -} - -pub fn u64_from_be_bytes(data: &[u8], - start: uint, - size: uint) - -> u64 { - let mut sz = size; - assert!((sz <= 8u)); - let mut val = 0_u64; - let mut pos = start; - while sz > 0u { - sz -= 1u; - val += (data[pos] as u64) << ((sz * 8u) as u64); - pos += 1u; - } - return val; -} - -// FIXME: #3048 combine trait+impl (or just move these to -// default methods on writer) -/// Generic utility functions defined on writers. -pub trait WriterUtil { - - /// Write a single utf-8 encoded char. - fn write_char(&self, ch: char); - - /// Write every char in the given str, encoded as utf-8. - fn write_str(&self, s: &str); - - /// Write the given str, as utf-8, followed by '\n'. - fn write_line(&self, s: &str); - - /// Write the result of passing n through `int::to_str_bytes`. - fn write_int(&self, n: int); - - /// Write the result of passing n through `uint::to_str_bytes`. - fn write_uint(&self, n: uint); - - /// Write a little-endian uint (number of bytes depends on system). - fn write_le_uint(&self, n: uint); - - /// Write a little-endian int (number of bytes depends on system). - fn write_le_int(&self, n: int); - - /// Write a big-endian uint (number of bytes depends on system). - fn write_be_uint(&self, n: uint); - - /// Write a big-endian int (number of bytes depends on system). - fn write_be_int(&self, n: int); - - /// Write a big-endian u64 (8 bytes). - fn write_be_u64(&self, n: u64); - - /// Write a big-endian u32 (4 bytes). - fn write_be_u32(&self, n: u32); - - /// Write a big-endian u16 (2 bytes). - fn write_be_u16(&self, n: u16); - - /// Write a big-endian i64 (8 bytes). - fn write_be_i64(&self, n: i64); - - /// Write a big-endian i32 (4 bytes). - fn write_be_i32(&self, n: i32); - - /// Write a big-endian i16 (2 bytes). - fn write_be_i16(&self, n: i16); - - /// Write a big-endian IEEE754 double-precision floating-point (8 bytes). - fn write_be_f64(&self, f: f64); - - /// Write a big-endian IEEE754 single-precision floating-point (4 bytes). - fn write_be_f32(&self, f: f32); - - /// Write a little-endian u64 (8 bytes). - fn write_le_u64(&self, n: u64); - - /// Write a little-endian u32 (4 bytes). - fn write_le_u32(&self, n: u32); - - /// Write a little-endian u16 (2 bytes). - fn write_le_u16(&self, n: u16); - - /// Write a little-endian i64 (8 bytes). - fn write_le_i64(&self, n: i64); - - /// Write a little-endian i32 (4 bytes). - fn write_le_i32(&self, n: i32); - - /// Write a little-endian i16 (2 bytes). - fn write_le_i16(&self, n: i16); - - /// Write a little-endian IEEE754 double-precision floating-point - /// (8 bytes). - fn write_le_f64(&self, f: f64); - - /// Write a little-endian IEEE754 single-precision floating-point - /// (4 bytes). - fn write_le_f32(&self, f: f32); - - /// Write a u8 (1 byte). - fn write_u8(&self, n: u8); - - /// Write a i8 (1 byte). - fn write_i8(&self, n: i8); -} - -impl<T:Writer> WriterUtil for T { - fn write_char(&self, ch: char) { - if (ch as uint) < 128u { - self.write(&[ch as u8]); - } else { - self.write_str(str::from_char(ch)); - } - } - fn write_str(&self, s: &str) { self.write(s.as_bytes()) } - fn write_line(&self, s: &str) { - self.write_str(s); - self.write_str(&"\n"); - } - fn write_int(&self, n: int) { - int::to_str_bytes(n, 10u, |bytes| self.write(bytes)) - } - fn write_uint(&self, n: uint) { - uint::to_str_bytes(n, 10u, |bytes| self.write(bytes)) - } - fn write_le_uint(&self, n: uint) { - u64_to_le_bytes(n as u64, uint::bytes, |v| self.write(v)) - } - fn write_le_int(&self, n: int) { - u64_to_le_bytes(n as u64, int::bytes, |v| self.write(v)) - } - fn write_be_uint(&self, n: uint) { - u64_to_be_bytes(n as u64, uint::bytes, |v| self.write(v)) - } - fn write_be_int(&self, n: int) { - u64_to_be_bytes(n as u64, int::bytes, |v| self.write(v)) - } - fn write_be_u64(&self, n: u64) { - u64_to_be_bytes(n, 8u, |v| self.write(v)) - } - fn write_be_u32(&self, n: u32) { - u64_to_be_bytes(n as u64, 4u, |v| self.write(v)) - } - fn write_be_u16(&self, n: u16) { - u64_to_be_bytes(n as u64, 2u, |v| self.write(v)) - } - fn write_be_i64(&self, n: i64) { - u64_to_be_bytes(n as u64, 8u, |v| self.write(v)) - } - fn write_be_i32(&self, n: i32) { - u64_to_be_bytes(n as u64, 4u, |v| self.write(v)) - } - fn write_be_i16(&self, n: i16) { - u64_to_be_bytes(n as u64, 2u, |v| self.write(v)) - } - fn write_be_f64(&self, f:f64) { - unsafe { - self.write_be_u64(cast::transmute(f)) - } - } - fn write_be_f32(&self, f:f32) { - unsafe { - self.write_be_u32(cast::transmute(f)) - } - } - fn write_le_u64(&self, n: u64) { - u64_to_le_bytes(n, 8u, |v| self.write(v)) - } - fn write_le_u32(&self, n: u32) { - u64_to_le_bytes(n as u64, 4u, |v| self.write(v)) - } - fn write_le_u16(&self, n: u16) { - u64_to_le_bytes(n as u64, 2u, |v| self.write(v)) - } - fn write_le_i64(&self, n: i64) { - u64_to_le_bytes(n as u64, 8u, |v| self.write(v)) - } - fn write_le_i32(&self, n: i32) { - u64_to_le_bytes(n as u64, 4u, |v| self.write(v)) - } - fn write_le_i16(&self, n: i16) { - u64_to_le_bytes(n as u64, 2u, |v| self.write(v)) - } - fn write_le_f64(&self, f:f64) { - unsafe { - self.write_le_u64(cast::transmute(f)) - } - } - fn write_le_f32(&self, f:f32) { - unsafe { - self.write_le_u32(cast::transmute(f)) - } - } - - fn write_u8(&self, n: u8) { self.write([n]) } - fn write_i8(&self, n: i8) { self.write([n as u8]) } - -} - -pub fn file_writer(path: &Path, flags: &[FileFlag]) -> Result<@Writer, ~str> { - mk_file_writer(path, flags).and_then(|w| Ok(w)) -} - -// FIXME (#2004) it would be great if this could be a const -// FIXME (#2004) why are these different from the way stdin() is -// implemented? - - -/** -* Gives a `Writer` which allows you to write to the standard output. -* -* # Example -* -* ```rust -* let stdout = std::io::stdout(); -* stdout.write_str("hello\n"); -* ``` -*/ -pub fn stdout() -> @Writer { fd_writer(libc::STDOUT_FILENO as c_int, false) } - -/** -* Gives a `Writer` which allows you to write to standard error. -* -* # Example -* -* ```rust -* let stderr = std::io::stderr(); -* stderr.write_str("hello\n"); -* ``` -*/ -pub fn stderr() -> @Writer { fd_writer(libc::STDERR_FILENO as c_int, false) } - -/** -* Prints a string to standard output. -* -* This string will not have an implicit newline at the end. If you want -* an implicit newline, please see `println`. -* -* # Example -* -* ```rust -* // print is imported into the prelude, and so is always available. -* print("hello"); -* ``` -*/ -pub fn print(s: &str) { - stdout().write_str(s); -} - -/** -* Prints a string to standard output, followed by a newline. -* -* If you do not want an implicit newline, please see `print`. -* -* # Example -* -* ```rust -* // println is imported into the prelude, and so is always available. -* println("hello"); -* ``` -*/ -pub fn println(s: &str) { - stdout().write_line(s); -} - -pub struct BytesWriter { - bytes: @mut ~[u8], - pos: @mut uint, -} - -impl BytesWriter { - pub fn new() -> BytesWriter { - BytesWriter { - bytes: @mut ~[], - pos: @mut 0 - } - } -} - -impl Writer for BytesWriter { - fn write(&self, v: &[u8]) { - let v_len = v.len(); - - let bytes = &mut *self.bytes; - let count = num::max(bytes.len(), *self.pos + v_len); - bytes.reserve(count); - - unsafe { - vec::raw::set_len(bytes, count); - - let view = bytes.mut_slice(*self.pos, count); - vec::bytes::copy_memory(view, v, v_len); - } - - *self.pos += v_len; - } - - fn seek(&self, offset: int, whence: SeekStyle) { - let pos = *self.pos; - let len = self.bytes.len(); - *self.pos = seek_in_buf(offset, pos, len, whence); - } - - fn tell(&self) -> uint { - *self.pos - } - - fn flush(&self) -> int { - 0 - } - - fn get_type(&self) -> WriterType { - File - } -} - -pub fn with_bytes_writer(f: &fn(@Writer)) -> ~[u8] { - let wr = @BytesWriter::new(); - f(wr as @Writer); - let @BytesWriter { bytes, _ } = wr; - (*bytes).clone() -} - -pub fn with_str_writer(f: &fn(@Writer)) -> ~str { - str::from_utf8(with_bytes_writer(f)) -} - -// Utility functions -pub fn seek_in_buf(offset: int, pos: uint, len: uint, whence: SeekStyle) -> - uint { - let mut bpos = pos as int; - let blen = len as int; - match whence { - SeekSet => bpos = offset, - SeekCur => bpos += offset, - SeekEnd => bpos = blen + offset - } - if bpos < 0 { bpos = 0; } else if bpos > blen { bpos = blen; } - return bpos as uint; -} - -pub fn read_whole_file_str(file: &Path) -> Result<~str, ~str> { - do read_whole_file(file).and_then |bytes| { - if str::is_utf8(bytes) { - Ok(str::from_utf8(bytes)) - } else { - Err(file.display().to_str() + " is not UTF-8") - } - } -} - -// FIXME (#2004): implement this in a low-level way. Going through the -// abstractions is pointless. -pub fn read_whole_file(file: &Path) -> Result<~[u8], ~str> { - do file_reader(file).and_then |rdr| { - Ok(rdr.read_whole_stream()) - } -} - -// fsync related - -pub mod fsync { - use io::{FILERes, FdRes, fd_t}; - use libc; - use ops::Drop; - use option::{None, Option, Some}; - use os; - - pub enum Level { - // whatever fsync does on that platform - FSync, - - // fdatasync on linux, similiar or more on other platforms - FDataSync, - - // full fsync - // - // You must additionally sync the parent directory as well! - FullFSync, - } - - - // Artifacts that need to fsync on destruction - pub struct Res<t> { - priv arg: Arg<t>, - } - - impl <t> Res<t> { - pub fn new(arg: Arg<t>) -> Res<t> { - Res { arg: arg } - } - } - - #[unsafe_destructor] - impl<T> Drop for Res<T> { - fn drop(&mut self) { - match self.arg.opt_level { - None => (), - Some(level) => { - // fail hard if not succesful - assert!(((self.arg.fsync_fn)(&self.arg.val, level) != -1)); - } - } - } - } - - pub struct Arg<t> { - priv val: t, - priv opt_level: Option<Level>, - priv fsync_fn: extern "Rust" fn(f: &t, Level) -> int, - } - - // fsync file after executing blk - // FIXME (#2004) find better way to create resources within lifetime of - // outer res - pub fn FILE_res_sync(file: &FILERes, - opt_level: Option<Level>, - blk: &fn(v: Res<*libc::FILE>)) { - blk(Res::new(Arg { - val: file.f, - opt_level: opt_level, - fsync_fn: fsync_FILE, - })); - - fn fileno(stream: *libc::FILE) -> libc::c_int { - #[fixed_stack_segment]; #[inline(never)]; - unsafe { libc::fileno(stream) } - } - - fn fsync_FILE(stream: &*libc::FILE, level: Level) -> int { - fsync_fd(fileno(*stream), level) - } - } - - // fsync fd after executing blk - pub fn fd_res_sync(fd: &FdRes, opt_level: Option<Level>, - blk: &fn(v: Res<fd_t>)) { - blk(Res::new(Arg { - val: fd.fd, - opt_level: opt_level, - fsync_fn: fsync_fd_helper, - })); - } - - fn fsync_fd(fd: libc::c_int, level: Level) -> int { - #[fixed_stack_segment]; #[inline(never)]; - - os::fsync_fd(fd, level) as int - } - - fn fsync_fd_helper(fd_ptr: &libc::c_int, level: Level) -> int { - fsync_fd(*fd_ptr, level) - } - - // Type of objects that may want to fsync - pub trait FSyncable { fn fsync(&self, l: Level) -> int; } - - // Call o.fsync after executing blk - pub fn obj_sync(o: @FSyncable, opt_level: Option<Level>, - blk: &fn(v: Res<@FSyncable>)) { - blk(Res::new(Arg { - val: o, - opt_level: opt_level, - fsync_fn: obj_fsync_fn, - })); - } - - fn obj_fsync_fn(o: &@FSyncable, level: Level) -> int { - (*o).fsync(level) - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - use i32; - use io::{BytesWriter, SeekCur, SeekEnd, SeekSet}; - use io; - use path::Path; - use result::{Ok, Err}; - use u64; - use vec; - use cast::transmute; - - #[test] - fn test_simple() { - let tmpfile = &Path::new("tmp/lib-io-test-simple.tmp"); - debug!("{}", tmpfile.display()); - let frood: ~str = - ~"A hoopy frood who really knows where his towel is."; - debug!("{}", frood.clone()); - { - let out = io::file_writer(tmpfile, [io::Create, io::Truncate]).unwrap(); - out.write_str(frood); - } - let inp = io::file_reader(tmpfile).unwrap(); - let frood2: ~str = inp.read_c_str(); - debug!("{}", frood2.clone()); - assert_eq!(frood, frood2); - } - - #[test] - fn test_each_byte_each_char_file() { - // Issue #5056 -- shouldn't include trailing EOF. - let path = Path::new("tmp/lib-io-test-each-byte-each-char-file.tmp"); - - { - // create empty, enough to reproduce a problem - io::file_writer(&path, [io::Create]).unwrap(); - } - - { - let file = io::file_reader(&path).unwrap(); - do file.each_byte() |_| { - fail!("must be empty") - }; - } - - { - let file = io::file_reader(&path).unwrap(); - do file.each_char() |_| { - fail!("must be empty") - }; - } - } - - #[test] - fn test_readchars_empty() { - do io::with_str_reader("") |inp| { - let res : ~[char] = inp.read_chars(128); - assert_eq!(res.len(), 0); - } - } - - #[test] - fn test_read_line_utf8() { - do io::with_str_reader("生锈的汤匙切肉汤hello生锈的汤匙切肉汤") |inp| { - let line = inp.read_line(); - assert_eq!(line, ~"生锈的汤匙切肉汤hello生锈的汤匙切肉汤"); - } - } - - #[test] - fn test_read_lines() { - do io::with_str_reader("a\nb\nc\n") |inp| { - assert_eq!(inp.read_lines(), ~[~"a", ~"b", ~"c"]); - } - - do io::with_str_reader("a\nb\nc") |inp| { - assert_eq!(inp.read_lines(), ~[~"a", ~"b", ~"c"]); - } - - do io::with_str_reader("") |inp| { - assert!(inp.read_lines().is_empty()); - } - } - - #[test] - fn test_readchars_wide() { - let wide_test = ~"生锈的汤匙切肉汤hello生锈的汤匙切肉汤"; - let ivals : ~[int] = ~[ - 29983, 38152, 30340, 27748, - 21273, 20999, 32905, 27748, - 104, 101, 108, 108, 111, - 29983, 38152, 30340, 27748, - 21273, 20999, 32905, 27748]; - fn check_read_ln(len : uint, s: &str, ivals: &[int]) { - do io::with_str_reader(s) |inp| { - let res : ~[char] = inp.read_chars(len); - if len <= ivals.len() { - assert_eq!(res.len(), len); - } - for (iv, c) in ivals.iter().zip(res.iter()) { - assert!(*iv == *c as int) - } - } - } - let mut i = 0; - while i < 8 { - check_read_ln(i, wide_test, ivals); - i += 1; - } - // check a long read for good measure - check_read_ln(128, wide_test, ivals); - } - - #[test] - fn test_readchar() { - do io::with_str_reader("生") |inp| { - let res = inp.read_char(); - assert_eq!(res as int, 29983); - } - } - - #[test] - fn test_readchar_empty() { - do io::with_str_reader("") |inp| { - let res = inp.read_char(); - assert_eq!(res, unsafe { transmute(-1u32) }); // FIXME: #8971: unsound - } - } - - #[test] - fn file_reader_not_exist() { - match io::file_reader(&Path::new("not a file")) { - Err(e) => { - assert_eq!(e, ~"error opening not a file"); - } - Ok(_) => fail!() - } - } - - #[test] - #[should_fail] - fn test_read_buffer_too_small() { - let path = &Path::new("tmp/lib-io-test-read-buffer-too-small.tmp"); - // ensure the file exists - io::file_writer(path, [io::Create]).unwrap(); - - let file = io::file_reader(path).unwrap(); - let mut buf = vec::from_elem(5, 0u8); - file.read(buf, 6); // this should fail because buf is too small - } - - #[test] - fn test_read_buffer_big_enough() { - let path = &Path::new("tmp/lib-io-test-read-buffer-big-enough.tmp"); - // ensure the file exists - io::file_writer(path, [io::Create]).unwrap(); - - let file = io::file_reader(path).unwrap(); - let mut buf = vec::from_elem(5, 0u8); - file.read(buf, 4); // this should succeed because buf is big enough - } - - #[test] - fn test_write_empty() { - let file = io::file_writer(&Path::new("tmp/lib-io-test-write-empty.tmp"), - [io::Create]).unwrap(); - file.write([]); - } - - #[test] - fn file_writer_bad_name() { - match io::file_writer(&Path::new("?/?"), []) { - Err(e) => { - assert!(e.starts_with("error opening")); - } - Ok(_) => fail!() - } - } - - #[test] - fn bytes_buffer_overwrite() { - let wr = BytesWriter::new(); - wr.write([0u8, 1u8, 2u8, 3u8]); - assert!(*wr.bytes == ~[0u8, 1u8, 2u8, 3u8]); - wr.seek(-2, SeekCur); - wr.write([4u8, 5u8, 6u8, 7u8]); - assert!(*wr.bytes == ~[0u8, 1u8, 4u8, 5u8, 6u8, 7u8]); - wr.seek(-2, SeekEnd); - wr.write([8u8]); - wr.seek(1, SeekSet); - wr.write([9u8]); - assert!(*wr.bytes == ~[0u8, 9u8, 4u8, 5u8, 8u8, 7u8]); - } - - #[test] - fn test_read_write_le() { - let path = Path::new("tmp/lib-io-test-read-write-le.tmp"); - let uints = [0, 1, 2, 42, 10_123, 100_123_456, u64::max_value]; - - // write the ints to the file - { - let file = io::file_writer(&path, [io::Create]).unwrap(); - for i in uints.iter() { - file.write_le_u64(*i); - } - } - - // then read them back and check that they are the same - { - let file = io::file_reader(&path).unwrap(); - for i in uints.iter() { - assert_eq!(file.read_le_u64(), *i); - } - } - } - - #[test] - fn test_read_write_be() { - let path = Path::new("tmp/lib-io-test-read-write-be.tmp"); - let uints = [0, 1, 2, 42, 10_123, 100_123_456, u64::max_value]; - - // write the ints to the file - { - let file = io::file_writer(&path, [io::Create]).unwrap(); - for i in uints.iter() { - file.write_be_u64(*i); - } - } - - // then read them back and check that they are the same - { - let file = io::file_reader(&path).unwrap(); - for i in uints.iter() { - assert_eq!(file.read_be_u64(), *i); - } - } - } - - #[test] - fn test_read_be_int_n() { - let path = Path::new("tmp/lib-io-test-read-be-int-n.tmp"); - let ints = [i32::min_value, -123456, -42, -5, 0, 1, i32::max_value]; - - // write the ints to the file - { - let file = io::file_writer(&path, [io::Create]).unwrap(); - for i in ints.iter() { - file.write_be_i32(*i); - } - } - - // then read them back and check that they are the same - { - let file = io::file_reader(&path).unwrap(); - for i in ints.iter() { - // this tests that the sign extension is working - // (comparing the values as i32 would not test this) - assert_eq!(file.read_be_int_n(4), *i as i64); - } - } - } - - #[test] - fn test_read_f32() { - let path = Path::new("tmp/lib-io-test-read-f32.tmp"); - //big-endian floating-point 8.1250 - let buf = ~[0x41, 0x02, 0x00, 0x00]; - - { - let file = io::file_writer(&path, [io::Create]).unwrap(); - file.write(buf); - } - - { - let file = io::file_reader(&path).unwrap(); - let f = file.read_be_f32(); - assert_eq!(f, 8.1250); - } - } - - #[test] - fn test_read_write_f32() { - let path = Path::new("tmp/lib-io-test-read-write-f32.tmp"); - let f:f32 = 8.1250; - - { - let file = io::file_writer(&path, [io::Create]).unwrap(); - file.write_be_f32(f); - file.write_le_f32(f); - } - - { - let file = io::file_reader(&path).unwrap(); - assert_eq!(file.read_be_f32(), 8.1250); - assert_eq!(file.read_le_f32(), 8.1250); - } - } -} diff --git a/src/libstd/logging.rs b/src/libstd/logging.rs index 5e1ef3658b3..35a3ca3cff0 100644 --- a/src/libstd/logging.rs +++ b/src/libstd/logging.rs @@ -112,7 +112,7 @@ pub fn log(_level: u32, args: &fmt::Arguments) { } None => { // There is no logger anywhere, just write to stderr - let mut logger = StdErrLogger; + let mut logger = StdErrLogger::new(); logger.log(args); } } diff --git a/src/libstd/os.rs b/src/libstd/os.rs index ba2b42c9b9c..1f32c6a0a35 100644 --- a/src/libstd/os.rs +++ b/src/libstd/os.rs @@ -32,7 +32,6 @@ use c_str::CString; use clone::Clone; use container::Container; -use io; use iter::range; use libc; use libc::{c_char, c_void, c_int, size_t}; @@ -62,7 +61,7 @@ pub fn close(fd: c_int) -> c_int { // which are for Windows and for non-Windows, if necessary. // See https://github.com/mozilla/rust/issues/9822 for more information. -pub mod rustrt { +mod rustrt { use libc::{c_char, c_int}; use libc; @@ -190,6 +189,8 @@ pub fn env() -> ~[(~str,~str)] { #[cfg(windows)] unsafe fn get_env_pairs() -> ~[~str] { #[fixed_stack_segment]; #[inline(never)]; + use c_str; + use str::StrSlice; use libc::funcs::extra::kernel32::{ GetEnvironmentStringsA, @@ -200,7 +201,10 @@ pub fn env() -> ~[(~str,~str)] { fail!("os::env() failure getting env string from OS: {}", os::last_os_error()); } - let result = str::raw::from_c_multistring(ch as *libc::c_char, None); + let mut result = ~[]; + do c_str::from_c_multistring(ch as *libc::c_char, None) |cstr| { + result.push(cstr.as_str().unwrap().to_owned()); + }; FreeEnvironmentStringsA(ch); result } @@ -353,64 +357,6 @@ pub fn fdopen(fd: c_int) -> *FILE { } } - -// fsync related - -#[cfg(windows)] -pub fn fsync_fd(fd: c_int, _level: io::fsync::Level) -> c_int { - #[fixed_stack_segment]; #[inline(never)]; - unsafe { - use libc::funcs::extra::msvcrt::*; - return commit(fd); - } -} - -#[cfg(target_os = "linux")] -#[cfg(target_os = "android")] -pub fn fsync_fd(fd: c_int, level: io::fsync::Level) -> c_int { - #[fixed_stack_segment]; #[inline(never)]; - unsafe { - use libc::funcs::posix01::unistd::*; - match level { - io::fsync::FSync - | io::fsync::FullFSync => return fsync(fd), - io::fsync::FDataSync => return fdatasync(fd) - } - } -} - -#[cfg(target_os = "macos")] -pub fn fsync_fd(fd: c_int, level: io::fsync::Level) -> c_int { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - use libc::consts::os::extra::*; - use libc::funcs::posix88::fcntl::*; - use libc::funcs::posix01::unistd::*; - match level { - io::fsync::FSync => return fsync(fd), - _ => { - // According to man fnctl, the ok retval is only specified to be - // !=-1 - if (fcntl(F_FULLFSYNC as c_int, fd) == -1 as c_int) - { return -1 as c_int; } - else - { return 0 as c_int; } - } - } - } -} - -#[cfg(target_os = "freebsd")] -pub fn fsync_fd(fd: c_int, _l: io::fsync::Level) -> c_int { - #[fixed_stack_segment]; #[inline(never)]; - - unsafe { - use libc::funcs::posix01::unistd::*; - return fsync(fd); - } -} - pub struct Pipe { input: c_int, out: c_int diff --git a/src/libstd/prelude.rs b/src/libstd/prelude.rs index 886ad9995a1..bb8e6674b46 100644 --- a/src/libstd/prelude.rs +++ b/src/libstd/prelude.rs @@ -53,7 +53,6 @@ pub use container::{Container, Mutable, Map, MutableMap, Set, MutableSet}; pub use default::Default; pub use from_str::FromStr; pub use hash::Hash; -pub use io::{Reader, ReaderUtil, Writer, WriterUtil}; pub use iter::{FromIterator, Extendable}; pub use iter::{Iterator, DoubleEndedIterator, RandomAccessIterator, ClonableIterator}; pub use iter::{OrdIterator, MutableDoubleEndedIterator, ExactSize}; diff --git a/src/libstd/rt/io/extensions.rs b/src/libstd/rt/io/extensions.rs index 99634b532b0..4b16f0bc0e1 100644 --- a/src/libstd/rt/io/extensions.rs +++ b/src/libstd/rt/io/extensions.rs @@ -18,11 +18,10 @@ use int; use iter::Iterator; use vec; use rt::io::{Reader, Writer, Decorator}; -use rt::io::{read_error, standard_error, EndOfFile, DEFAULT_BUF_SIZE}; +use rt::io::{io_error, standard_error, EndOfFile, DEFAULT_BUF_SIZE}; use option::{Option, Some, None}; use unstable::finally::Finally; use cast; -use io::{u64_to_le_bytes, u64_to_be_bytes}; pub trait ReaderUtil { @@ -41,8 +40,8 @@ pub trait ReaderUtil { /// /// # Failure /// - /// Raises the same conditions as `read`. Additionally raises `read_error` - /// on EOF. If `read_error` is handled then `push_bytes` may push less + /// Raises the same conditions as `read`. Additionally raises `io_error` + /// on EOF. If `io_error` is handled then `push_bytes` may push less /// than the requested number of bytes. fn push_bytes(&mut self, buf: &mut ~[u8], len: uint); @@ -50,8 +49,8 @@ pub trait ReaderUtil { /// /// # Failure /// - /// Raises the same conditions as `read`. Additionally raises `read_error` - /// on EOF. If `read_error` is handled then the returned vector may + /// Raises the same conditions as `read`. Additionally raises `io_error` + /// on EOF. If `io_error` is handled then the returned vector may /// contain less than the requested number of bytes. fn read_bytes(&mut self, len: uint) -> ~[u8]; @@ -314,7 +313,7 @@ impl<T: Reader> ReaderUtil for T { total_read += nread; } None => { - read_error::cond.raise(standard_error(EndOfFile)); + io_error::cond.raise(standard_error(EndOfFile)); break; } } @@ -334,11 +333,11 @@ impl<T: Reader> ReaderUtil for T { fn read_to_end(&mut self) -> ~[u8] { let mut buf = vec::with_capacity(DEFAULT_BUF_SIZE); let mut keep_reading = true; - do read_error::cond.trap(|e| { + do io_error::cond.trap(|e| { if e.kind == EndOfFile { keep_reading = false; } else { - read_error::cond.raise(e) + io_error::cond.raise(e) } }).inside { while keep_reading { @@ -634,6 +633,88 @@ fn extend_sign(val: u64, nbytes: uint) -> i64 { (val << shift) as i64 >> shift } +pub fn u64_to_le_bytes<T>(n: u64, size: uint, + f: &fn(v: &[u8]) -> T) -> T { + assert!(size <= 8u); + match size { + 1u => f(&[n as u8]), + 2u => f(&[n as u8, + (n >> 8) as u8]), + 4u => f(&[n as u8, + (n >> 8) as u8, + (n >> 16) as u8, + (n >> 24) as u8]), + 8u => f(&[n as u8, + (n >> 8) as u8, + (n >> 16) as u8, + (n >> 24) as u8, + (n >> 32) as u8, + (n >> 40) as u8, + (n >> 48) as u8, + (n >> 56) as u8]), + _ => { + + let mut bytes: ~[u8] = ~[]; + let mut i = size; + let mut n = n; + while i > 0u { + bytes.push((n & 255_u64) as u8); + n >>= 8_u64; + i -= 1u; + } + f(bytes) + } + } +} + +pub fn u64_to_be_bytes<T>(n: u64, size: uint, + f: &fn(v: &[u8]) -> T) -> T { + assert!(size <= 8u); + match size { + 1u => f(&[n as u8]), + 2u => f(&[(n >> 8) as u8, + n as u8]), + 4u => f(&[(n >> 24) as u8, + (n >> 16) as u8, + (n >> 8) as u8, + n as u8]), + 8u => f(&[(n >> 56) as u8, + (n >> 48) as u8, + (n >> 40) as u8, + (n >> 32) as u8, + (n >> 24) as u8, + (n >> 16) as u8, + (n >> 8) as u8, + n as u8]), + _ => { + let mut bytes: ~[u8] = ~[]; + let mut i = size; + while i > 0u { + let shift = ((i - 1u) * 8u) as u64; + bytes.push((n >> shift) as u8); + i -= 1u; + } + f(bytes) + } + } +} + +pub fn u64_from_be_bytes(data: &[u8], + start: uint, + size: uint) + -> u64 { + let mut sz = size; + assert!((sz <= 8u)); + let mut val = 0_u64; + let mut pos = start; + while sz > 0u { + sz -= 1u; + val += (data[pos] as u64) << ((sz * 8u) as u64); + pos += 1u; + } + return val; +} + #[cfg(test)] mod test { use super::ReaderUtil; @@ -641,7 +722,7 @@ mod test { use cell::Cell; use rt::io::mem::{MemReader, MemWriter}; use rt::io::mock::MockReader; - use rt::io::{read_error, placeholder_error}; + use rt::io::{io_error, placeholder_error}; #[test] fn read_byte() { @@ -681,10 +762,10 @@ mod test { fn read_byte_error() { let mut reader = MockReader::new(); reader.read = |_| { - read_error::cond.raise(placeholder_error()); + io_error::cond.raise(placeholder_error()); None }; - do read_error::cond.trap(|_| { + do io_error::cond.trap(|_| { }).inside { let byte = reader.read_byte(); assert!(byte == None); @@ -722,11 +803,11 @@ mod test { fn bytes_error() { let mut reader = MockReader::new(); reader.read = |_| { - read_error::cond.raise(placeholder_error()); + io_error::cond.raise(placeholder_error()); None }; let mut it = reader.bytes(); - do read_error::cond.trap(|_| ()).inside { + do io_error::cond.trap(|_| ()).inside { let byte = it.next(); assert!(byte == None); } @@ -765,7 +846,7 @@ mod test { #[test] fn read_bytes_eof() { let mut reader = MemReader::new(~[10, 11]); - do read_error::cond.trap(|_| { + do io_error::cond.trap(|_| { }).inside { assert!(reader.read_bytes(4) == ~[10, 11]); } @@ -806,7 +887,7 @@ mod test { fn push_bytes_eof() { let mut reader = MemReader::new(~[10, 11]); let mut buf = ~[8, 9]; - do read_error::cond.trap(|_| { + do io_error::cond.trap(|_| { }).inside { reader.push_bytes(&mut buf, 4); assert!(buf == ~[8, 9, 10, 11]); @@ -824,13 +905,13 @@ mod test { buf[0] = 10; Some(1) } else { - read_error::cond.raise(placeholder_error()); + io_error::cond.raise(placeholder_error()); None } } }; let mut buf = ~[8, 9]; - do read_error::cond.trap(|_| { } ).inside { + do io_error::cond.trap(|_| { } ).inside { reader.push_bytes(&mut buf, 4); } assert!(buf == ~[8, 9, 10]); @@ -850,7 +931,7 @@ mod test { buf[0] = 10; Some(1) } else { - read_error::cond.raise(placeholder_error()); + io_error::cond.raise(placeholder_error()); None } } @@ -903,7 +984,7 @@ mod test { buf[1] = 11; Some(2) } else { - read_error::cond.raise(placeholder_error()); + io_error::cond.raise(placeholder_error()); None } } diff --git a/src/libstd/rt/io/file.rs b/src/libstd/rt/io/file.rs index a43bcd8142e..d035e2f457c 100644 --- a/src/libstd/rt/io/file.rs +++ b/src/libstd/rt/io/file.rs @@ -15,10 +15,11 @@ with regular files & directories on a filesystem. At the top-level of the module are a set of freestanding functions, associated with various filesystem operations. They all operate -on a `PathLike` object. +on a `ToCStr` object. This trait is already defined for common +objects such as strings and `Path` instances. All operations in this module, including those as part of `FileStream` et al -block the task during execution. Most will raise `std::rt::io::{io_error,read_error}` +block the task during execution. Most will raise `std::rt::io::io_error` conditions in the event of failure. Also included in this module are the `FileInfo` and `DirectoryInfo` traits. When @@ -30,15 +31,14 @@ free function counterparts. */ use prelude::*; -use super::support::PathLike; +use c_str::ToCStr; use super::{Reader, Writer, Seek}; use super::{SeekStyle, Read, Write}; -use rt::rtio::{RtioFileStream, IoFactory, IoFactoryObject}; -use rt::io::{io_error, read_error, EndOfFile, +use rt::rtio::{RtioFileStream, IoFactory, with_local_io}; +use rt::io::{io_error, EndOfFile, FileMode, FileAccess, FileStat, IoError, PathAlreadyExists, PathDoesntExist, MismatchedFileTypeForOperation, ignore_io_error}; -use rt::local::Local; use option::{Some, None}; use path::Path; @@ -48,7 +48,6 @@ use path::Path; /// /// use std; /// use std::path::Path; -/// use std::rt::io::support::PathLike; /// use std::rt::io::file::open; /// use std::rt::io::{FileMode, FileAccess}; /// @@ -87,22 +86,20 @@ use path::Path; /// * Attempting to open a file with a `FileAccess` that the user lacks permissions /// for /// * Filesystem-level errors (full disk, etc) -pub fn open<P: PathLike>(path: &P, - mode: FileMode, - access: FileAccess - ) -> Option<FileStream> { - let open_result = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_open(path, mode, access) - }; - match open_result { - Ok(fd) => Some(FileStream { - fd: fd, - last_nread: -1 - }), - Err(ioerr) => { - io_error::cond.raise(ioerr); - None +pub fn open<P: ToCStr>(path: &P, + mode: FileMode, + access: FileAccess + ) -> Option<FileStream> { + do with_local_io |io| { + match io.fs_open(&path.to_c_str(), mode, access) { + Ok(fd) => Some(FileStream { + fd: fd, + last_nread: -1 + }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } @@ -113,7 +110,6 @@ pub fn open<P: PathLike>(path: &P, /// /// use std; /// use std::path::Path; -/// use std::rt::io::support::PathLike; /// use std::rt::io::file::unlink; /// /// let p = &Path("/some/file/path.txt"); @@ -129,17 +125,16 @@ pub fn open<P: PathLike>(path: &P, /// /// This function will raise an `io_error` condition if the user lacks permissions to /// remove the file or if some other filesystem-level error occurs -pub fn unlink<P: PathLike>(path: &P) { - let unlink_result = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_unlink(path) - }; - match unlink_result { - Ok(_) => (), - Err(ioerr) => { - io_error::cond.raise(ioerr); +pub fn unlink<P: ToCStr>(path: &P) { + do with_local_io |io| { + match io.fs_unlink(&path.to_c_str()) { + Ok(_) => Some(()), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } - } + }; } /// Create a new, empty directory at the provided path @@ -148,7 +143,6 @@ pub fn unlink<P: PathLike>(path: &P) { /// /// use std; /// use std::path::Path; -/// use std::rt::io::support::PathLike; /// use std::rt::io::file::mkdir; /// /// let p = &Path("/some/dir"); @@ -159,17 +153,16 @@ pub fn unlink<P: PathLike>(path: &P) { /// /// This call will raise an `io_error` condition if the user lacks permissions to make a /// new directory at the provided path, or if the directory already exists -pub fn mkdir<P: PathLike>(path: &P) { - let mkdir_result = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_mkdir(path) - }; - match mkdir_result { - Ok(_) => (), - Err(ioerr) => { - io_error::cond.raise(ioerr); +pub fn mkdir<P: ToCStr>(path: &P) { + do with_local_io |io| { + match io.fs_mkdir(&path.to_c_str()) { + Ok(_) => Some(()), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } - } + }; } /// Remove an existing, empty directory @@ -178,7 +171,6 @@ pub fn mkdir<P: PathLike>(path: &P) { /// /// use std; /// use std::path::Path; -/// use std::rt::io::support::PathLike; /// use std::rt::io::file::rmdir; /// /// let p = &Path("/some/dir"); @@ -189,23 +181,22 @@ pub fn mkdir<P: PathLike>(path: &P) { /// /// This call will raise an `io_error` condition if the user lacks permissions to remove the /// directory at the provided path, or if the directory isn't empty -pub fn rmdir<P: PathLike>(path: &P) { - let rmdir_result = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_rmdir(path) - }; - match rmdir_result { - Ok(_) => (), - Err(ioerr) => { - io_error::cond.raise(ioerr); +pub fn rmdir<P: ToCStr>(path: &P) { + do with_local_io |io| { + match io.fs_rmdir(&path.to_c_str()) { + Ok(_) => Some(()), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } - } + }; } /// Get information on the file, directory, etc at the provided path /// -/// Given a `rt::io::support::PathLike`, query the file system to get -/// information about a file, directory, etc. +/// Given a path, query the file system to get information about a file, +/// directory, etc. /// /// Returns a `Some(std::rt::io::PathInfo)` on success /// @@ -213,7 +204,6 @@ pub fn rmdir<P: PathLike>(path: &P) { /// /// use std; /// use std::path::Path; -/// use std::rt::io::support::PathLike; /// use std::rt::io::file::stat; /// /// let p = &Path("/some/file/path.txt"); @@ -238,18 +228,14 @@ pub fn rmdir<P: PathLike>(path: &P) { /// This call will raise an `io_error` condition if the user lacks the requisite /// permissions to perform a `stat` call on the given path or if there is no /// entry in the filesystem at the provided path. -pub fn stat<P: PathLike>(path: &P) -> Option<FileStat> { - let open_result = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_stat(path) - }; - match open_result { - Ok(p) => { - Some(p) - }, - Err(ioerr) => { - io_error::cond.raise(ioerr); - None +pub fn stat<P: ToCStr>(path: &P) -> Option<FileStat> { + do with_local_io |io| { + match io.fs_stat(&path.to_c_str()) { + Ok(p) => Some(p), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } @@ -260,7 +246,6 @@ pub fn stat<P: PathLike>(path: &P) -> Option<FileStat> { /// /// use std; /// use std::path::Path; -/// use std::rt::io::support::PathLike; /// use std::rt::io::file::readdir; /// /// fn visit_dirs(dir: &Path, cb: &fn(&Path)) { @@ -279,18 +264,14 @@ pub fn stat<P: PathLike>(path: &P) -> Option<FileStat> { /// Will raise an `io_error` condition if the provided `path` doesn't exist, /// the process lacks permissions to view the contents or if the `path` points /// at a non-directory file -pub fn readdir<P: PathLike>(path: &P) -> Option<~[Path]> { - let readdir_result = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_readdir(path, 0) - }; - match readdir_result { - Ok(p) => { - Some(p) - }, - Err(ioerr) => { - io_error::cond.raise(ioerr); - None +pub fn readdir<P: ToCStr>(path: &P) -> Option<~[Path]> { + do with_local_io |io| { + match io.fs_readdir(&path.to_c_str(), 0) { + Ok(p) => Some(p), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } @@ -380,7 +361,7 @@ impl Reader for FileStream { Err(ioerr) => { // EOF is indicated by returning None if ioerr.kind != EndOfFile { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); } return None; } @@ -407,7 +388,7 @@ impl Writer for FileStream { match self.fd.flush() { Ok(_) => (), Err(ioerr) => { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); } } } @@ -420,7 +401,7 @@ impl Seek for FileStream { match res { Ok(cursor) => cursor, Err(ioerr) => { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); return -1; } } @@ -434,7 +415,7 @@ impl Seek for FileStream { () }, Err(ioerr) => { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); } } } diff --git a/src/libstd/rt/io/mem.rs b/src/libstd/rt/io/mem.rs index 5f6b4398c22..0ec37cd3c07 100644 --- a/src/libstd/rt/io/mem.rs +++ b/src/libstd/rt/io/mem.rs @@ -22,46 +22,66 @@ use vec; /// Writes to an owned, growable byte vector pub struct MemWriter { - priv buf: ~[u8] + priv buf: ~[u8], + priv pos: uint, } impl MemWriter { - pub fn new() -> MemWriter { MemWriter { buf: vec::with_capacity(128) } } + pub fn new() -> MemWriter { + MemWriter { buf: vec::with_capacity(128), pos: 0 } + } } impl Writer for MemWriter { fn write(&mut self, buf: &[u8]) { - self.buf.push_all(buf) + // Make sure the internal buffer is as least as big as where we + // currently are + let difference = self.pos as i64 - self.buf.len() as i64; + if difference > 0 { + self.buf.grow(difference as uint, &0); + } + + // Figure out what bytes will be used to overwrite what's currently + // there (left), and what will be appended on the end (right) + let cap = self.buf.len() - self.pos; + let (left, right) = if cap <= buf.len() { + (buf.slice_to(cap), buf.slice_from(cap)) + } else { + (buf, &[]) + }; + + // Do the necessary writes + if left.len() > 0 { + vec::bytes::copy_memory(self.buf.mut_slice_from(self.pos), + left, left.len()); + } + if right.len() > 0 { + self.buf.push_all(right); + } + + // Bump us forward + self.pos += buf.len(); } fn flush(&mut self) { /* no-op */ } } impl Seek for MemWriter { - fn tell(&self) -> u64 { self.buf.len() as u64 } - - fn seek(&mut self, _pos: i64, _style: SeekStyle) { fail!() } -} - -impl Decorator<~[u8]> for MemWriter { - - fn inner(self) -> ~[u8] { - match self { - MemWriter { buf: buf } => buf - } - } + fn tell(&self) -> u64 { self.pos as u64 } - fn inner_ref<'a>(&'a self) -> &'a ~[u8] { - match *self { - MemWriter { buf: ref buf } => buf + fn seek(&mut self, pos: i64, style: SeekStyle) { + match style { + SeekSet => { self.pos = pos as uint; } + SeekEnd => { self.pos = self.buf.len() + pos as uint; } + SeekCur => { self.pos += pos as uint; } } } +} - fn inner_mut_ref<'a>(&'a mut self) -> &'a mut ~[u8] { - match *self { - MemWriter { buf: ref mut buf } => buf - } - } +impl Decorator<~[u8]> for MemWriter { + fn inner(self) -> ~[u8] { self.buf } + fn inner_ref<'a>(&'a self) -> &'a ~[u8] { &self.buf } + fn inner_mut_ref<'a>(&'a mut self) -> &'a mut ~[u8] { &mut self.buf } } /// Reads from an owned byte vector @@ -208,6 +228,7 @@ pub fn with_mem_writer(writeFn:&fn(&mut MemWriter)) -> ~[u8] { mod test { use prelude::*; use super::*; + use rt::io::*; #[test] fn test_mem_writer() { @@ -218,7 +239,24 @@ mod test { writer.write([1, 2, 3]); writer.write([4, 5, 6, 7]); assert_eq!(writer.tell(), 8); - assert_eq!(writer.inner(), ~[0, 1, 2, 3, 4, 5 , 6, 7]); + assert_eq!(*writer.inner_ref(), ~[0, 1, 2, 3, 4, 5, 6, 7]); + + writer.seek(0, SeekSet); + assert_eq!(writer.tell(), 0); + writer.write([3, 4]); + assert_eq!(*writer.inner_ref(), ~[3, 4, 2, 3, 4, 5, 6, 7]); + + writer.seek(1, SeekCur); + writer.write([0, 1]); + assert_eq!(*writer.inner_ref(), ~[3, 4, 2, 0, 1, 5, 6, 7]); + + writer.seek(-1, SeekEnd); + writer.write([1, 2]); + assert_eq!(*writer.inner_ref(), ~[3, 4, 2, 0, 1, 5, 6, 1, 2]); + + writer.seek(1, SeekEnd); + writer.write([1]); + assert_eq!(*writer.inner_ref(), ~[3, 4, 2, 0, 1, 5, 6, 1, 2, 0, 1]); } #[test] diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs index a80c1aab398..758c9779165 100644 --- a/src/libstd/rt/io/mod.rs +++ b/src/libstd/rt/io/mod.rs @@ -261,7 +261,6 @@ pub use self::net::tcp::TcpListener; pub use self::net::tcp::TcpStream; pub use self::net::udp::UdpStream; pub use self::pipe::PipeStream; -pub use self::pipe::UnboundPipeStream; pub use self::process::Process; // Some extension traits that all Readers and Writers get. @@ -299,10 +298,6 @@ pub mod comm_adapters; /// Extension traits pub mod extensions; -/// Non-I/O things needed by the I/O module -// XXX: shouldn this really be pub? -pub mod support; - /// Basic Timer pub mod timer; @@ -331,9 +326,11 @@ pub mod native { /// Mock implementations for testing mod mock; +/// Signal handling +pub mod signal; + /// The default buffer size for various I/O operations -/// XXX: Not pub -pub static DEFAULT_BUF_SIZE: uint = 1024 * 64; +static DEFAULT_BUF_SIZE: uint = 1024 * 64; /// The type passed to I/O condition handlers to indicate error /// @@ -375,7 +372,9 @@ pub enum IoErrorKind { BrokenPipe, PathAlreadyExists, PathDoesntExist, - MismatchedFileTypeForOperation + MismatchedFileTypeForOperation, + ResourceUnavailable, + IoUnavailable, } // FIXME: #8242 implementing manually because deriving doesn't work for some reason @@ -395,7 +394,9 @@ impl ToStr for IoErrorKind { BrokenPipe => ~"BrokenPipe", PathAlreadyExists => ~"PathAlreadyExists", PathDoesntExist => ~"PathDoesntExist", - MismatchedFileTypeForOperation => ~"MismatchedFileTypeForOperation" + MismatchedFileTypeForOperation => ~"MismatchedFileTypeForOperation", + IoUnavailable => ~"IoUnavailable", + ResourceUnavailable => ~"ResourceUnavailable", } } } @@ -406,12 +407,6 @@ condition! { pub io_error: IoError -> (); } -// XXX: Can't put doc comments on macros -// Raised by `read` on error -condition! { - pub read_error: IoError -> (); -} - /// Helper for wrapper calls where you want to /// ignore any io_errors that might be raised pub fn ignore_io_error<T>(cb: &fn() -> T) -> T { @@ -431,7 +426,7 @@ pub trait Reader { /// /// # Failure /// - /// Raises the `read_error` condition on error. If the condition + /// Raises the `io_error` condition on error. If the condition /// is handled then no guarantee is made about the number of bytes /// read and the contents of `buf`. If the condition is handled /// returns `None` (XXX see below). diff --git a/src/libstd/rt/io/native/file.rs b/src/libstd/rt/io/native/file.rs index d6820981181..ba819df071a 100644 --- a/src/libstd/rt/io/native/file.rs +++ b/src/libstd/rt/io/native/file.rs @@ -17,13 +17,31 @@ use os; use prelude::*; use super::super::*; -fn raise_error() { +#[cfg(windows)] +fn get_err(errno: i32) -> (IoErrorKind, &'static str) { + match errno { + libc::EOF => (EndOfFile, "end of file"), + _ => (OtherIoError, "unknown error"), + } +} + +#[cfg(not(windows))] +fn get_err(errno: i32) -> (IoErrorKind, &'static str) { // XXX: this should probably be a bit more descriptive... - let (kind, desc) = match os::errno() as i32 { + match errno { libc::EOF => (EndOfFile, "end of file"), + + // These two constants can have the same value on some systems, but + // different values on others, so we can't use a match clause + x if x == libc::EAGAIN || x == libc::EWOULDBLOCK => + (ResourceUnavailable, "resource temporarily unavailable"), + _ => (OtherIoError, "unknown error"), - }; + } +} +fn raise_error() { + let (kind, desc) = get_err(os::errno() as i32); io_error::cond.raise(IoError { kind: kind, desc: desc, diff --git a/src/libstd/rt/io/net/addrinfo.rs b/src/libstd/rt/io/net/addrinfo.rs new file mode 100644 index 00000000000..27cf9781c9c --- /dev/null +++ b/src/libstd/rt/io/net/addrinfo.rs @@ -0,0 +1,126 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! + +Synchronous DNS Resolution + +Contains the functionality to perform DNS resolution in a style related to +getaddrinfo() + +*/ + +use option::{Option, Some, None}; +use result::{Ok, Err}; +use rt::io::{io_error}; +use rt::io::net::ip::{SocketAddr, IpAddr}; +use rt::rtio::{IoFactory, with_local_io}; + +/// Hints to the types of sockets that are desired when looking up hosts +pub enum SocketType { + Stream, Datagram, Raw +} + +/// Flags which can be or'd into the `flags` field of a `Hint`. These are used +/// to manipulate how a query is performed. +/// +/// The meaning of each of these flags can be found with `man -s 3 getaddrinfo` +pub enum Flag { + AddrConfig, + All, + CanonName, + NumericHost, + NumericServ, + Passive, + V4Mapped, +} + +/// A transport protocol associated with either a hint or a return value of +/// `lookup` +pub enum Protocol { + TCP, UDP +} + +/// This structure is used to provide hints when fetching addresses for a +/// remote host to control how the lookup is performed. +/// +/// For details on these fields, see their corresponding definitions via +/// `man -s 3 getaddrinfo` +pub struct Hint { + family: uint, + socktype: Option<SocketType>, + protocol: Option<Protocol>, + flags: uint, +} + +pub struct Info { + address: SocketAddr, + family: uint, + socktype: Option<SocketType>, + protocol: Option<Protocol>, + flags: uint, +} + +/// Easy name resolution. Given a hostname, returns the list of IP addresses for +/// that hostname. +/// +/// # Failure +/// +/// On failure, this will raise on the `io_error` condition. +pub fn get_host_addresses(host: &str) -> Option<~[IpAddr]> { + lookup(Some(host), None, None).map(|a| a.map(|i| i.address.ip)) +} + +/// Full-fleged resolution. This function will perform a synchronous call to +/// getaddrinfo, controlled by the parameters +/// +/// # Arguments +/// +/// * hostname - an optional hostname to lookup against +/// * servname - an optional service name, listed in the system services +/// * hint - see the hint structure, and "man -s 3 getaddrinfo", for how this +/// controls lookup +/// +/// # Failure +/// +/// On failure, this will raise on the `io_error` condition. +/// +/// XXX: this is not public because the `Hint` structure is not ready for public +/// consumption just yet. +fn lookup(hostname: Option<&str>, servname: Option<&str>, + hint: Option<Hint>) -> Option<~[Info]> { + do with_local_io |io| { + match io.get_host_addresses(hostname, servname, hint) { + Ok(i) => Some(i), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } +} + +#[cfg(test)] +mod test { + use option::Some; + use rt::io::net::ip::Ipv4Addr; + use super::*; + + #[test] + fn dns_smoke_test() { + let ipaddrs = get_host_addresses("localhost").unwrap(); + let mut found_local = false; + let local_addr = &Ipv4Addr(127, 0, 0, 1); + for addr in ipaddrs.iter() { + found_local = found_local || addr == local_addr; + } + assert!(found_local); + } +} diff --git a/src/libstd/rt/io/net/mod.rs b/src/libstd/rt/io/net/mod.rs index f44e879a63a..cf109167089 100644 --- a/src/libstd/rt/io/net/mod.rs +++ b/src/libstd/rt/io/net/mod.rs @@ -8,55 +8,11 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use option::{Option, Some, None}; -use result::{Ok, Err}; -use rt::io::io_error; -use rt::io::net::ip::IpAddr; -use rt::rtio::{IoFactory, IoFactoryObject}; -use rt::local::Local; +pub use self::addrinfo::get_host_addresses; +pub mod addrinfo; pub mod tcp; pub mod udp; pub mod ip; #[cfg(unix)] pub mod unix; - -/// Simplistic name resolution -pub fn get_host_addresses(host: &str) -> Option<~[IpAddr]> { - /*! - * Get the IP addresses for a given host name. - * - * Raises io_error on failure. - */ - - let ipaddrs = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).get_host_addresses(host) - }; - - match ipaddrs { - Ok(i) => Some(i), - Err(ioerr) => { - io_error::cond.raise(ioerr); - None - } - } -} - -#[cfg(test)] -mod test { - use option::Some; - use rt::io::net::ip::Ipv4Addr; - use super::*; - - #[test] - fn dns_smoke_test() { - let ipaddrs = get_host_addresses("localhost").unwrap(); - let mut found_local = false; - let local_addr = &Ipv4Addr(127, 0, 0, 1); - for addr in ipaddrs.iter() { - found_local = found_local || addr == local_addr; - } - assert!(found_local); - } -} diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs index f29e17cfc2f..4e841b36a5d 100644 --- a/src/libstd/rt/io/net/tcp.rs +++ b/src/libstd/rt/io/net/tcp.rs @@ -12,37 +12,27 @@ use option::{Option, Some, None}; use result::{Ok, Err}; use rt::io::net::ip::SocketAddr; use rt::io::{Reader, Writer, Listener, Acceptor}; -use rt::io::{io_error, read_error, EndOfFile}; -use rt::rtio::{IoFactory, IoFactoryObject, - RtioSocket, - RtioTcpListener, RtioTcpListenerObject, - RtioTcpAcceptor, RtioTcpAcceptorObject, - RtioTcpStream, RtioTcpStreamObject}; -use rt::local::Local; +use rt::io::{io_error, EndOfFile}; +use rt::rtio::{IoFactory, with_local_io, + RtioSocket, RtioTcpListener, RtioTcpAcceptor, RtioTcpStream}; pub struct TcpStream { - priv obj: ~RtioTcpStreamObject + priv obj: ~RtioTcpStream } impl TcpStream { - fn new(s: ~RtioTcpStreamObject) -> TcpStream { + fn new(s: ~RtioTcpStream) -> TcpStream { TcpStream { obj: s } } pub fn connect(addr: SocketAddr) -> Option<TcpStream> { - let stream = unsafe { - rtdebug!("borrowing io to connect"); - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - rtdebug!("about to connect"); - (*io).tcp_connect(addr) - }; - - match stream { - Ok(s) => Some(TcpStream::new(s)), - Err(ioerr) => { - rtdebug!("failed to connect: {:?}", ioerr); - io_error::cond.raise(ioerr); - None + do with_local_io |io| { + match io.tcp_connect(addr) { + Ok(s) => Some(TcpStream::new(s)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } @@ -77,7 +67,7 @@ impl Reader for TcpStream { Err(ioerr) => { // EOF is indicated by returning None if ioerr.kind != EndOfFile { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); } return None; } @@ -99,20 +89,18 @@ impl Writer for TcpStream { } pub struct TcpListener { - priv obj: ~RtioTcpListenerObject + priv obj: ~RtioTcpListener } impl TcpListener { pub fn bind(addr: SocketAddr) -> Option<TcpListener> { - let listener = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).tcp_bind(addr) - }; - match listener { - Ok(l) => Some(TcpListener { obj: l }), - Err(ioerr) => { - io_error::cond.raise(ioerr); - return None; + do with_local_io |io| { + match io.tcp_bind(addr) { + Ok(l) => Some(TcpListener { obj: l }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } @@ -142,7 +130,7 @@ impl Listener<TcpStream, TcpAcceptor> for TcpListener { } pub struct TcpAcceptor { - priv obj: ~RtioTcpAcceptorObject + priv obj: ~RtioTcpAcceptor } impl Acceptor<TcpStream> for TcpAcceptor { @@ -320,7 +308,7 @@ mod test { let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); - do read_error::cond.trap(|e| { + do io_error::cond.trap(|e| { if cfg!(windows) { assert_eq!(e.kind, NotConnected); } else { @@ -355,7 +343,7 @@ mod test { let mut buf = [0]; let nread = stream.read(buf); assert!(nread.is_none()); - do read_error::cond.trap(|e| { + do io_error::cond.trap(|e| { if cfg!(windows) { assert_eq!(e.kind, NotConnected); } else { diff --git a/src/libstd/rt/io/net/udp.rs b/src/libstd/rt/io/net/udp.rs index 27faae0838b..2e4ae95d98e 100644 --- a/src/libstd/rt/io/net/udp.rs +++ b/src/libstd/rt/io/net/udp.rs @@ -12,25 +12,22 @@ use option::{Option, Some, None}; use result::{Ok, Err}; use rt::io::net::ip::SocketAddr; use rt::io::{Reader, Writer}; -use rt::io::{io_error, read_error, EndOfFile}; -use rt::rtio::{RtioSocket, RtioUdpSocketObject, RtioUdpSocket, IoFactory, IoFactoryObject}; -use rt::local::Local; +use rt::io::{io_error, EndOfFile}; +use rt::rtio::{RtioSocket, RtioUdpSocket, IoFactory, with_local_io}; pub struct UdpSocket { - priv obj: ~RtioUdpSocketObject + priv obj: ~RtioUdpSocket } impl UdpSocket { pub fn bind(addr: SocketAddr) -> Option<UdpSocket> { - let socket = unsafe { - let factory: *mut IoFactoryObject = Local::unsafe_borrow(); - (*factory).udp_bind(addr) - }; - match socket { - Ok(s) => Some(UdpSocket { obj: s }), - Err(ioerr) => { - io_error::cond.raise(ioerr); - None + do with_local_io |io| { + match io.udp_bind(addr) { + Ok(s) => Some(UdpSocket { obj: s }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } @@ -41,7 +38,7 @@ impl UdpSocket { Err(ioerr) => { // EOF is indicated by returning None if ioerr.kind != EndOfFile { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); } None } diff --git a/src/libstd/rt/io/net/unix.rs b/src/libstd/rt/io/net/unix.rs index 1771a963ba7..e424956e2ff 100644 --- a/src/libstd/rt/io/net/unix.rs +++ b/src/libstd/rt/io/net/unix.rs @@ -8,44 +8,289 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +/*! + +Named pipes + +This module contains the ability to communicate over named pipes with +synchronous I/O. On windows, this corresponds to talking over a Named Pipe, +while on Unix it corresponds to UNIX domain sockets. + +These pipes are similar to TCP in the sense that you can have both a stream to a +server and a server itself. The server provided accepts other `UnixStream` +instances as clients. + +*/ + use prelude::*; -use super::super::*; -use super::super::support::PathLike; -pub struct UnixStream; +use c_str::ToCStr; +use rt::rtio::{IoFactory, RtioUnixListener, with_local_io}; +use rt::rtio::{RtioUnixAcceptor, RtioPipe}; +use rt::io::pipe::PipeStream; +use rt::io::{io_error, Listener, Acceptor, Reader, Writer}; + +/// A stream which communicates over a named pipe. +pub struct UnixStream { + priv obj: PipeStream, +} impl UnixStream { - pub fn connect<P: PathLike>(_path: &P) -> Option<UnixStream> { - fail!() + fn new(obj: ~RtioPipe) -> UnixStream { + UnixStream { obj: PipeStream::new(obj) } + } + + /// Connect to a pipe named by `path`. This will attempt to open a + /// connection to the underlying socket. + /// + /// The returned stream will be closed when the object falls out of scope. + /// + /// # Failure + /// + /// This function will raise on the `io_error` condition if the connection + /// could not be made. + /// + /// # Example + /// + /// use std::rt::io::net::unix::UnixStream; + /// + /// let server = Path("path/to/my/socket"); + /// let mut stream = UnixStream::connect(&server); + /// stream.write([1, 2, 3]); + /// + pub fn connect<P: ToCStr>(path: &P) -> Option<UnixStream> { + do with_local_io |io| { + match io.unix_connect(&path.to_c_str()) { + Ok(s) => Some(UnixStream::new(s)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } } } impl Reader for UnixStream { - fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() } - - fn eof(&mut self) -> bool { fail!() } + fn read(&mut self, buf: &mut [u8]) -> Option<uint> { self.obj.read(buf) } + fn eof(&mut self) -> bool { self.obj.eof() } } impl Writer for UnixStream { - fn write(&mut self, _v: &[u8]) { fail!() } - - fn flush(&mut self) { fail!() } + fn write(&mut self, buf: &[u8]) { self.obj.write(buf) } + fn flush(&mut self) { self.obj.flush() } } -pub struct UnixListener; +pub struct UnixListener { + priv obj: ~RtioUnixListener, +} impl UnixListener { - pub fn bind<P: PathLike>(_path: &P) -> Option<UnixListener> { - fail!() + + /// Creates a new listener, ready to receive incoming connections on the + /// specified socket. The server will be named by `path`. + /// + /// This listener will be closed when it falls out of scope. + /// + /// # Failure + /// + /// This function will raise on the `io_error` condition if the specified + /// path could not be bound. + /// + /// # Example + /// + /// use std::rt::io::net::unix::UnixListener; + /// + /// let server = Path("path/to/my/socket"); + /// let mut stream = UnixListener::bind(&server); + /// for client in stream.incoming() { + /// let mut client = client; + /// client.write([1, 2, 3, 4]); + /// } + /// + pub fn bind<P: ToCStr>(path: &P) -> Option<UnixListener> { + do with_local_io |io| { + match io.unix_bind(&path.to_c_str()) { + Ok(s) => Some(UnixListener{ obj: s }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } } } impl Listener<UnixStream, UnixAcceptor> for UnixListener { - fn listen(self) -> Option<UnixAcceptor> { fail!() } + fn listen(self) -> Option<UnixAcceptor> { + match self.obj.listen() { + Ok(acceptor) => Some(UnixAcceptor { obj: acceptor }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } } -pub struct UnixAcceptor; +pub struct UnixAcceptor { + priv obj: ~RtioUnixAcceptor, +} impl Acceptor<UnixStream> for UnixAcceptor { - fn accept(&mut self) -> Option<UnixStream> { fail!() } + fn accept(&mut self) -> Option<UnixStream> { + match self.obj.accept() { + Ok(s) => Some(UnixStream::new(s)), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + } +} + +#[cfg(test)] +mod tests { + use prelude::*; + use super::*; + use cell::Cell; + use rt::test::*; + use rt::io::*; + use rt::comm::oneshot; + use os; + + fn smalltest(server: ~fn(UnixStream), client: ~fn(UnixStream)) { + let server = Cell::new(server); + let client = Cell::new(client); + do run_in_mt_newsched_task { + let server = Cell::new(server.take()); + let client = Cell::new(client.take()); + let path1 = next_test_unix(); + let path2 = path1.clone(); + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + do spawntask { + let mut acceptor = UnixListener::bind(&path1).listen(); + chan.take().send(()); + server.take()(acceptor.accept().unwrap()); + } + + do spawntask { + port.take().recv(); + client.take()(UnixStream::connect(&path2).unwrap()); + } + } + } + + #[test] + fn bind_error() { + do run_in_mt_newsched_task { + let mut called = false; + do io_error::cond.trap(|e| { + assert!(e.kind == PermissionDenied); + called = true; + }).inside { + let listener = UnixListener::bind(&("path/to/nowhere")); + assert!(listener.is_none()); + } + assert!(called); + } + } + + #[test] + fn connect_error() { + do run_in_mt_newsched_task { + let mut called = false; + do io_error::cond.trap(|e| { + assert_eq!(e.kind, OtherIoError); + called = true; + }).inside { + let stream = UnixStream::connect(&("path/to/nowhere")); + assert!(stream.is_none()); + } + assert!(called); + } + } + + #[test] + fn smoke() { + smalltest(|mut server| { + let mut buf = [0]; + server.read(buf); + assert!(buf[0] == 99); + }, |mut client| { + client.write([99]); + }) + } + + #[test] + fn read_eof() { + smalltest(|mut server| { + let mut buf = [0]; + assert!(server.read(buf).is_none()); + assert!(server.read(buf).is_none()); + }, |_client| { + // drop the client + }) + } + + #[test] + fn write_begone() { + smalltest(|mut server| { + let buf = [0]; + let mut stop = false; + while !stop{ + do io_error::cond.trap(|e| { + assert_eq!(e.kind, BrokenPipe); + stop = true; + }).inside { + server.write(buf); + } + } + }, |_client| { + // drop the client + }) + } + + #[test] + fn accept_lots() { + do run_in_mt_newsched_task { + let times = 10; + let path1 = next_test_unix(); + let path2 = path1.clone(); + let (port, chan) = oneshot(); + let port = Cell::new(port); + let chan = Cell::new(chan); + + do spawntask { + let mut acceptor = UnixListener::bind(&path1).listen(); + chan.take().send(()); + do times.times { + let mut client = acceptor.accept(); + let mut buf = [0]; + client.read(buf); + assert_eq!(buf[0], 100); + } + } + + do spawntask { + port.take().recv(); + do times.times { + let mut stream = UnixStream::connect(&path2); + stream.write([100]); + } + } + } + } + + #[test] + fn path_exists() { + do run_in_mt_newsched_task { + let path = next_test_unix(); + let _acceptor = UnixListener::bind(&path).listen(); + assert!(os::path_exists(&path)); + } + } } diff --git a/src/libstd/rt/io/option.rs b/src/libstd/rt/io/option.rs index 2ea1b615483..52699964b62 100644 --- a/src/libstd/rt/io/option.rs +++ b/src/libstd/rt/io/option.rs @@ -13,12 +13,10 @@ //! I/O constructors return option types to allow errors to be handled. //! These implementations allow e.g. `Option<FileStream>` to be used //! as a `Reader` without unwrapping the option first. -//! -//! # XXX Seek and Close use option::*; -use super::{Reader, Writer, Listener, Acceptor}; -use super::{standard_error, PreviousIoError, io_error, read_error, IoError}; +use super::{Reader, Writer, Listener, Acceptor, Seek, SeekStyle}; +use super::{standard_error, PreviousIoError, io_error, IoError}; fn prev_io_error() -> IoError { standard_error(PreviousIoError) @@ -45,7 +43,7 @@ impl<R: Reader> Reader for Option<R> { match *self { Some(ref mut reader) => reader.read(buf), None => { - read_error::cond.raise(prev_io_error()); + io_error::cond.raise(prev_io_error()); None } } @@ -62,6 +60,24 @@ impl<R: Reader> Reader for Option<R> { } } +impl<S: Seek> Seek for Option<S> { + fn tell(&self) -> u64 { + match *self { + Some(ref seeker) => seeker.tell(), + None => { + io_error::cond.raise(prev_io_error()); + 0 + } + } + } + fn seek(&mut self, pos: i64, style: SeekStyle) { + match *self { + Some(ref mut seeker) => seeker.seek(pos, style), + None => io_error::cond.raise(prev_io_error()) + } + } +} + impl<T, A: Acceptor<T>, L: Listener<T, A>> Listener<T, A> for Option<L> { fn listen(self) -> Option<A> { match self { @@ -91,7 +107,7 @@ mod test { use option::*; use super::super::mem::*; use rt::test::*; - use super::super::{PreviousIoError, io_error, read_error}; + use super::super::{PreviousIoError, io_error, io_error}; #[test] fn test_option_writer() { @@ -145,7 +161,7 @@ mod test { let mut buf = []; let mut called = false; - do read_error::cond.trap(|err| { + do io_error::cond.trap(|err| { assert_eq!(err.kind, PreviousIoError); called = true; }).inside { diff --git a/src/libstd/rt/io/pipe.rs b/src/libstd/rt/io/pipe.rs index d2cd531ed26..ec9a4a0101f 100644 --- a/src/libstd/rt/io/pipe.rs +++ b/src/libstd/rt/io/pipe.rs @@ -15,37 +15,47 @@ use prelude::*; use super::{Reader, Writer}; -use rt::io::{io_error, read_error, EndOfFile}; -use rt::local::Local; -use rt::rtio::{RtioPipe, RtioPipeObject, IoFactoryObject, IoFactory}; -use rt::rtio::RtioUnboundPipeObject; +use rt::io::{io_error, EndOfFile}; +use rt::io::native::file; +use rt::rtio::{RtioPipe, with_local_io}; pub struct PipeStream { - priv obj: RtioPipeObject + priv obj: ~RtioPipe, } -// This should not be a newtype, but rt::uv::process::set_stdio needs to reach -// into the internals of this :( -pub struct UnboundPipeStream(~RtioUnboundPipeObject); - impl PipeStream { - /// Creates a new pipe initialized, but not bound to any particular - /// source/destination - pub fn new() -> Option<UnboundPipeStream> { - let pipe = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).pipe_init(false) - }; - match pipe { - Ok(p) => Some(UnboundPipeStream(p)), - Err(ioerr) => { - io_error::cond.raise(ioerr); - None + /// Consumes a file descriptor to return a pipe stream that will have + /// synchronous, but non-blocking reads/writes. This is useful if the file + /// descriptor is acquired via means other than the standard methods. + /// + /// This operation consumes ownership of the file descriptor and it will be + /// closed once the object is deallocated. + /// + /// # Example + /// + /// use std::libc; + /// use std::rt::io::pipe; + /// + /// let mut pipe = PipeStream::open(libc::STDERR_FILENO); + /// pipe.write(bytes!("Hello, stderr!")); + /// + /// # Failure + /// + /// If the pipe cannot be created, an error will be raised on the + /// `io_error` condition. + pub fn open(fd: file::fd_t) -> Option<PipeStream> { + do with_local_io |io| { + match io.pipe_open(fd) { + Ok(obj) => Some(PipeStream { obj: obj }), + Err(e) => { + io_error::cond.raise(e); + None + } } } } - pub fn bind(inner: RtioPipeObject) -> PipeStream { + pub fn new(inner: ~RtioPipe) -> PipeStream { PipeStream { obj: inner } } } @@ -57,14 +67,14 @@ impl Reader for PipeStream { Err(ioerr) => { // EOF is indicated by returning None if ioerr.kind != EndOfFile { - read_error::cond.raise(ioerr); + io_error::cond.raise(ioerr); } return None; } } } - fn eof(&mut self) -> bool { fail!() } + fn eof(&mut self) -> bool { false } } impl Writer for PipeStream { @@ -77,5 +87,5 @@ impl Writer for PipeStream { } } - fn flush(&mut self) { fail!() } + fn flush(&mut self) {} } diff --git a/src/libstd/rt/io/process.rs b/src/libstd/rt/io/process.rs index 5f2453852ee..ae087099d1f 100644 --- a/src/libstd/rt/io/process.rs +++ b/src/libstd/rt/io/process.rs @@ -11,12 +11,12 @@ //! Bindings for executing child processes use prelude::*; +use cell::Cell; use libc; use rt::io; use rt::io::io_error; -use rt::local::Local; -use rt::rtio::{RtioProcess, RtioProcessObject, IoFactoryObject, IoFactory}; +use rt::rtio::{RtioProcess, IoFactory, with_local_io}; // windows values don't matter as long as they're at least one of unix's // TERM/KILL/INT signals @@ -26,7 +26,7 @@ use rt::rtio::{RtioProcess, RtioProcessObject, IoFactoryObject, IoFactory}; #[cfg(not(windows))] pub static MustDieSignal: int = libc::SIGKILL as int; pub struct Process { - priv handle: ~RtioProcessObject, + priv handle: ~RtioProcess, io: ~[Option<io::PipeStream>], } @@ -57,7 +57,7 @@ pub struct ProcessConfig<'self> { /// 0 - stdin /// 1 - stdout /// 2 - stderr - io: ~[StdioContainer] + io: &'self [StdioContainer] } /// Describes what to do with a standard io stream for a child process. @@ -70,42 +70,32 @@ pub enum StdioContainer { /// specified for. InheritFd(libc::c_int), - // XXX: these two shouldn't have libuv-specific implementation details - - /// The specified libuv stream is inherited for the corresponding file - /// descriptor it is assigned to. - // XXX: this needs to be thought out more. - //InheritStream(uv::net::StreamWatcher), - - /// Creates a pipe for the specified file descriptor which will be directed - /// into the previously-initialized pipe passed in. + /// Creates a pipe for the specified file descriptor which will be created + /// when the process is spawned. /// /// The first boolean argument is whether the pipe is readable, and the /// second is whether it is writable. These properties are from the view of /// the *child* process, not the parent process. - CreatePipe(io::UnboundPipeStream, - bool /* readable */, - bool /* writable */), + CreatePipe(bool /* readable */, bool /* writable */), } impl Process { /// Creates a new pipe initialized, but not bound to any particular /// source/destination pub fn new(config: ProcessConfig) -> Option<Process> { - let process = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).spawn(config) - }; - match process { - Ok((p, io)) => Some(Process{ - handle: p, - io: io.move_iter().map(|p| - p.map(|p| io::PipeStream::bind(p)) - ).collect() - }), - Err(ioerr) => { - io_error::cond.raise(ioerr); - None + let config = Cell::new(config); + do with_local_io |io| { + match io.spawn(config.take()) { + Ok((p, io)) => Some(Process{ + handle: p, + io: io.move_iter().map(|p| + p.map(|p| io::PipeStream::new(p)) + ).collect() + }), + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } } } } diff --git a/src/libstd/rt/io/signal.rs b/src/libstd/rt/io/signal.rs new file mode 100644 index 00000000000..a13fc19d000 --- /dev/null +++ b/src/libstd/rt/io/signal.rs @@ -0,0 +1,220 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! + +Signal handling + +This modules provides bindings to receive signals safely, built on top of the +local I/O factory. There are a number of defined signals which can be caught, +but not all signals will work across all platforms (windows doesn't have +definitions for a number of signals. + +*/ + +use comm::{Port, SharedChan, stream}; +use hashmap; +use option::{Some, None}; +use result::{Err, Ok}; +use rt::io::io_error; +use rt::rtio::{IoFactory, RtioSignal, with_local_io}; + +#[deriving(Eq, IterBytes)] +pub enum Signum { + /// Equivalent to SIGBREAK, delivered when the user presses Ctrl-Break. + Break = 21i, + /// Equivalent to SIGHUP, delivered when the user closes the terminal + /// window. On delivery of HangUp, the program is given approximately + /// 10 seconds to perfom any cleanup. After that, Windows will + /// unconditionally terminate it. + HangUp = 1i, + /// Equivalent to SIGINT, delivered when the user presses Ctrl-c. + Interrupt = 2i, + /// Equivalent to SIGQUIT, delivered when the user presses Ctrl-\. + Quit = 3i, + /// Equivalent to SIGTSTP, delivered when the user presses Ctrl-z. + StopTemporarily = 20i, + /// Equivalent to SIGUSR1. + User1 = 10i, + /// Equivalent to SIGUSR2. + User2 = 12i, + /// Equivalent to SIGWINCH, delivered when the console has been resized. + /// WindowSizeChange may not be delivered in a timely manner; size change + /// will only be detected when the cursor is being moved. + WindowSizeChange = 28i, +} + +/// Listener provides a port to listen for registered signals. +/// +/// Listener automatically unregisters its handles once it is out of scope. +/// However, clients can still unregister signums manually. +/// +/// # Example +/// +/// ```rust +/// use std::rt::io::signal::{Listener, Interrupt}; +/// +/// let mut listener = Listener::new(); +/// listener.register(signal::Interrupt); +/// +/// do spawn { +/// loop { +/// match listener.port.recv() { +/// Interrupt => println("Got Interrupt'ed"), +/// _ => (), +/// } +/// } +/// } +/// +/// ``` +pub struct Listener { + /// A map from signums to handles to keep the handles in memory + priv handles: hashmap::HashMap<Signum, ~RtioSignal>, + /// chan is where all the handles send signums, which are received by + /// the clients from port. + priv chan: SharedChan<Signum>, + + /// Clients of Listener can `recv()` from this port. This is exposed to + /// allow selection over this port as well as manipulation of the port + /// directly. + port: Port<Signum>, +} + +impl Listener { + /// Creates a new listener for signals. Once created, signals are bound via + /// the `register` method (otherwise nothing will ever be received) + pub fn new() -> Listener { + let (port, chan) = stream(); + Listener { + chan: SharedChan::new(chan), + port: port, + handles: hashmap::HashMap::new(), + } + } + + /// Listen for a signal, returning true when successfully registered for + /// signum. Signals can be received using `recv()`. + /// + /// Once a signal is registered, this listener will continue to receive + /// notifications of signals until it is unregistered. This occurs + /// regardless of the number of other listeners registered in other tasks + /// (or on this task). + /// + /// Signals are still received if there is no task actively waiting for + /// a signal, and a later call to `recv` will return the signal that was + /// received while no task was waiting on it. + /// + /// # Failure + /// + /// If this function fails to register a signal handler, then an error will + /// be raised on the `io_error` condition and the function will return + /// false. + pub fn register(&mut self, signum: Signum) -> bool { + if self.handles.contains_key(&signum) { + return true; // self is already listening to signum, so succeed + } + do with_local_io |io| { + match io.signal(signum, self.chan.clone()) { + Ok(w) => { + self.handles.insert(signum, w); + Some(()) + }, + Err(ioerr) => { + io_error::cond.raise(ioerr); + None + } + } + }.is_some() + } + + /// Unregisters a signal. If this listener currently had a handler + /// registered for the signal, then it will stop receiving any more + /// notification about the signal. If the signal has already been received, + /// it may still be returned by `recv`. + pub fn unregister(&mut self, signum: Signum) { + self.handles.pop(&signum); + } +} + +#[cfg(test)] +mod test { + use libc; + use rt::io::timer; + use rt::io; + use super::*; + + // kill is only available on Unixes + #[cfg(unix)] + #[fixed_stack_segment] + fn sigint() { + unsafe { + libc::funcs::posix88::signal::kill(libc::getpid(), libc::SIGINT); + } + } + + #[test] #[cfg(unix)] + fn test_io_signal_smoketest() { + let mut signal = Listener::new(); + signal.register(Interrupt); + sigint(); + timer::sleep(10); + match signal.port.recv() { + Interrupt => (), + s => fail!("Expected Interrupt, got {:?}", s), + } + } + + #[test] #[cfg(unix)] + fn test_io_signal_two_signal_one_signum() { + let mut s1 = Listener::new(); + let mut s2 = Listener::new(); + s1.register(Interrupt); + s2.register(Interrupt); + sigint(); + timer::sleep(10); + match s1.port.recv() { + Interrupt => (), + s => fail!("Expected Interrupt, got {:?}", s), + } + match s1.port.recv() { + Interrupt => (), + s => fail!("Expected Interrupt, got {:?}", s), + } + } + + #[test] #[cfg(unix)] + fn test_io_signal_unregister() { + let mut s1 = Listener::new(); + let mut s2 = Listener::new(); + s1.register(Interrupt); + s2.register(Interrupt); + s2.unregister(Interrupt); + sigint(); + timer::sleep(10); + if s2.port.peek() { + fail!("Unexpected {:?}", s2.port.recv()); + } + } + + #[cfg(windows)] + #[test] + fn test_io_signal_invalid_signum() { + let mut s = Listener::new(); + let mut called = false; + do io::io_error::cond.trap(|_| { + called = true; + }).inside { + if s.register(User1) { + fail!("Unexpected successful registry of signum {:?}", User1); + } + } + assert!(called); + } +} diff --git a/src/libstd/rt/io/stdio.rs b/src/libstd/rt/io/stdio.rs index e6dd9a48099..b922e6400cc 100644 --- a/src/libstd/rt/io/stdio.rs +++ b/src/libstd/rt/io/stdio.rs @@ -8,23 +8,90 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +/*! + +This modules provides bindings to the local event loop's TTY interface, using it +to have synchronous, but non-blocking versions of stdio. These handles can be +inspected for information about terminal dimensions or related information +about the stream or terminal that it is attached to. + +# Example + +```rust +use std::rt::io; + +let mut out = io::stdout(); +out.write(bytes!("Hello, world!")); +``` + +*/ + use fmt; use libc; use option::{Option, Some, None}; use result::{Ok, Err}; -use rt::local::Local; -use rt::rtio::{RtioFileStream, IoFactoryObject, IoFactory}; -use super::{Reader, Writer, io_error}; +use rt::rtio::{IoFactory, RtioTTY, RtioFileStream, with_local_io, + CloseAsynchronously}; +use super::{Reader, Writer, io_error, IoError, OtherIoError}; + +// And so begins the tale of acquiring a uv handle to a stdio stream on all +// platforms in all situations. Our story begins by splitting the world into two +// categories, windows and unix. Then one day the creators of unix said let +// there be redirection! And henceforth there was redirection away from the +// console for standard I/O streams. +// +// After this day, the world split into four factions: +// +// 1. Unix with stdout on a terminal. +// 2. Unix with stdout redirected. +// 3. Windows with stdout on a terminal. +// 4. Windows with stdout redirected. +// +// Many years passed, and then one day the nation of libuv decided to unify this +// world. After months of toiling, uv created three ideas: TTY, Pipe, File. +// These three ideas propagated throughout the lands and the four great factions +// decided to settle among them. +// +// The groups of 1, 2, and 3 all worked very hard towards the idea of TTY. Upon +// doing so, they even enhanced themselves further then their Pipe/File +// brethren, becoming the dominant powers. +// +// The group of 4, however, decided to work independently. They abandoned the +// common TTY belief throughout, and even abandoned the fledgling Pipe belief. +// The members of the 4th faction decided to only align themselves with File. +// +// tl;dr; TTY works on everything but when windows stdout is redirected, in that +// case pipe also doesn't work, but magically file does! +enum StdSource { + TTY(~RtioTTY), + File(~RtioFileStream), +} + +#[fixed_stack_segment] #[inline(never)] +fn src<T>(fd: libc::c_int, readable: bool, f: &fn(StdSource) -> T) -> T { + do with_local_io |io| { + let fd = unsafe { libc::dup(fd) }; + match io.tty_open(fd, readable) { + Ok(tty) => Some(f(TTY(tty))), + Err(_) => { + // It's not really that desirable if these handles are closed + // synchronously, and because they're squirreled away in a task + // structure the destructors will be run when the task is + // attempted to get destroyed. This means that if we run a + // synchronous destructor we'll attempt to do some scheduling + // operations which will just result in sadness. + Some(f(File(io.fs_from_raw_fd(fd, CloseAsynchronously)))) + } + } + }.unwrap() +} /// Creates a new non-blocking handle to the stdin of the current process. /// /// See `stdout()` for notes about this function. +#[fixed_stack_segment] #[inline(never)] pub fn stdin() -> StdReader { - let stream = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_from_raw_fd(libc::STDIN_FILENO, false) - }; - StdReader { inner: stream } + do src(libc::STDIN_FILENO, true) |src| { StdReader { inner: src } } } /// Creates a new non-blocking handle to the stdout of the current process. @@ -34,22 +101,14 @@ pub fn stdin() -> StdReader { /// task context because the stream returned will be a non-blocking object using /// the local scheduler to perform the I/O. pub fn stdout() -> StdWriter { - let stream = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_from_raw_fd(libc::STDOUT_FILENO, false) - }; - StdWriter { inner: stream } + do src(libc::STDOUT_FILENO, false) |src| { StdWriter { inner: src } } } /// Creates a new non-blocking handle to the stderr of the current process. /// /// See `stdout()` for notes about this function. pub fn stderr() -> StdWriter { - let stream = unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - (*io).fs_from_raw_fd(libc::STDERR_FILENO, false) - }; - StdWriter { inner: stream } + do src(libc::STDERR_FILENO, false) |src| { StdWriter { inner: src } } } /// Prints a string to the stdout of the current process. No newline is emitted @@ -87,12 +146,16 @@ pub fn println_args(fmt: &fmt::Arguments) { /// Representation of a reader of a standard input stream pub struct StdReader { - priv inner: ~RtioFileStream + priv inner: StdSource } impl Reader for StdReader { fn read(&mut self, buf: &mut [u8]) -> Option<uint> { - match self.inner.read(buf) { + let ret = match self.inner { + TTY(ref mut tty) => tty.read(buf), + File(ref mut file) => file.read(buf).map_move(|i| i as uint), + }; + match ret { Ok(amt) => Some(amt as uint), Err(e) => { io_error::cond.raise(e); @@ -106,21 +169,102 @@ impl Reader for StdReader { /// Representation of a writer to a standard output stream pub struct StdWriter { - priv inner: ~RtioFileStream + priv inner: StdSource +} + +impl StdWriter { + /// Gets the size of this output window, if possible. This is typically used + /// when the writer is attached to something like a terminal, this is used + /// to fetch the dimensions of the terminal. + /// + /// If successful, returns Some((width, height)). + /// + /// # Failure + /// + /// This function will raise on the `io_error` condition if an error + /// happens. + pub fn winsize(&mut self) -> Option<(int, int)> { + match self.inner { + TTY(ref mut tty) => { + match tty.get_winsize() { + Ok(p) => Some(p), + Err(e) => { + io_error::cond.raise(e); + None + } + } + } + File(*) => { + io_error::cond.raise(IoError { + kind: OtherIoError, + desc: "stream is not a tty", + detail: None, + }); + None + } + } + } + + /// Controls whether this output stream is a "raw stream" or simply a normal + /// stream. + /// + /// # Failure + /// + /// This function will raise on the `io_error` condition if an error + /// happens. + pub fn set_raw(&mut self, raw: bool) { + match self.inner { + TTY(ref mut tty) => { + match tty.set_raw(raw) { + Ok(()) => {}, + Err(e) => io_error::cond.raise(e), + } + } + File(*) => { + io_error::cond.raise(IoError { + kind: OtherIoError, + desc: "stream is not a tty", + detail: None, + }); + } + } + } + + /// Returns whether this tream is attached to a TTY instance or not. + /// + /// This is similar to libc's isatty() function + pub fn isatty(&self) -> bool { + match self.inner { + TTY(ref tty) => tty.isatty(), + File(*) => false, + } + } } impl Writer for StdWriter { fn write(&mut self, buf: &[u8]) { - match self.inner.write(buf) { + let ret = match self.inner { + TTY(ref mut tty) => tty.write(buf), + File(ref mut file) => file.write(buf), + }; + match ret { Ok(()) => {} Err(e) => io_error::cond.raise(e) } } - fn flush(&mut self) { - match self.inner.flush() { - Ok(()) => {} - Err(e) => io_error::cond.raise(e) - } + fn flush(&mut self) { /* nothing to do */ } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn smoke() { + // Just make sure we can acquire handles + stdin(); + stdout(); + stderr(); } } diff --git a/src/libstd/rt/io/support.rs b/src/libstd/rt/io/support.rs deleted file mode 100644 index 31040bc51a1..00000000000 --- a/src/libstd/rt/io/support.rs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use path::*; - -pub trait PathLike { - fn path_as_str<T>(&self, f: &fn(&str) -> T) -> T; -} - -impl<'self> PathLike for &'self str { - fn path_as_str<T>(&self, f: &fn(&str) -> T) -> T { - f(*self) - } -} - -impl PathLike for Path { - fn path_as_str<T>(&self, f: &fn(&str) -> T) -> T { - let s = self.as_str().unwrap(); - f(s) - } -} - -#[cfg(test)] -mod test { - use path::*; - use super::PathLike; - - #[test] - fn path_like_smoke_test() { - let expected = if cfg!(unix) { "/home" } else { "C:\\" }; - let path = Path::new(expected); - path.path_as_str(|p| assert!(p == expected)); - path.path_as_str(|p| assert!(p == expected)); - } -} diff --git a/src/libstd/rt/io/timer.rs b/src/libstd/rt/io/timer.rs index b41d7541a60..fab0062ee00 100644 --- a/src/libstd/rt/io/timer.rs +++ b/src/libstd/rt/io/timer.rs @@ -10,13 +10,11 @@ use option::{Option, Some, None}; use result::{Ok, Err}; -use rt::io::{io_error}; -use rt::rtio::{IoFactory, IoFactoryObject, - RtioTimer, RtioTimerObject}; -use rt::local::Local; +use rt::io::io_error; +use rt::rtio::{IoFactory, RtioTimer, with_local_io}; pub struct Timer { - priv obj: ~RtioTimerObject + priv obj: ~RtioTimer } /// Sleep the current task for `msecs` milliseconds. @@ -28,20 +26,19 @@ pub fn sleep(msecs: u64) { impl Timer { + /// Creates a new timer which can be used to put the current task to sleep + /// for a number of milliseconds. pub fn new() -> Option<Timer> { - let timer = unsafe { - rtdebug!("Timer::init: borrowing io to init timer"); - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - rtdebug!("about to init timer"); - (*io).timer_init() - }; - match timer { - Ok(t) => Some(Timer { obj: t }), - Err(ioerr) => { - rtdebug!("Timer::init: failed to init: {:?}", ioerr); - io_error::cond.raise(ioerr); - None + do with_local_io |io| { + match io.timer_init() { + Ok(t) => Some(Timer { obj: t }), + Err(ioerr) => { + rtdebug!("Timer::init: failed to init: {:?}", ioerr); + io_error::cond.raise(ioerr); + None + } } + } } diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index d4f31879c00..1ddc2f86f4b 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -12,8 +12,6 @@ use option::{Option, Some, None}; use rt::sched::Scheduler; use rt::task::Task; use rt::local_ptr; -use rt::rtio::{EventLoop, IoFactoryObject}; -//use borrow::to_uint; use cell::Cell; pub trait Local { @@ -122,24 +120,6 @@ impl Local for Scheduler { } } -// XXX: This formulation won't work once ~IoFactoryObject is a real trait pointer -impl Local for IoFactoryObject { - fn put(_value: ~IoFactoryObject) { rtabort!("unimpl") } - fn take() -> ~IoFactoryObject { rtabort!("unimpl") } - fn exists(_: Option<IoFactoryObject>) -> bool { rtabort!("unimpl") } - fn borrow<T>(_f: &fn(&mut IoFactoryObject) -> T) -> T { rtabort!("unimpl") } - unsafe fn unsafe_take() -> ~IoFactoryObject { rtabort!("unimpl") } - unsafe fn unsafe_borrow() -> *mut IoFactoryObject { - let sched: *mut Scheduler = Local::unsafe_borrow(); - let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap(); - return io; - } - unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { - rtabort!("unimpl") - } -} - - #[cfg(test)] mod test { use option::None; diff --git a/src/libstd/rt/logging.rs b/src/libstd/rt/logging.rs index 660d1cd4359..31650ede700 100644 --- a/src/libstd/rt/logging.rs +++ b/src/libstd/rt/logging.rs @@ -12,6 +12,7 @@ use fmt; use from_str::from_str; use libc::exit; use option::{Some, None, Option}; +use rt::io; use rt::crate_map::{ModEntry, CrateMap, iter_crate_map, get_crate_map}; use str::StrSlice; use u32; @@ -166,14 +167,23 @@ pub trait Logger { fn log(&mut self, args: &fmt::Arguments); } -pub struct StdErrLogger; +/// This logger emits output to the stderr of the process, and contains a lazily +/// initialized event-loop driven handle to the stream. +pub struct StdErrLogger { + priv handle: Option<io::stdio::StdWriter>, +} + +impl StdErrLogger { + pub fn new() -> StdErrLogger { StdErrLogger { handle: None } } +} impl Logger for StdErrLogger { fn log(&mut self, args: &fmt::Arguments) { - // FIXME(#6846): this should not call the blocking version of println, - // or at least the default loggers for tasks shouldn't do - // that - ::rt::util::dumb_println(args); + // First time logging? Get a handle to the stderr of this process. + if self.handle.is_none() { + self.handle = Some(io::stderr()); + } + fmt::writeln(self.handle.get_mut_ref() as &mut io::Writer, args); } } diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 9ea7b734d24..66d7a6bf488 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -279,7 +279,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { rtdebug!("inserting a regular scheduler"); // Every scheduler is driven by an I/O event loop. - let loop_ = ~UvEventLoop::new(); + let loop_ = ~UvEventLoop::new() as ~rtio::EventLoop; let mut sched = ~Scheduler::new(loop_, work_queue.clone(), work_queues.clone(), @@ -303,7 +303,7 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { // set. let work_queue = WorkQueue::new(); - let main_loop = ~UvEventLoop::new(); + let main_loop = ~UvEventLoop::new() as ~rtio::EventLoop; let mut main_sched = ~Scheduler::new_special(main_loop, work_queue, work_queues.clone(), diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 501def8b060..66a0676a2f4 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -11,40 +11,29 @@ use libc; use option::*; use result::*; +use comm::SharedChan; use libc::c_int; +use c_str::CString; +use ai = rt::io::net::addrinfo; use rt::io::IoError; +use rt::io::signal::Signum; use super::io::process::ProcessConfig; use super::io::net::ip::{IpAddr, SocketAddr}; -use rt::uv::uvio; use path::Path; -use super::io::support::PathLike; use super::io::{SeekStyle}; use super::io::{FileMode, FileAccess, FileStat}; -// XXX: ~object doesn't work currently so these are some placeholder -// types to use instead -pub type EventLoopObject = uvio::UvEventLoop; -pub type RemoteCallbackObject = uvio::UvRemoteCallback; -pub type IoFactoryObject = uvio::UvIoFactory; -pub type RtioTcpStreamObject = uvio::UvTcpStream; -pub type RtioTcpAcceptorObject = uvio::UvTcpAcceptor; -pub type RtioTcpListenerObject = uvio::UvTcpListener; -pub type RtioUdpSocketObject = uvio::UvUdpSocket; -pub type RtioTimerObject = uvio::UvTimer; -pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback; -pub type RtioPipeObject = uvio::UvPipeStream; -pub type RtioUnboundPipeObject = uvio::UvUnboundPipe; -pub type RtioProcessObject = uvio::UvProcess; - pub trait EventLoop { fn run(&mut self); fn callback(&mut self, ~fn()); fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback; fn callback_ms(&mut self, ms: u64, ~fn()); - fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject; + fn remote_callback(&mut self, ~fn()) -> ~RemoteCallback; + /// The asynchronous I/O services. Not all event loops may provide one - fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>; + // FIXME(#9382) this is an awful interface + fn io<'a>(&'a mut self, f: &fn(&'a mut IoFactory)); } pub trait RemoteCallback { @@ -69,32 +58,74 @@ pub struct FileOpenConfig { priv mode: int } +/// Description of what to do when a file handle is closed +pub enum CloseBehavior { + /// Do not close this handle when the object is destroyed + DontClose, + /// Synchronously close the handle, meaning that the task will block when + /// the handle is destroyed until it has been fully closed. + CloseSynchronously, + /// Asynchronously closes a handle, meaning that the task will *not* block + /// when the handle is destroyed, but the handle will still get deallocated + /// and cleaned up (but this will happen asynchronously on the local event + /// loop). + CloseAsynchronously, +} + +pub fn with_local_io<T>(f: &fn(&mut IoFactory) -> Option<T>) -> Option<T> { + use rt::sched::Scheduler; + use rt::local::Local; + use rt::io::{io_error, standard_error, IoUnavailable}; + + unsafe { + let sched: *mut Scheduler = Local::unsafe_borrow(); + let mut io = None; + (*sched).event_loop.io(|i| io = Some(i)); + match io { + Some(io) => f(io), + None => { + io_error::cond.raise(standard_error(IoUnavailable)); + None + } + } + } +} + pub trait IoFactory { - fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError>; - fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError>; - fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError>; - fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError>; - fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream; - fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess) + fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError>; + fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError>; + fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError>; + fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, + hint: Option<ai::Hint>) -> Result<~[ai::Info], IoError>; + fn timer_init(&mut self) -> Result<~RtioTimer, IoError>; + fn fs_from_raw_fd(&mut self, fd: c_int, close: CloseBehavior) -> ~RtioFileStream; + fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess) -> Result<~RtioFileStream, IoError>; - fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError>; - fn get_host_addresses(&mut self, host: &str) -> Result<~[IpAddr], IoError>; - fn fs_stat<P: PathLike>(&mut self, path: &P) -> Result<FileStat, IoError>; - fn fs_mkdir<P: PathLike>(&mut self, path: &P) -> Result<(), IoError>; - fn fs_rmdir<P: PathLike>(&mut self, path: &P) -> Result<(), IoError>; - fn fs_readdir<P: PathLike>(&mut self, path: &P, flags: c_int) -> + fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError>; + fn fs_stat(&mut self, path: &CString) -> Result<FileStat, IoError>; + fn fs_mkdir(&mut self, path: &CString) -> Result<(), IoError>; + fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError>; + fn fs_readdir(&mut self, path: &CString, flags: c_int) -> Result<~[Path], IoError>; - fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError>; fn spawn(&mut self, config: ProcessConfig) - -> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError>; + -> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError>; + + fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError>; + fn unix_bind(&mut self, path: &CString) -> + Result<~RtioUnixListener, IoError>; + fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError>; + fn tty_open(&mut self, fd: c_int, readable: bool) + -> Result<~RtioTTY, IoError>; + fn signal(&mut self, signal: Signum, channel: SharedChan<Signum>) + -> Result<~RtioSignal, IoError>; } pub trait RtioTcpListener : RtioSocket { - fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError>; + fn listen(~self) -> Result<~RtioTcpAcceptor, IoError>; } pub trait RtioTcpAcceptor : RtioSocket { - fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>; + fn accept(&mut self) -> Result<~RtioTcpStream, IoError>; fn accept_simultaneously(&mut self) -> Result<(), IoError>; fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; } @@ -154,3 +185,30 @@ pub trait RtioPipe { fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>; fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; } + +pub trait RtioUnixListener { + fn listen(~self) -> Result<~RtioUnixAcceptor, IoError>; +} + +pub trait RtioUnixAcceptor { + fn accept(&mut self) -> Result<~RtioPipe, IoError>; + fn accept_simultaneously(&mut self) -> Result<(), IoError>; + fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>; +} + +pub trait RtioTTY { + fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>; + fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; + fn set_raw(&mut self, raw: bool) -> Result<(), IoError>; + fn get_winsize(&mut self) -> Result<(int, int), IoError>; + fn isatty(&self) -> bool; +} + +pub trait PausibleIdleCallback { + fn start(&mut self, f: ~fn()); + fn pause(&mut self); + fn resume(&mut self); + fn close(&mut self); +} + +pub trait RtioSignal {} diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index ee163bab3c0..6e661884616 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -16,7 +16,7 @@ use unstable::raw; use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; use super::stack::{StackPool}; -use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; +use super::rtio::EventLoop; use super::context::Context; use super::task::{Task, AnySched, Sched}; use super::message_queue::MessageQueue; @@ -63,7 +63,7 @@ pub struct Scheduler { no_sleep: bool, stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O - event_loop: ~EventLoopObject, + event_loop: ~EventLoop, /// The scheduler runs on a special task. When it is not running /// it is stored here instead of the work queue. priv sched_task: Option<~Task>, @@ -107,7 +107,7 @@ impl Scheduler { // * Initialization Functions - pub fn new(event_loop: ~EventLoopObject, + pub fn new(event_loop: ~EventLoop, work_queue: WorkQueue<~Task>, work_queues: ~[WorkQueue<~Task>], sleeper_list: SleeperList) @@ -119,7 +119,7 @@ impl Scheduler { } - pub fn new_special(event_loop: ~EventLoopObject, + pub fn new_special(event_loop: ~EventLoop, work_queue: WorkQueue<~Task>, work_queues: ~[WorkQueue<~Task>], sleeper_list: SleeperList, @@ -227,7 +227,7 @@ impl Scheduler { // mutable reference to the event_loop to give it the "run" // command. unsafe { - let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; + let event_loop: *mut ~EventLoop = &mut self_sched.event_loop; // Our scheduler must be in the task before the event loop // is started. @@ -793,7 +793,7 @@ pub enum SchedMessage { } pub struct SchedHandle { - priv remote: ~RemoteCallbackObject, + priv remote: ~RemoteCallback, priv queue: MessageQueue<SchedMessage>, sched_id: uint } @@ -905,6 +905,7 @@ mod test { use cell::Cell; use rt::thread::Thread; use rt::task::{Task, Sched}; + use rt::rtio::EventLoop; use rt::util; use option::{Some}; @@ -1020,7 +1021,7 @@ mod test { // Our normal scheduler let mut normal_sched = ~Scheduler::new( - ~UvEventLoop::new(), + ~UvEventLoop::new() as ~EventLoop, normal_queue, queues.clone(), sleepers.clone()); @@ -1031,7 +1032,7 @@ mod test { // Our special scheduler let mut special_sched = ~Scheduler::new_special( - ~UvEventLoop::new(), + ~UvEventLoop::new() as ~EventLoop, special_queue.clone(), queues.clone(), sleepers.clone(), @@ -1202,7 +1203,7 @@ mod test { let queues = ~[queue.clone()]; let mut sched = ~Scheduler::new( - ~UvEventLoop::new(), + ~UvEventLoop::new() as ~EventLoop, queue, queues.clone(), sleepers.clone()); diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index c4f352501a0..1ea68bb52d7 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -132,7 +132,7 @@ impl Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(None), - logger: StdErrLogger, + logger: StdErrLogger::new(), unwinder: Unwinder { unwinding: false }, taskgroup: None, death: Death::new(), @@ -166,7 +166,7 @@ impl Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(None), - logger: StdErrLogger, + logger: StdErrLogger::new(), unwinder: Unwinder { unwinding: false }, taskgroup: None, death: Death::new(), @@ -188,7 +188,7 @@ impl Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(None), - logger: StdErrLogger, + logger: StdErrLogger::new(), unwinder: Unwinder { unwinding: false }, taskgroup: None, // FIXME(#7544) make watching optional @@ -479,7 +479,6 @@ pub extern "C" fn rust_stack_exhausted() { use rt::in_green_task_context; use rt::task::Task; use rt::local::Local; - use rt::logging::Logger; use unstable::intrinsics; unsafe { @@ -529,8 +528,12 @@ pub extern "C" fn rust_stack_exhausted() { do Local::borrow |task: &mut Task| { let n = task.name.as_ref().map(|n| n.as_slice()).unwrap_or("<unnamed>"); - format_args!(|args| { task.logger.log(args) }, - "task '{}' has overflowed its stack", n); + // See the message below for why this is not emitted to the + // task's logger. This has the additional conundrum of the + // logger may not be initialized just yet, meaning that an FFI + // call would happen to initialized it (calling out to libuv), + // and the FFI call needs 2MB of stack when we just ran out. + rterrln!("task '{}' has overflowed its stack", n); } } else { rterrln!("stack overflow in non-task context"); @@ -546,9 +549,9 @@ pub fn begin_unwind(msg: *c_char, file: *c_char, line: size_t) -> ! { use rt::in_green_task_context; use rt::task::Task; use rt::local::Local; - use rt::logging::Logger; use str::Str; use c_str::CString; + use unstable::intrinsics; unsafe { let msg = CString::new(msg, false); @@ -557,35 +560,35 @@ pub fn begin_unwind(msg: *c_char, file: *c_char, line: size_t) -> ! { Some(s) => s, None => rtabort!("message wasn't utf8?") }; - if in_green_task_context() { - // Be careful not to allocate in this block, if we're failing we may - // have been failing due to a lack of memory in the first place... - do Local::borrow |task: &mut Task| { - let n = task.name.as_ref().map(|n| n.as_slice()).unwrap_or("<unnamed>"); - - match file.as_str() { - Some(file) => { - format_args!(|args| { task.logger.log(args) }, - "task '{}' failed at '{}', {}:{}", - n, msg, file, line); - } - None => { - format_args!(|args| { task.logger.log(args) }, - "task '{}' failed at '{}'", n, msg); - } - } - } - } else { + if !in_green_task_context() { match file.as_str() { Some(file) => { rterrln!("failed in non-task context at '{}', {}:{}", msg, file, line as int); } - None => rterrln!("failed in non-task context at '{}'", msg), + None => rterrln!("failed in non-task context at '{}'", msg) } + intrinsics::abort(); } + // Be careful not to allocate in this block, if we're failing we may + // have been failing due to a lack of memory in the first place... let task: *mut Task = Local::unsafe_borrow(); + let n = (*task).name.as_ref().map(|n| n.as_slice()).unwrap_or("<unnamed>"); + + // XXX: this should no get forcibly printed to the console, this should + // either be sent to the parent task (ideally), or get printed to + // the task's logger. Right now the logger is actually a uvio + // instance, which uses unkillable blocks internally for various + // reasons. This will cause serious trouble if the task is failing + // due to mismanagment of its own kill flag, so calling our own + // logger in its current state is a bit of a problem. + match file.as_str() { + Some(file) => { + rterrln!("task '{}' failed at '{}', {}:{}", n, msg, file, line); + } + None => rterrln!("task '{}' failed at '{}'", n, msg), + } if (*task).unwinder.unwinding { rtabort!("unwinding again"); } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 4f7ebb4a721..c238b1dfba1 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -8,8 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use rand; +use rand::Rng; +use os; use libc; use option::{Some, None}; +use path::Path; use cell::Cell; use clone::Clone; use container::Container; @@ -18,6 +22,7 @@ use super::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; use vec::{OwnedVector, MutableVector, ImmutableVector}; use path::GenericPath; use rt::sched::Scheduler; +use rt::rtio::EventLoop; use unstable::{run_in_bare_thread}; use rt::thread::Thread; use rt::task::Task; @@ -32,7 +37,7 @@ pub fn new_test_uv_sched() -> Scheduler { let queue = WorkQueue::new(); let queues = ~[queue.clone()]; - let mut sched = Scheduler::new(~UvEventLoop::new(), + let mut sched = Scheduler::new(~UvEventLoop::new() as ~EventLoop, queue, queues, SleeperList::new()); @@ -191,7 +196,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { } for i in range(0u, nthreads) { - let loop_ = ~UvEventLoop::new(); + let loop_ = ~UvEventLoop::new() as ~EventLoop; let mut sched = ~Scheduler::new(loop_, work_queues[i].clone(), work_queues.clone(), @@ -327,6 +332,12 @@ pub fn next_test_port() -> u16 { } } +/// Get a temporary path which could be the location of a unix socket +#[fixed_stack_segment] #[inline(never)] +pub fn next_test_unix() -> Path { + os::tmpdir().join(rand::task_rng().gen_ascii_str(20)) +} + /// Get a unique IPv4 localhost:port pair starting at 9600 pub fn next_test_ip4() -> SocketAddr { SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: next_test_port() } diff --git a/src/libstd/rt/util.rs b/src/libstd/rt/util.rs index 647d88c26f2..070985fb0a5 100644 --- a/src/libstd/rt/util.rs +++ b/src/libstd/rt/util.rs @@ -71,9 +71,24 @@ pub fn default_sched_threads() -> uint { pub fn dumb_println(args: &fmt::Arguments) { use rt::io::native::stdio::stderr; - use rt::io::Writer; + use rt::io::{Writer, io_error, ResourceUnavailable}; + use rt::task::Task; + use rt::local::Local; + let mut out = stderr(); - fmt::writeln(&mut out as &mut Writer, args); + if Local::exists(None::<Task>) { + let mut again = true; + do io_error::cond.trap(|e| { + again = e.kind == ResourceUnavailable; + }).inside { + while again { + again = false; + fmt::writeln(&mut out as &mut Writer, args); + } + } + } else { + fmt::writeln(&mut out as &mut Writer, args); + } } pub fn abort(msg: &str) -> ! { diff --git a/src/libstd/rt/uv/addrinfo.rs b/src/libstd/rt/uv/addrinfo.rs index f2abcd3aca7..a1593d5c8db 100644 --- a/src/libstd/rt/uv/addrinfo.rs +++ b/src/libstd/rt/uv/addrinfo.rs @@ -18,9 +18,10 @@ use rt::uv::uvll; use rt::uv::uvll::UV_GETADDRINFO; use rt::uv::{Loop, UvError, NativeHandle}; use rt::uv::status_to_maybe_uv_error; -use rt::uv::net::UvAddrInfo; +use rt::uv::net; +use ai = rt::io::net::addrinfo; -type GetAddrInfoCallback = ~fn(GetAddrInfoRequest, &UvAddrInfo, Option<UvError>); +type GetAddrInfoCallback = ~fn(GetAddrInfoRequest, &net::UvAddrInfo, Option<UvError>); pub struct GetAddrInfoRequest(*uvll::uv_getaddrinfo_t); @@ -38,7 +39,7 @@ impl GetAddrInfoRequest { } pub fn getaddrinfo(&mut self, loop_: &Loop, node: Option<&str>, - service: Option<&str>, hints: Option<UvAddrInfo>, + service: Option<&str>, hints: Option<ai::Hint>, cb: GetAddrInfoCallback) { assert!(node.is_some() || service.is_some()); @@ -72,8 +73,41 @@ impl GetAddrInfoRequest { cb(req, addrinfo, err) }; - // XXX: Implement hints - assert!(hints.is_none()); + let hint = hints.map(|hint| { + let mut flags = 0; + do each_ai_flag |cval, aival| { + if hint.flags & (aival as uint) != 0 { + flags |= cval as i32; + } + } + /* XXX: do we really want to support these? + let socktype = match hint.socktype { + Some(ai::Stream) => uvll::rust_SOCK_STREAM(), + Some(ai::Datagram) => uvll::rust_SOCK_DGRAM(), + Some(ai::Raw) => uvll::rust_SOCK_RAW(), + None => 0, + }; + let protocol = match hint.protocol { + Some(ai::UDP) => uvll::rust_IPPROTO_UDP(), + Some(ai::TCP) => uvll::rust_IPPROTO_TCP(), + _ => 0, + }; + */ + let socktype = 0; + let protocol = 0; + + uvll::addrinfo { + ai_flags: flags, + ai_family: hint.family as c_int, + ai_socktype: socktype, + ai_protocol: protocol, + ai_addrlen: 0, + ai_canonname: null(), + ai_addr: null(), + ai_next: null(), + } + }); + let hint_ptr = hint.as_ref().map_default(null(), |x| x as *uvll::addrinfo); self.get_req_data().getaddrinfo_cb = Some(wrapper_cb); @@ -83,7 +117,7 @@ impl GetAddrInfoRequest { getaddrinfo_cb, c_node_ptr, c_service_ptr, - null())); + hint_ptr)); } extern "C" fn getaddrinfo_cb(req: *uvll::uv_getaddrinfo_t, @@ -91,7 +125,7 @@ impl GetAddrInfoRequest { res: *uvll::addrinfo) { let mut req: GetAddrInfoRequest = NativeHandle::from_native_handle(req); let err = status_to_maybe_uv_error(status); - let addrinfo = UvAddrInfo(res); + let addrinfo = net::UvAddrInfo(res); let data = req.get_req_data(); (*data.getaddrinfo_cb.get_ref())(req, &addrinfo, err); unsafe { @@ -137,6 +171,72 @@ impl GetAddrInfoRequest { } } +fn each_ai_flag(_f: &fn(c_int, ai::Flag)) { + /* XXX: do we really want to support these? + unsafe { + f(uvll::rust_AI_ADDRCONFIG(), ai::AddrConfig); + f(uvll::rust_AI_ALL(), ai::All); + f(uvll::rust_AI_CANONNAME(), ai::CanonName); + f(uvll::rust_AI_NUMERICHOST(), ai::NumericHost); + f(uvll::rust_AI_NUMERICSERV(), ai::NumericServ); + f(uvll::rust_AI_PASSIVE(), ai::Passive); + f(uvll::rust_AI_V4MAPPED(), ai::V4Mapped); + } + */ +} + +// Traverse the addrinfo linked list, producing a vector of Rust socket addresses +pub fn accum_addrinfo(addr: &net::UvAddrInfo) -> ~[ai::Info] { + unsafe { + let &net::UvAddrInfo(addr) = addr; + let mut addr = addr; + + let mut addrs = ~[]; + loop { + let uvaddr = net::sockaddr_to_UvSocketAddr((*addr).ai_addr); + let rustaddr = net::uv_socket_addr_to_socket_addr(uvaddr); + + let mut flags = 0; + do each_ai_flag |cval, aival| { + if (*addr).ai_flags & cval != 0 { + flags |= aival as uint; + } + } + + /* XXX: do we really want to support these + let protocol = match (*addr).ai_protocol { + p if p == uvll::rust_IPPROTO_UDP() => Some(ai::UDP), + p if p == uvll::rust_IPPROTO_TCP() => Some(ai::TCP), + _ => None, + }; + let socktype = match (*addr).ai_socktype { + p if p == uvll::rust_SOCK_STREAM() => Some(ai::Stream), + p if p == uvll::rust_SOCK_DGRAM() => Some(ai::Datagram), + p if p == uvll::rust_SOCK_RAW() => Some(ai::Raw), + _ => None, + }; + */ + let protocol = None; + let socktype = None; + + addrs.push(ai::Info { + address: rustaddr, + family: (*addr).ai_family as uint, + socktype: socktype, + protocol: protocol, + flags: flags, + }); + if (*addr).ai_next.is_not_null() { + addr = (*addr).ai_next; + } else { + break; + } + } + + return addrs; + } +} + impl NativeHandle<*uvll::uv_getaddrinfo_t> for GetAddrInfoRequest { fn from_native_handle(handle: *uvll::uv_getaddrinfo_t) -> GetAddrInfoRequest { GetAddrInfoRequest(handle) @@ -150,7 +250,6 @@ impl NativeHandle<*uvll::uv_getaddrinfo_t> for GetAddrInfoRequest { mod test { use option::{Some, None}; use rt::uv::Loop; - use rt::uv::net::accum_sockaddrs; use rt::io::net::ip::{SocketAddr, Ipv4Addr}; use super::*; @@ -159,14 +258,14 @@ mod test { let mut loop_ = Loop::new(); let mut req = GetAddrInfoRequest::new(); do req.getaddrinfo(&loop_, Some("localhost"), None, None) |_, addrinfo, _| { - let sockaddrs = accum_sockaddrs(addrinfo); + let sockaddrs = accum_addrinfo(addrinfo); let mut found_local = false; let local_addr = &SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 0 }; for addr in sockaddrs.iter() { - found_local = found_local || addr == local_addr; + found_local = found_local || addr.address == *local_addr; } assert!(found_local); } diff --git a/src/libstd/rt/uv/async.rs b/src/libstd/rt/uv/async.rs index ff7bb9dd03a..108aef43c3c 100644 --- a/src/libstd/rt/uv/async.rs +++ b/src/libstd/rt/uv/async.rs @@ -8,11 +8,11 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use libc::{c_int, c_void}; +use libc::c_int; use option::Some; use rt::uv::uvll; use rt::uv::uvll::UV_ASYNC; -use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback, NullCallback}; +use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback}; use rt::uv::WatcherInterop; use rt::uv::status_to_maybe_uv_error; @@ -47,27 +47,6 @@ impl AsyncWatcher { uvll::async_send(handle); } } - - pub fn close(self, cb: NullCallback) { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - - unsafe { - uvll::close(self.native_handle(), close_cb); - } - - extern fn close_cb(handle: *uvll::uv_stream_t) { - let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); - { - let data = watcher.get_watcher_data(); - data.close_cb.take_unwrap()(); - } - watcher.drop_watcher_data(); - unsafe { uvll::free_handle(handle as *c_void); } - } - } } impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher { diff --git a/src/libstd/rt/uv/file.rs b/src/libstd/rt/uv/file.rs index 5d64ca4d755..d2ca15959b0 100644 --- a/src/libstd/rt/uv/file.rs +++ b/src/libstd/rt/uv/file.rs @@ -10,12 +10,13 @@ use prelude::*; use ptr::null; +use c_str; +use c_str::CString; use libc::c_void; use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf, status_to_maybe_uv_error, UvError}; use rt::uv::uvll; use rt::uv::uvll::*; -use super::super::io::support::PathLike; use cast::transmute; use libc; use libc::{c_int}; @@ -36,74 +37,67 @@ impl FsRequest { fs_req } - pub fn open<P: PathLike>(self, loop_: &Loop, path: &P, flags: int, mode: int, - cb: FsCallback) { + pub fn open(self, loop_: &Loop, path: &CString, flags: int, mode: int, + cb: FsCallback) { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(Some(cb)) }; - path.path_as_str(|p| { - p.with_c_str(|p| unsafe { + let ret = path.with_ref(|p| unsafe { uvll::fs_open(loop_.native_handle(), self.native_handle(), p, flags, mode, complete_cb_ptr) - }) }); + assert_eq!(ret, 0); } - pub fn open_sync<P: PathLike>(self, loop_: &Loop, path: &P, - flags: int, mode: int) -> Result<c_int, UvError> { + pub fn open_sync(self, loop_: &Loop, path: &CString, + flags: int, mode: int) -> Result<c_int, UvError> { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(None) }; - let result = path.path_as_str(|p| { - p.with_c_str(|p| unsafe { + let result = path.with_ref(|p| unsafe { uvll::fs_open(loop_.native_handle(), self.native_handle(), p, flags, mode, complete_cb_ptr) - }) }); self.sync_cleanup(result) } - pub fn unlink<P: PathLike>(self, loop_: &Loop, path: &P, cb: FsCallback) { + pub fn unlink(self, loop_: &Loop, path: &CString, cb: FsCallback) { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(Some(cb)) }; - path.path_as_str(|p| { - p.with_c_str(|p| unsafe { - uvll::fs_unlink(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }) + let ret = path.with_ref(|p| unsafe { + uvll::fs_unlink(loop_.native_handle(), + self.native_handle(), p, complete_cb_ptr) }); + assert_eq!(ret, 0); } - pub fn unlink_sync<P: PathLike>(self, loop_: &Loop, path: &P) + pub fn unlink_sync(self, loop_: &Loop, path: &CString) -> Result<c_int, UvError> { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(None) }; - let result = path.path_as_str(|p| { - p.with_c_str(|p| unsafe { - uvll::fs_unlink(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }) + let result = path.with_ref(|p| unsafe { + uvll::fs_unlink(loop_.native_handle(), + self.native_handle(), p, complete_cb_ptr) }); self.sync_cleanup(result) } - pub fn stat<P: PathLike>(self, loop_: &Loop, path: &P, cb: FsCallback) { + pub fn stat(self, loop_: &Loop, path: &CString, cb: FsCallback) { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(Some(cb)) }; - path.path_as_str(|p| { - p.with_c_str(|p| unsafe { - uvll::fs_stat(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }) + let ret = path.with_ref(|p| unsafe { + uvll::fs_stat(loop_.native_handle(), + self.native_handle(), p, complete_cb_ptr) }); + assert_eq!(ret, 0); } pub fn write(self, loop_: &Loop, fd: c_int, buf: Buf, offset: i64, cb: FsCallback) { @@ -113,11 +107,12 @@ impl FsRequest { }; let base_ptr = buf.base as *c_void; let len = buf.len as uint; - unsafe { + let ret = unsafe { uvll::fs_write(loop_.native_handle(), self.native_handle(), fd, base_ptr, len, offset, complete_cb_ptr) }; + assert_eq!(ret, 0); } pub fn write_sync(self, loop_: &Loop, fd: c_int, buf: Buf, offset: i64) -> Result<c_int, UvError> { @@ -142,11 +137,12 @@ impl FsRequest { }; let buf_ptr = buf.base as *c_void; let len = buf.len as uint; - unsafe { + let ret = unsafe { uvll::fs_read(loop_.native_handle(), self.native_handle(), fd, buf_ptr, len, offset, complete_cb_ptr) }; + assert_eq!(ret, 0); } pub fn read_sync(self, loop_: &Loop, fd: c_int, buf: Buf, offset: i64) -> Result<c_int, UvError> { @@ -169,10 +165,11 @@ impl FsRequest { let mut me = self; me.req_boilerplate(Some(cb)) }; - unsafe { + let ret = unsafe { uvll::fs_close(loop_.native_handle(), self.native_handle(), fd, complete_cb_ptr) }; + assert_eq!(ret, 0); } pub fn close_sync(self, loop_: &Loop, fd: c_int) -> Result<c_int, UvError> { let complete_cb_ptr = { @@ -186,44 +183,41 @@ impl FsRequest { self.sync_cleanup(result) } - pub fn mkdir<P: PathLike>(self, loop_: &Loop, path: &P, mode: int, cb: FsCallback) { + pub fn mkdir(self, loop_: &Loop, path: &CString, mode: int, cb: FsCallback) { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(Some(cb)) }; - path.path_as_str(|p| { - p.with_c_str(|p| unsafe { + let ret = path.with_ref(|p| unsafe { uvll::fs_mkdir(loop_.native_handle(), - self.native_handle(), p, mode, complete_cb_ptr) - }) + self.native_handle(), p, mode, complete_cb_ptr) }); + assert_eq!(ret, 0); } - pub fn rmdir<P: PathLike>(self, loop_: &Loop, path: &P, cb: FsCallback) { + pub fn rmdir(self, loop_: &Loop, path: &CString, cb: FsCallback) { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(Some(cb)) }; - path.path_as_str(|p| { - p.with_c_str(|p| unsafe { + let ret = path.with_ref(|p| unsafe { uvll::fs_rmdir(loop_.native_handle(), - self.native_handle(), p, complete_cb_ptr) - }) + self.native_handle(), p, complete_cb_ptr) }); + assert_eq!(ret, 0); } - pub fn readdir<P: PathLike>(self, loop_: &Loop, path: &P, - flags: c_int, cb: FsCallback) { + pub fn readdir(self, loop_: &Loop, path: &CString, + flags: c_int, cb: FsCallback) { let complete_cb_ptr = { let mut me = self; me.req_boilerplate(Some(cb)) }; - path.path_as_str(|p| { - p.with_c_str(|p| unsafe { + let ret = path.with_ref(|p| unsafe { uvll::fs_readdir(loop_.native_handle(), - self.native_handle(), p, flags, complete_cb_ptr) - }) + self.native_handle(), p, flags, complete_cb_ptr) }); + assert_eq!(ret, 0); } // accessors/utility funcs @@ -286,13 +280,10 @@ impl FsRequest { } } - pub fn get_paths(&mut self) -> ~[~str] { - use str; + pub fn each_path(&mut self, f: &fn(&CString)) { let ptr = self.get_ptr(); match self.get_result() { - n if (n <= 0) => { - ~[] - }, + n if (n <= 0) => {} n => { let n_len = n as uint; // we pass in the len that uv tells us is there @@ -301,11 +292,10 @@ impl FsRequest { // correctly delimited and we stray into garbage memory? // in any case, passing Some(n_len) fixes it and ensures // good results - let raw_path_strs = unsafe { - str::raw::from_c_multistring(ptr as *libc::c_char, Some(n_len)) }; - let raw_len = raw_path_strs.len(); - assert_eq!(raw_len, n_len); - raw_path_strs + unsafe { + c_str::from_c_multistring(ptr as *libc::c_char, + Some(n_len), f); + } } } } @@ -368,7 +358,6 @@ mod test { use vec; use str; use unstable::run_in_bare_thread; - use path::Path; use rt::uv::{Loop, Buf, slice_to_uv_buf}; use libc::{O_CREAT, O_RDWR, O_RDONLY, S_IWUSR, S_IRUSR}; @@ -391,10 +380,9 @@ mod test { let read_mem = vec::from_elem(read_buf_len, 0u8); let read_buf = slice_to_uv_buf(read_mem); let read_buf_ptr: *Buf = &read_buf; - let p = Path::new(path_str); let open_req = FsRequest::new(); - do open_req.open(&loop_, &p, create_flags as int, mode as int) - |req, uverr| { + do open_req.open(&loop_, &path_str.to_c_str(), create_flags as int, + mode as int) |req, uverr| { assert!(uverr.is_none()); let fd = req.get_result(); let buf = unsafe { *write_buf_ptr }; @@ -405,8 +393,8 @@ mod test { assert!(uverr.is_none()); let loop_ = req.get_loop(); let open_req = FsRequest::new(); - do open_req.open(&loop_, &Path::new(path_str), read_flags as int,0) - |req, uverr| { + do open_req.open(&loop_, &path_str.to_c_str(), + read_flags as int,0) |req, uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let fd = req.get_result(); @@ -431,7 +419,8 @@ mod test { assert!(uverr.is_none()); let loop_ = &req.get_loop(); let unlink_req = FsRequest::new(); - do unlink_req.unlink(loop_, &Path::new(path_str)) + do unlink_req.unlink(loop_, + &path_str.to_c_str()) |_,uverr| { assert!(uverr.is_none()); }; @@ -465,8 +454,8 @@ mod test { let write_buf = slice_to_uv_buf(write_val); // open/create let open_req = FsRequest::new(); - let result = open_req.open_sync(&loop_, &Path::new(path_str), - create_flags as int, mode as int); + let result = open_req.open_sync(&loop_, &path_str.to_c_str(), + create_flags as int, mode as int); assert!(result.is_ok()); let fd = result.unwrap(); // write @@ -479,7 +468,7 @@ mod test { assert!(result.is_ok()); // re-open let open_req = FsRequest::new(); - let result = open_req.open_sync(&loop_, &Path::new(path_str), + let result = open_req.open_sync(&loop_, &path_str.to_c_str(), read_flags as int,0); assert!(result.is_ok()); let len = 1028; @@ -503,7 +492,7 @@ mod test { assert!(result.is_ok()); // unlink let unlink_req = FsRequest::new(); - let result = unlink_req.unlink_sync(&loop_, &Path::new(path_str)); + let result = unlink_req.unlink_sync(&loop_, &path_str.to_c_str()); assert!(result.is_ok()); } else { fail!("nread was 0.. wudn't expectin' that."); } loop_.close(); @@ -539,8 +528,8 @@ mod test { let write_buf = slice_to_uv_buf(write_val); let write_buf_ptr: *Buf = &write_buf; let open_req = FsRequest::new(); - do open_req.open(&loop_, &path, create_flags as int, mode as int) - |req, uverr| { + do open_req.open(&loop_, &path.to_c_str(), create_flags as int, + mode as int) |req, uverr| { assert!(uverr.is_none()); let fd = req.get_result(); let buf = unsafe { *write_buf_ptr }; @@ -549,7 +538,7 @@ mod test { assert!(uverr.is_none()); let loop_ = req.get_loop(); let stat_req = FsRequest::new(); - do stat_req.stat(&loop_, &path) |req, uverr| { + do stat_req.stat(&loop_, &path.to_c_str()) |req, uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let stat = req.get_stat(); @@ -560,11 +549,13 @@ mod test { assert!(uverr.is_none()); let loop_ = req.get_loop(); let unlink_req = FsRequest::new(); - do unlink_req.unlink(&loop_, &path) |req,uverr| { + do unlink_req.unlink(&loop_, + &path.to_c_str()) |req,uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let stat_req = FsRequest::new(); - do stat_req.stat(&loop_, &path) |_, uverr| { + do stat_req.stat(&loop_, + &path.to_c_str()) |_, uverr| { // should cause an error because the // file doesn't exist anymore assert!(uverr.is_some()); @@ -587,22 +578,23 @@ mod test { let mode = S_IWUSR | S_IRUSR; let mkdir_req = FsRequest::new(); - do mkdir_req.mkdir(&loop_, &path, mode as int) |req,uverr| { + do mkdir_req.mkdir(&loop_, &path.to_c_str(), + mode as int) |req,uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let stat_req = FsRequest::new(); - do stat_req.stat(&loop_, &path) |req, uverr| { + do stat_req.stat(&loop_, &path.to_c_str()) |req, uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let stat = req.get_stat(); naive_print(&loop_, format!("{:?}", stat)); assert!(stat.is_dir()); let rmdir_req = FsRequest::new(); - do rmdir_req.rmdir(&loop_, &path) |req,uverr| { + do rmdir_req.rmdir(&loop_, &path.to_c_str()) |req,uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let stat_req = FsRequest::new(); - do stat_req.stat(&loop_, &path) |_req, uverr| { + do stat_req.stat(&loop_, &path.to_c_str()) |_req, uverr| { assert!(uverr.is_some()); } } @@ -620,16 +612,17 @@ mod test { let mode = S_IWUSR | S_IRUSR; let mkdir_req = FsRequest::new(); - do mkdir_req.mkdir(&loop_, &path, mode as int) |req,uverr| { + do mkdir_req.mkdir(&loop_, &path.to_c_str(), mode as int) |req,uverr| { assert!(uverr.is_none()); let loop_ = req.get_loop(); let mkdir_req = FsRequest::new(); - do mkdir_req.mkdir(&loop_, &path, mode as int) |req,uverr| { + do mkdir_req.mkdir(&loop_, &path.to_c_str(), + mode as int) |req,uverr| { assert!(uverr.is_some()); let loop_ = req.get_loop(); let _stat = req.get_stat(); let rmdir_req = FsRequest::new(); - do rmdir_req.rmdir(&loop_, &path) |req,uverr| { + do rmdir_req.rmdir(&loop_, &path.to_c_str()) |req,uverr| { assert!(uverr.is_none()); let _loop = req.get_loop(); } @@ -645,7 +638,7 @@ mod test { let mut loop_ = Loop::new(); let path = "./tmp/never_existed_dir"; let rmdir_req = FsRequest::new(); - do rmdir_req.rmdir(&loop_, &path) |_req, uverr| { + do rmdir_req.rmdir(&loop_, &path.to_c_str()) |_req, uverr| { assert!(uverr.is_some()); } loop_.run(); diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs index 8cbcd7b77c0..40f0750b2d0 100644 --- a/src/libstd/rt/uv/idle.rs +++ b/src/libstd/rt/uv/idle.rs @@ -11,7 +11,7 @@ use libc::c_int; use option::Some; use rt::uv::uvll; -use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback}; +use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback}; use rt::uv::status_to_maybe_uv_error; pub struct IdleWatcher(*uvll::uv_idle_t); @@ -20,9 +20,9 @@ impl Watcher for IdleWatcher { } impl IdleWatcher { pub fn new(loop_: &mut Loop) -> IdleWatcher { unsafe { - let handle = uvll::idle_new(); + let handle = uvll::malloc_handle(uvll::UV_IDLE); assert!(handle.is_not_null()); - assert!(0 == uvll::idle_init(loop_.native_handle(), handle)); + assert_eq!(uvll::idle_init(loop_.native_handle(), handle), 0); let mut watcher: IdleWatcher = NativeHandle::from_native_handle(handle); watcher.install_watcher_data(); return watcher @@ -36,29 +36,14 @@ impl IdleWatcher { } unsafe { - assert!(0 == uvll::idle_start(self.native_handle(), idle_cb)) - }; - - extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { - let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); - let data = idle_watcher.get_watcher_data(); - let cb: &IdleCallback = data.idle_cb.get_ref(); - let status = status_to_maybe_uv_error(status); - (*cb)(idle_watcher, status); + assert_eq!(uvll::idle_start(self.native_handle(), idle_cb), 0) } } pub fn restart(&mut self) { unsafe { - assert!(0 == uvll::idle_start(self.native_handle(), idle_cb)) - }; - - extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { - let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); - let data = idle_watcher.get_watcher_data(); - let cb: &IdleCallback = data.idle_cb.get_ref(); - let status = status_to_maybe_uv_error(status); - (*cb)(idle_watcher, status); + assert!(self.get_watcher_data().idle_cb.is_some()); + assert_eq!(uvll::idle_start(self.native_handle(), idle_cb), 0) } } @@ -68,30 +53,7 @@ impl IdleWatcher { // free unsafe { - assert!(0 == uvll::idle_stop(self.native_handle())); - } - } - - pub fn close(self, cb: NullCallback) { - { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { uvll::close(self.native_handle(), close_cb) }; - - extern fn close_cb(handle: *uvll::uv_idle_t) { - unsafe { - let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); - { - let data = idle_watcher.get_watcher_data(); - data.close_cb.take_unwrap()(); - } - idle_watcher.drop_watcher_data(); - uvll::idle_delete(handle); - } + assert_eq!(uvll::idle_stop(self.native_handle()), 0); } } } @@ -105,6 +67,14 @@ impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher { } } +extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) { + let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle); + let data = idle_watcher.get_watcher_data(); + let cb: &IdleCallback = data.idle_cb.get_ref(); + let status = status_to_maybe_uv_error(status); + (*cb)(idle_watcher, status); +} + #[cfg(test)] mod test { diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs index 3a6a3acbc53..c92a54425bf 100644 --- a/src/libstd/rt/uv/mod.rs +++ b/src/libstd/rt/uv/mod.rs @@ -48,6 +48,7 @@ use cast::transmute; use ptr::null; use unstable::finally::Finally; use rt::io::net::ip::SocketAddr; +use rt::io::signal::Signum; use rt::io::IoError; @@ -60,6 +61,7 @@ pub use self::timer::TimerWatcher; pub use self::async::AsyncWatcher; pub use self::process::Process; pub use self::pipe::Pipe; +pub use self::signal::SignalWatcher; /// The implementation of `rtio` for libuv pub mod uvio; @@ -75,6 +77,8 @@ pub mod async; pub mod addrinfo; pub mod process; pub mod pipe; +pub mod tty; +pub mod signal; /// XXX: Loop(*handle) is buggy with destructors. Normal structs /// with dtors may not be destructured, but tuple structs can, @@ -83,6 +87,14 @@ pub struct Loop { priv handle: *uvll::uv_loop_t } +pub struct Handle(*uvll::uv_handle_t); + +impl Watcher for Handle {} +impl NativeHandle<*uvll::uv_handle_t> for Handle { + fn from_native_handle(h: *uvll::uv_handle_t) -> Handle { Handle(h) } + fn native_handle(&self) -> *uvll::uv_handle_t { **self } +} + /// The trait implemented by uv 'watchers' (handles). Watchers are /// non-owning wrappers around the uv handles and are not completely /// safe - there may be multiple instances for a single underlying @@ -137,6 +149,7 @@ pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>); pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>); pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>); pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>); +pub type SignalCallback = ~fn(SignalWatcher, Signum); /// Callbacks used by StreamWatchers, set as custom data on the foreign handle. @@ -153,6 +166,7 @@ struct WatcherData { udp_recv_cb: Option<UdpReceiveCallback>, udp_send_cb: Option<UdpSendCallback>, exit_cb: Option<ExitCallback>, + signal_cb: Option<SignalCallback>, } pub trait WatcherInterop { @@ -160,6 +174,8 @@ pub trait WatcherInterop { fn install_watcher_data(&mut self); fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData; fn drop_watcher_data(&mut self); + fn close(self, cb: NullCallback); + fn close_async(self); } impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W { @@ -186,6 +202,7 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W { udp_recv_cb: None, udp_send_cb: None, exit_cb: None, + signal_cb: None, }; let data = transmute::<~WatcherData, *c_void>(data); uvll::set_data_for_uv_handle(self.native_handle(), data); @@ -207,6 +224,34 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W { uvll::set_data_for_uv_handle(self.native_handle(), null::<()>()); } } + + fn close(self, cb: NullCallback) { + let mut this = self; + { + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + } + + unsafe { uvll::close(this.native_handle(), close_cb); } + + extern fn close_cb(handle: *uvll::uv_handle_t) { + let mut h: Handle = NativeHandle::from_native_handle(handle); + h.get_watcher_data().close_cb.take_unwrap()(); + h.drop_watcher_data(); + unsafe { uvll::free_handle(handle as *c_void) } + } + } + + fn close_async(self) { + unsafe { uvll::close(self.native_handle(), close_cb); } + + extern fn close_cb(handle: *uvll::uv_handle_t) { + let mut h: Handle = NativeHandle::from_native_handle(handle); + h.drop_watcher_data(); + unsafe { uvll::free_handle(handle as *c_void) } + } + } } // XXX: Need to define the error constants like EOF so they can be @@ -297,6 +342,13 @@ pub fn status_to_maybe_uv_error(status: c_int) -> Option<UvError> /// The uv buffer type pub type Buf = uvll::uv_buf_t; +pub fn empty_buf() -> Buf { + uvll::uv_buf_t { + base: null(), + len: 0, + } +} + /// Borrow a slice to a Buf pub fn slice_to_uv_buf(v: &[u8]) -> Buf { let data = vec::raw::to_ptr(v); diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs index a2608bf6b24..77de8348c14 100644 --- a/src/libstd/rt/uv/net.rs +++ b/src/libstd/rt/uv/net.rs @@ -13,8 +13,8 @@ use libc::{size_t, ssize_t, c_int, c_void, c_uint}; use rt::uv::uvll; use rt::uv::uvll::*; use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback}; -use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback, - status_to_maybe_uv_error, vec_to_uv_buf}; +use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, + status_to_maybe_uv_error, empty_buf}; use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; use vec; use str; @@ -27,7 +27,7 @@ pub enum UvSocketAddr { UvIpv6SocketAddr(*sockaddr_in6), } -fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr { +pub fn sockaddr_to_UvSocketAddr(addr: *uvll::sockaddr) -> UvSocketAddr { unsafe { assert!((is_ip4_addr(addr) || is_ip6_addr(addr))); assert!(!(is_ip4_addr(addr) && is_ip6_addr(addr))); @@ -96,28 +96,6 @@ pub fn uv_socket_addr_to_socket_addr(addr: UvSocketAddr) -> SocketAddr { uv_socket_addr_as_socket_addr(addr, util::id) } -// Traverse the addrinfo linked list, producing a vector of Rust socket addresses -pub fn accum_sockaddrs(addr: &UvAddrInfo) -> ~[SocketAddr] { - unsafe { - let &UvAddrInfo(addr) = addr; - let mut addr = addr; - - let mut addrs = ~[]; - loop { - let uvaddr = sockaddr_to_UvSocketAddr((*addr).ai_addr); - let rustaddr = uv_socket_addr_to_socket_addr(uvaddr); - addrs.push(rustaddr); - if (*addr).ai_next.is_not_null() { - addr = (*addr).ai_next; - } else { - break; - } - } - - return addrs; - } -} - #[cfg(test)] #[test] fn test_ip4_conversion() { @@ -141,23 +119,17 @@ impl Watcher for StreamWatcher { } impl StreamWatcher { pub fn read_start(&mut self, alloc: AllocCallback, cb: ReadCallback) { - { - let data = self.get_watcher_data(); - data.alloc_cb = Some(alloc); - data.read_cb = Some(cb); - } - - let ret = unsafe { uvll::read_start(self.native_handle(), alloc_cb, read_cb) }; - - if ret != 0 { - // uvll::read_start failed, so read_cb will not be called. - // Call it manually for scheduling. - call_read_cb(self.native_handle(), ret as ssize_t); - } - - fn call_read_cb(stream: *uvll::uv_stream_t, errno: ssize_t) { - #[fixed_stack_segment]; #[inline(never)]; - read_cb(stream, errno, vec_to_uv_buf(~[])); + unsafe { + match uvll::read_start(self.native_handle(), alloc_cb, read_cb) { + 0 => { + let data = self.get_watcher_data(); + data.alloc_cb = Some(alloc); + data.read_cb = Some(cb); + } + n => { + cb(*self, 0, empty_buf(), Some(UvError(n))) + } + } } extern fn alloc_cb(stream: *uvll::uv_stream_t, suggested_size: size_t) -> Buf { @@ -181,20 +153,25 @@ impl StreamWatcher { // but read_stop may be called from inside one of them and we // would end up freeing the in-use environment let handle = self.native_handle(); - unsafe { uvll::read_stop(handle); } + unsafe { assert_eq!(uvll::read_stop(handle), 0); } } pub fn write(&mut self, buf: Buf, cb: ConnectionCallback) { - { - let data = self.get_watcher_data(); - assert!(data.write_cb.is_none()); - data.write_cb = Some(cb); - } - let req = WriteRequest::new(); - unsafe { - assert_eq!(0, uvll::write(req.native_handle(), self.native_handle(), [buf], write_cb)); - } + return unsafe { + match uvll::write(req.native_handle(), self.native_handle(), + [buf], write_cb) { + 0 => { + let data = self.get_watcher_data(); + assert!(data.write_cb.is_none()); + data.write_cb = Some(cb); + } + n => { + req.delete(); + cb(*self, Some(UvError(n))) + } + } + }; extern fn write_cb(req: *uvll::uv_write_t, status: c_int) { let write_request: WriteRequest = NativeHandle::from_native_handle(req); @@ -206,30 +183,36 @@ impl StreamWatcher { } } - pub fn accept(&mut self, stream: StreamWatcher) { - let self_handle = self.native_handle() as *c_void; - let stream_handle = stream.native_handle() as *c_void; - assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } ); - } - pub fn close(self, cb: NullCallback) { + pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> { { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); + let data = self.get_watcher_data(); + assert!(data.connect_cb.is_none()); + data.connect_cb = Some(cb); } - unsafe { uvll::close(self.native_handle(), close_cb); } + return unsafe { + static BACKLOG: c_int = 128; // XXX should be configurable + match uvll::listen(self.native_handle(), BACKLOG, connection_cb) { + 0 => Ok(()), + n => Err(UvError(n)) + } + }; - extern fn close_cb(handle: *uvll::uv_stream_t) { + extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) { + rtdebug!("connection_cb"); let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); - let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap(); - stream_watcher.drop_watcher_data(); - unsafe { free_handle(handle as *c_void) } - cb(); + let cb = stream_watcher.get_watcher_data().connect_cb.get_ref(); + let status = status_to_maybe_uv_error(status); + (*cb)(stream_watcher, status); } } + + pub fn accept(&mut self, stream: StreamWatcher) { + let self_handle = self.native_handle() as *c_void; + let stream_handle = stream.native_handle() as *c_void; + assert_eq!(0, unsafe { uvll::accept(self_handle, stream_handle) } ); + } } impl NativeHandle<*uvll::uv_stream_t> for StreamWatcher { @@ -300,28 +283,6 @@ impl TcpWatcher { } } - pub fn listen(&mut self, cb: ConnectionCallback) { - { - let data = self.get_watcher_data(); - assert!(data.connect_cb.is_none()); - data.connect_cb = Some(cb); - } - - unsafe { - static BACKLOG: c_int = 128; // XXX should be configurable - // XXX: This can probably fail - assert_eq!(0, uvll::listen(self.native_handle(), BACKLOG, connection_cb)); - } - - extern fn connection_cb(handle: *uvll::uv_stream_t, status: c_int) { - rtdebug!("connection_cb"); - let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle); - let cb = stream_watcher.get_watcher_data().connect_cb.get_ref(); - let status = status_to_maybe_uv_error(status); - (*cb)(stream_watcher, status); - } - } - pub fn as_stream(&self) -> StreamWatcher { NativeHandle::from_native_handle(self.native_handle() as *uvll::uv_stream_t) } @@ -433,25 +394,6 @@ impl UdpWatcher { cb(udp_watcher, status); } } - - pub fn close(self, cb: NullCallback) { - { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { uvll::close(self.native_handle(), close_cb); } - - extern fn close_cb(handle: *uvll::uv_udp_t) { - let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle); - let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap(); - udp_watcher.drop_watcher_data(); - unsafe { free_handle(handle as *c_void) } - cb(); - } - } } impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher { @@ -464,12 +406,12 @@ impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher { } // uv_connect_t is a subclass of uv_req_t -struct ConnectRequest(*uvll::uv_connect_t); +pub struct ConnectRequest(*uvll::uv_connect_t); impl Request for ConnectRequest { } impl ConnectRequest { - fn new() -> ConnectRequest { + pub fn new() -> ConnectRequest { let connect_handle = unsafe { malloc_req(UV_CONNECT) }; assert!(connect_handle.is_not_null()); ConnectRequest(connect_handle as *uvll::uv_connect_t) @@ -644,7 +586,8 @@ mod test { server_tcp_watcher.bind(addr); let loop_ = loop_; rtdebug!("listening"); - do server_tcp_watcher.listen |mut server_stream_watcher, status| { + let mut stream = server_tcp_watcher.as_stream(); + let res = do stream.listen |mut server_stream_watcher, status| { rtdebug!("listened!"); assert!(status.is_none()); let mut loop_ = loop_; @@ -678,7 +621,9 @@ mod test { } count_cell.put_back(count); } - } + }; + + assert!(res.is_ok()); let client_thread = do Thread::start { rtdebug!("starting client thread"); @@ -705,7 +650,7 @@ mod test { loop_.run(); loop_.close(); client_thread.join(); - } + }; } #[test] @@ -718,7 +663,8 @@ mod test { server_tcp_watcher.bind(addr); let loop_ = loop_; rtdebug!("listening"); - do server_tcp_watcher.listen |mut server_stream_watcher, status| { + let mut stream = server_tcp_watcher.as_stream(); + let res = do stream.listen |mut server_stream_watcher, status| { rtdebug!("listened!"); assert!(status.is_none()); let mut loop_ = loop_; @@ -754,7 +700,8 @@ mod test { } count_cell.put_back(count); } - } + }; + assert!(res.is_ok()); let client_thread = do Thread::start { rtdebug!("starting client thread"); diff --git a/src/libstd/rt/uv/pipe.rs b/src/libstd/rt/uv/pipe.rs index 1147c731a60..74b9312954c 100644 --- a/src/libstd/rt/uv/pipe.rs +++ b/src/libstd/rt/uv/pipe.rs @@ -10,6 +10,7 @@ use prelude::*; use libc; +use c_str::CString; use rt::uv; use rt::uv::net; @@ -37,23 +38,54 @@ impl Pipe { net::StreamWatcher(**self as *uvll::uv_stream_t) } - pub fn close(self, cb: uv::NullCallback) { + #[fixed_stack_segment] #[inline(never)] + pub fn open(&mut self, file: libc::c_int) -> Result<(), uv::UvError> { + match unsafe { uvll::pipe_open(self.native_handle(), file) } { + 0 => Ok(()), + n => Err(uv::UvError(n)) + } + } + + #[fixed_stack_segment] #[inline(never)] + pub fn bind(&mut self, name: &CString) -> Result<(), uv::UvError> { + do name.with_ref |name| { + match unsafe { uvll::pipe_bind(self.native_handle(), name) } { + 0 => Ok(()), + n => Err(uv::UvError(n)) + } + } + } + + #[fixed_stack_segment] #[inline(never)] + pub fn connect(&mut self, name: &CString, cb: uv::ConnectionCallback) { { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); + let data = self.get_watcher_data(); + assert!(data.connect_cb.is_none()); + data.connect_cb = Some(cb); } - unsafe { uvll::close(self.native_handle(), close_cb); } + let connect = net::ConnectRequest::new(); + let name = do name.with_ref |p| { p }; - extern fn close_cb(handle: *uvll::uv_pipe_t) { - let mut process: Pipe = uv::NativeHandle::from_native_handle(handle); - process.get_watcher_data().close_cb.take_unwrap()(); - process.drop_watcher_data(); - unsafe { uvll::free_handle(handle as *libc::c_void) } + unsafe { + uvll::pipe_connect(connect.native_handle(), + self.native_handle(), + name, + connect_cb) + } + + extern "C" fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) { + let connect_request: net::ConnectRequest = + uv::NativeHandle::from_native_handle(req); + let mut stream_watcher = connect_request.stream(); + connect_request.delete(); + + let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap(); + let status = uv::status_to_maybe_uv_error(status); + cb(stream_watcher, status); } } + } impl uv::NativeHandle<*uvll::uv_pipe_t> for Pipe { diff --git a/src/libstd/rt/uv/process.rs b/src/libstd/rt/uv/process.rs index 176754de8f7..f0d0afeb6aa 100644 --- a/src/libstd/rt/uv/process.rs +++ b/src/libstd/rt/uv/process.rs @@ -12,12 +12,11 @@ use prelude::*; use cell::Cell; use libc; use ptr; -use util; use vec; use rt::io::process::*; use rt::uv; -use rt::uv::uvio::UvPipeStream; +use rt::uv::uvio::{UvPipeStream, UvUnboundPipe}; use rt::uv::uvll; /// A process wraps the handle of the underlying uv_process_t. @@ -42,9 +41,9 @@ impl Process { /// /// Returns either the corresponding process object or an error which /// occurred. - pub fn spawn(&mut self, loop_: &uv::Loop, mut config: ProcessConfig, + pub fn spawn(&mut self, loop_: &uv::Loop, config: ProcessConfig, exit_cb: uv::ExitCallback) - -> Result<~[Option<UvPipeStream>], uv::UvError> + -> Result<~[Option<~UvPipeStream>], uv::UvError> { let cwd = config.cwd.map(|s| s.to_c_str()); @@ -62,13 +61,14 @@ impl Process { err); } - let io = util::replace(&mut config.io, ~[]); + let io = config.io; let mut stdio = vec::with_capacity::<uvll::uv_stdio_container_t>(io.len()); let mut ret_io = vec::with_capacity(io.len()); unsafe { vec::raw::set_len(&mut stdio, io.len()); - for (slot, other) in stdio.iter().zip(io.move_iter()) { - let io = set_stdio(slot as *uvll::uv_stdio_container_t, other); + for (slot, other) in stdio.iter().zip(io.iter()) { + let io = set_stdio(slot as *uvll::uv_stdio_container_t, other, + loop_); ret_io.push(io); } } @@ -122,30 +122,12 @@ impl Process { pub fn pid(&self) -> libc::pid_t { unsafe { uvll::process_pid(**self) as libc::pid_t } } - - /// Closes this handle, invoking the specified callback once closed - pub fn close(self, cb: uv::NullCallback) { - { - let mut this = self; - let data = this.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { uvll::close(self.native_handle(), close_cb); } - - extern fn close_cb(handle: *uvll::uv_process_t) { - let mut process: Process = uv::NativeHandle::from_native_handle(handle); - process.get_watcher_data().close_cb.take_unwrap()(); - process.drop_watcher_data(); - unsafe { uvll::free_handle(handle as *libc::c_void) } - } - } } unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, - io: StdioContainer) -> Option<UvPipeStream> { - match io { + io: &StdioContainer, + loop_: &uv::Loop) -> Option<~UvPipeStream> { + match *io { Ignored => { uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE); None @@ -155,7 +137,7 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, uvll::set_stdio_container_fd(dst, fd); None } - CreatePipe(pipe, readable, writable) => { + CreatePipe(readable, writable) => { let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int; if readable { flags |= uvll::STDIO_READABLE_PIPE as libc::c_int; @@ -163,10 +145,11 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, if writable { flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int; } + let pipe = UvUnboundPipe::new(loop_); let handle = pipe.pipe.as_stream().native_handle(); uvll::set_stdio_container_flags(dst, flags); uvll::set_stdio_container_stream(dst, handle); - Some(pipe.bind()) + Some(~UvPipeStream::new(pipe)) } } } diff --git a/src/libstd/rt/uv/signal.rs b/src/libstd/rt/uv/signal.rs new file mode 100644 index 00000000000..3252c89673d --- /dev/null +++ b/src/libstd/rt/uv/signal.rs @@ -0,0 +1,73 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use cast; +use option::Some; +use libc::c_int; +use result::{Err, Ok, Result}; +use rt::io::signal::Signum; +use rt::uv::{Loop, NativeHandle, SignalCallback, UvError, Watcher}; +use rt::uv::uvll; + +pub struct SignalWatcher(*uvll::uv_signal_t); + +impl Watcher for SignalWatcher { } + +impl SignalWatcher { + pub fn new(loop_: &mut Loop) -> SignalWatcher { + unsafe { + let handle = uvll::malloc_handle(uvll::UV_SIGNAL); + assert!(handle.is_not_null()); + assert!(0 == uvll::signal_init(loop_.native_handle(), handle)); + let mut watcher: SignalWatcher = NativeHandle::from_native_handle(handle); + watcher.install_watcher_data(); + return watcher; + } + } + + pub fn start(&mut self, signum: Signum, callback: SignalCallback) + -> Result<(), UvError> + { + return unsafe { + match uvll::signal_start(self.native_handle(), signal_cb, + signum as c_int) { + 0 => { + let data = self.get_watcher_data(); + data.signal_cb = Some(callback); + Ok(()) + } + n => Err(UvError(n)), + } + }; + + extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) { + let mut watcher: SignalWatcher = NativeHandle::from_native_handle(handle); + let data = watcher.get_watcher_data(); + let cb = data.signal_cb.get_ref(); + (*cb)(watcher, unsafe { cast::transmute(signum as int) }); + } + } + + pub fn stop(&mut self) { + unsafe { + uvll::signal_stop(self.native_handle()); + } + } +} + +impl NativeHandle<*uvll::uv_signal_t> for SignalWatcher { + fn from_native_handle(handle: *uvll::uv_signal_t) -> SignalWatcher { + SignalWatcher(handle) + } + + fn native_handle(&self) -> *uvll::uv_signal_t { + match self { &SignalWatcher(ptr) => ptr } + } +} diff --git a/src/libstd/rt/uv/timer.rs b/src/libstd/rt/uv/timer.rs index 7b09cf2eb0e..fb3c84df39f 100644 --- a/src/libstd/rt/uv/timer.rs +++ b/src/libstd/rt/uv/timer.rs @@ -8,10 +8,10 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use libc::{c_void, c_int}; +use libc::c_int; use option::Some; use rt::uv::uvll; -use rt::uv::{Watcher, Loop, NativeHandle, TimerCallback, NullCallback}; +use rt::uv::{Watcher, Loop, NativeHandle, TimerCallback}; use rt::uv::status_to_maybe_uv_error; pub struct TimerWatcher(*uvll::uv_timer_t); @@ -53,31 +53,6 @@ impl TimerWatcher { uvll::timer_stop(self.native_handle()); } } - - pub fn close(self, cb: NullCallback) { - let mut watcher = self; - { - let data = watcher.get_watcher_data(); - assert!(data.close_cb.is_none()); - data.close_cb = Some(cb); - } - - unsafe { - uvll::close(watcher.native_handle(), close_cb); - } - - extern fn close_cb(handle: *uvll::uv_timer_t) { - let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle); - { - let data = watcher.get_watcher_data(); - data.close_cb.take_unwrap()(); - } - watcher.drop_watcher_data(); - unsafe { - uvll::free_handle(handle as *c_void); - } - } - } } impl NativeHandle<*uvll::uv_timer_t> for TimerWatcher { diff --git a/src/libstd/rt/uv/tty.rs b/src/libstd/rt/uv/tty.rs new file mode 100644 index 00000000000..f44c5ae8eff --- /dev/null +++ b/src/libstd/rt/uv/tty.rs @@ -0,0 +1,84 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use prelude::*; +use libc; + +use rt::uv; +use rt::uv::net; +use rt::uv::uvll; + +/// A process wraps the handle of the underlying uv_process_t. +pub struct TTY(*uvll::uv_tty_t); + +impl uv::Watcher for TTY {} + +impl TTY { + #[fixed_stack_segment] #[inline(never)] + pub fn new(loop_: &uv::Loop, fd: libc::c_int, readable: bool) -> + Result<TTY, uv::UvError> + { + let handle = unsafe { uvll::malloc_handle(uvll::UV_TTY) }; + assert!(handle.is_not_null()); + + let ret = unsafe { + uvll::tty_init(loop_.native_handle(), handle, fd as libc::c_int, + readable as libc::c_int) + }; + match ret { + 0 => { + let mut ret: TTY = uv::NativeHandle::from_native_handle(handle); + ret.install_watcher_data(); + Ok(ret) + } + n => { + unsafe { uvll::free_handle(handle); } + Err(uv::UvError(n)) + } + } + } + + pub fn as_stream(&self) -> net::StreamWatcher { + net::StreamWatcher(**self as *uvll::uv_stream_t) + } + + #[fixed_stack_segment] #[inline(never)] + pub fn set_mode(&self, raw: bool) -> Result<(), uv::UvError> { + let raw = raw as libc::c_int; + match unsafe { uvll::tty_set_mode(self.native_handle(), raw) } { + 0 => Ok(()), + n => Err(uv::UvError(n)) + } + } + + #[fixed_stack_segment] #[inline(never)] #[allow(unused_mut)] + pub fn get_winsize(&self) -> Result<(int, int), uv::UvError> { + let mut width: libc::c_int = 0; + let mut height: libc::c_int = 0; + let widthptr: *libc::c_int = &width; + let heightptr: *libc::c_int = &width; + + match unsafe { uvll::tty_get_winsize(self.native_handle(), + widthptr, heightptr) } { + 0 => Ok((width as int, height as int)), + n => Err(uv::UvError(n)) + } + } +} + +impl uv::NativeHandle<*uvll::uv_tty_t> for TTY { + fn from_native_handle(handle: *uvll::uv_tty_t) -> TTY { + TTY(handle) + } + fn native_handle(&self) -> *uvll::uv_tty_t { + match self { &TTY(ptr) => ptr } + } +} + diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 8dd0f8a6b10..29370c484eb 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -8,17 +8,17 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use c_str::ToCStr; +use c_str::{ToCStr, CString}; use cast::transmute; use cast; use cell::Cell; use clone::Clone; +use comm::{SendDeferred, SharedChan}; use libc::{c_int, c_uint, c_void, pid_t}; use ops::Drop; use option::*; use ptr; use str; -use str::Str; use result::*; use rt::io::IoError; use rt::io::net::ip::{SocketAddr, IpAddr}; @@ -32,17 +32,18 @@ use rt::tube::Tube; use rt::task::SchedHome; use rt::uv::*; use rt::uv::idle::IdleWatcher; -use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr, accum_sockaddrs}; -use rt::uv::addrinfo::GetAddrInfoRequest; +use rt::uv::net::{UvIpv4SocketAddr, UvIpv6SocketAddr}; +use rt::uv::addrinfo::{GetAddrInfoRequest, accum_addrinfo}; use unstable::sync::Exclusive; use path::{GenericPath, Path}; -use super::super::io::support::PathLike; use libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY, O_WRONLY, S_IRUSR, S_IWUSR, S_IRWXU}; use rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create, CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite, FileStat}; +use rt::io::signal::Signum; use task; +use ai = rt::io::net::addrinfo; #[cfg(test)] use container::Container; #[cfg(test)] use unstable::run_in_bare_thread; @@ -214,11 +215,11 @@ impl EventLoop for UvEventLoop { fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback { let idle_watcher = IdleWatcher::new(self.uvio.uv_loop()); - return ~UvPausibleIdleCallback { + ~UvPausibleIdleCallback { watcher: idle_watcher, idle_flag: false, closed: false - }; + } as ~PausibleIdleCallback } fn callback_ms(&mut self, ms: u64, f: ~fn()) { @@ -230,12 +231,12 @@ impl EventLoop for UvEventLoop { } } - fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject { - ~UvRemoteCallback::new(self.uvio.uv_loop(), f) + fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback { + ~UvRemoteCallback::new(self.uvio.uv_loop(), f) as ~RemoteCallback } - fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> { - Some(&mut self.uvio) + fn io<'a>(&'a mut self, f: &fn(&'a mut IoFactory)) { + f(&mut self.uvio as &mut IoFactory) } } @@ -245,30 +246,30 @@ pub struct UvPausibleIdleCallback { priv closed: bool } -impl UvPausibleIdleCallback { +impl PausibleIdleCallback for UvPausibleIdleCallback { #[inline] - pub fn start(&mut self, f: ~fn()) { + fn start(&mut self, f: ~fn()) { do self.watcher.start |_idle_watcher, _status| { f(); }; self.idle_flag = true; } #[inline] - pub fn pause(&mut self) { + fn pause(&mut self) { if self.idle_flag == true { self.watcher.stop(); self.idle_flag = false; } } #[inline] - pub fn resume(&mut self) { + fn resume(&mut self) { if self.idle_flag == false { self.watcher.restart(); self.idle_flag = true; } } #[inline] - pub fn close(&mut self) { + fn close(&mut self) { self.pause(); if !self.closed { self.closed = true; @@ -414,9 +415,9 @@ impl UvIoFactory { /// Helper for a variety of simple uv_fs_* functions that /// have no ret val -fn uv_fs_helper<P: PathLike>(loop_: &mut Loop, path: &P, - cb: ~fn(&mut FsRequest, &mut Loop, &P, - ~fn(&FsRequest, Option<UvError>))) +fn uv_fs_helper(loop_: &mut Loop, path: &CString, + cb: ~fn(&mut FsRequest, &mut Loop, &CString, + ~fn(&FsRequest, Option<UvError>))) -> Result<(), IoError> { let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell; @@ -446,11 +447,11 @@ impl IoFactory for UvIoFactory { // Connect to an address and return a new stream // NB: This blocks the task waiting on the connection. // It would probably be better to return a future - fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStreamObject, IoError> { + fn tcp_connect(&mut self, addr: SocketAddr) -> Result<~RtioTcpStream, IoError> { // Create a cell in the task to hold the result. We will fill // the cell before resuming the task. let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell; + let result_cell_ptr: *Cell<Result<~RtioTcpStream, IoError>> = &result_cell; // Block this task and take ownership, switch to scheduler context do task::unkillable { // FIXME(#8674) @@ -466,7 +467,8 @@ impl IoFactory for UvIoFactory { None => { let tcp = NativeHandle::from_native_handle(stream.native_handle()); let home = get_handle_to_current_scheduler!(); - let res = Ok(~UvTcpStream { watcher: tcp, home: home }); + let res = Ok(~UvTcpStream { watcher: tcp, home: home } + as ~RtioTcpStream); // Store the stream in the task's stack unsafe { (*result_cell_ptr).put_back(res); } @@ -493,12 +495,12 @@ impl IoFactory for UvIoFactory { return result_cell.take(); } - fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListenerObject, IoError> { + fn tcp_bind(&mut self, addr: SocketAddr) -> Result<~RtioTcpListener, IoError> { let mut watcher = TcpWatcher::new(self.uv_loop()); match watcher.bind(addr) { Ok(_) => { let home = get_handle_to_current_scheduler!(); - Ok(~UvTcpListener::new(watcher, home)) + Ok(~UvTcpListener::new(watcher, home) as ~RtioTcpListener) } Err(uverr) => { do task::unkillable { // FIXME(#8674) @@ -516,12 +518,12 @@ impl IoFactory for UvIoFactory { } } - fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocketObject, IoError> { + fn udp_bind(&mut self, addr: SocketAddr) -> Result<~RtioUdpSocket, IoError> { let mut watcher = UdpWatcher::new(self.uv_loop()); match watcher.bind(addr) { Ok(_) => { let home = get_handle_to_current_scheduler!(); - Ok(~UvUdpSocket { watcher: watcher, home: home }) + Ok(~UvUdpSocket { watcher: watcher, home: home } as ~RtioUdpSocket) } Err(uverr) => { do task::unkillable { // FIXME(#8674) @@ -539,19 +541,19 @@ impl IoFactory for UvIoFactory { } } - fn timer_init(&mut self) -> Result<~RtioTimerObject, IoError> { + fn timer_init(&mut self) -> Result<~RtioTimer, IoError> { let watcher = TimerWatcher::new(self.uv_loop()); let home = get_handle_to_current_scheduler!(); - Ok(~UvTimer::new(watcher, home)) + Ok(~UvTimer::new(watcher, home) as ~RtioTimer) } - fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream { + fn fs_from_raw_fd(&mut self, fd: c_int, close: CloseBehavior) -> ~RtioFileStream { let loop_ = Loop {handle: self.uv_loop().native_handle()}; let home = get_handle_to_current_scheduler!(); - ~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream + ~UvFileStream::new(loop_, fd, close, home) as ~RtioFileStream } - fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess) + fn fs_open(&mut self, path: &CString, fm: FileMode, fa: FileAccess) -> Result<~RtioFileStream, IoError> { let mut flags = match fm { Open => 0, @@ -588,7 +590,7 @@ impl IoFactory for UvIoFactory { let home = get_handle_to_current_scheduler!(); let fd = req.get_result() as c_int; let fs = ~UvFileStream::new( - loop_, fd, true, home) as ~RtioFileStream; + loop_, fd, CloseSynchronously, home) as ~RtioFileStream; let res = Ok(fs); unsafe { (*result_cell_ptr).put_back(res); } let scheduler: ~Scheduler = Local::take(); @@ -606,14 +608,14 @@ impl IoFactory for UvIoFactory { return result_cell.take(); } - fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> { + fn fs_unlink(&mut self, path: &CString) -> Result<(), IoError> { do uv_fs_helper(self.uv_loop(), path) |unlink_req, l, p, cb| { do unlink_req.unlink(l, p) |req, err| { cb(req, err) }; } } - fn fs_stat<P: PathLike>(&mut self, path: &P) -> Result<FileStat, IoError> { + fn fs_stat(&mut self, path: &CString) -> Result<FileStat, IoError> { use str::StrSlice; let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell<Result<FileStat, @@ -625,14 +627,15 @@ impl IoFactory for UvIoFactory { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); let path = path_cell.take(); - let path_str = path.path_as_str(|p| p.to_owned()); - do stat_req.stat(self.uv_loop(), path) - |req,err| { + // Don't pick up the null byte + let slice = path.as_bytes().slice(0, path.len()); + let path_instance = Cell::new(Path::new(slice)); + do stat_req.stat(self.uv_loop(), path) |req,err| { let res = match err { None => { let stat = req.get_stat(); Ok(FileStat { - path: Path::new(path_str.as_slice()), + path: path_instance.take(), is_file: stat.is_file(), is_dir: stat.is_dir(), device: stat.st_dev, @@ -658,12 +661,16 @@ impl IoFactory for UvIoFactory { return result_cell.take(); } - fn get_host_addresses(&mut self, host: &str) -> Result<~[IpAddr], IoError> { + fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, + hint: Option<ai::Hint>) -> Result<~[ai::Info], IoError> { let result_cell = Cell::new_empty(); - let result_cell_ptr: *Cell<Result<~[IpAddr], IoError>> = &result_cell; - let host_ptr: *&str = &host; + let result_cell_ptr: *Cell<Result<~[ai::Info], IoError>> = &result_cell; + let host_ptr: *Option<&str> = &host; + let servname_ptr: *Option<&str> = &servname; + let hint_ptr: *Option<ai::Hint> = &hint; let addrinfo_req = GetAddrInfoRequest::new(); let addrinfo_req_cell = Cell::new(addrinfo_req); + do task::unkillable { // FIXME(#8674) let scheduler: ~Scheduler = Local::take(); do scheduler.deschedule_running_task_and_then |_, task| { @@ -671,10 +678,10 @@ impl IoFactory for UvIoFactory { let mut addrinfo_req = addrinfo_req_cell.take(); unsafe { do addrinfo_req.getaddrinfo(self.uv_loop(), - Some(*host_ptr), - None, None) |_, addrinfo, err| { + *host_ptr, *servname_ptr, + *hint_ptr) |_, addrinfo, err| { let res = match err { - None => Ok(accum_sockaddrs(addrinfo).map(|addr| addr.ip.clone())), + None => Ok(accum_addrinfo(addrinfo)), Some(err) => Err(uv_error_to_io_error(err)) }; (*result_cell_ptr).put_back(res); @@ -688,7 +695,7 @@ impl IoFactory for UvIoFactory { assert!(!result_cell.is_empty()); return result_cell.take(); } - fn fs_mkdir<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> { + fn fs_mkdir(&mut self, path: &CString) -> Result<(), IoError> { let mode = S_IRWXU as int; do uv_fs_helper(self.uv_loop(), path) |mkdir_req, l, p, cb| { do mkdir_req.mkdir(l, p, mode as int) |req, err| { @@ -696,14 +703,14 @@ impl IoFactory for UvIoFactory { }; } } - fn fs_rmdir<P: PathLike>(&mut self, path: &P) -> Result<(), IoError> { + fn fs_rmdir(&mut self, path: &CString) -> Result<(), IoError> { do uv_fs_helper(self.uv_loop(), path) |rmdir_req, l, p, cb| { do rmdir_req.rmdir(l, p) |req, err| { cb(req, err) }; } } - fn fs_readdir<P: PathLike>(&mut self, path: &P, flags: c_int) -> + fn fs_readdir(&mut self, path: &CString, flags: c_int) -> Result<~[Path], IoError> { use str::StrSlice; let result_cell = Cell::new_empty(); @@ -716,17 +723,17 @@ impl IoFactory for UvIoFactory { do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); let path = path_cell.take(); - let path_str = path.path_as_str(|p| p.to_owned()); - do stat_req.readdir(self.uv_loop(), path, flags) - |req,err| { + // Don't pick up the null byte + let slice = path.as_bytes().slice(0, path.len()); + let path_parent = Cell::new(Path::new(slice)); + do stat_req.readdir(self.uv_loop(), path, flags) |req,err| { + let parent = path_parent.take(); let res = match err { None => { - let rel_paths = req.get_paths(); let mut paths = ~[]; - for r in rel_paths.iter() { - let mut p = Path::new(path_str.as_slice()); - p.push(r.as_slice()); - paths.push(p); + do req.each_path |rel_path| { + let p = rel_path.as_bytes(); + paths.push(parent.join(p.slice_to(rel_path.len()))); } Ok(paths) }, @@ -744,13 +751,8 @@ impl IoFactory for UvIoFactory { return result_cell.take(); } - fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError> { - let home = get_handle_to_current_scheduler!(); - Ok(~UvUnboundPipe { pipe: Pipe::new(self.uv_loop(), ipc), home: home }) - } - fn spawn(&mut self, config: ProcessConfig) - -> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError> + -> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError> { // Sadly, we must create the UvProcess before we actually call uv_spawn // so that the exit_cb can close over it and notify it when the process @@ -792,7 +794,8 @@ impl IoFactory for UvIoFactory { Ok(io) => { // Only now do we actually get a handle to this scheduler. ret.home = Some(get_handle_to_current_scheduler!()); - Ok((ret, io)) + Ok((ret as ~RtioProcess, + io.move_iter().map(|p| p.map(|p| p as ~RtioPipe)).collect())) } Err(uverr) => { // We still need to close the process handle we created, but @@ -801,6 +804,76 @@ impl IoFactory for UvIoFactory { } } } + + fn unix_bind(&mut self, path: &CString) -> + Result<~RtioUnixListener, IoError> { + let mut pipe = UvUnboundPipe::new(self.uv_loop()); + match pipe.pipe.bind(path) { + Ok(()) => Ok(~UvUnixListener::new(pipe) as ~RtioUnixListener), + Err(e) => Err(uv_error_to_io_error(e)), + } + } + + fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError> { + let pipe = UvUnboundPipe::new(self.uv_loop()); + let mut rawpipe = pipe.pipe; + + let result_cell = Cell::new_empty(); + let result_cell_ptr: *Cell<Result<~RtioPipe, IoError>> = &result_cell; + let pipe_cell = Cell::new(pipe); + let pipe_cell_ptr: *Cell<UvUnboundPipe> = &pipe_cell; + + let scheduler: ~Scheduler = Local::take(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do rawpipe.connect(path) |_stream, err| { + let res = match err { + None => { + let pipe = unsafe { (*pipe_cell_ptr).take() }; + Ok(~UvPipeStream::new(pipe) as ~RtioPipe) + } + Some(e) => Err(uv_error_to_io_error(e)), + }; + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + + assert!(!result_cell.is_empty()); + return result_cell.take(); + } + + fn tty_open(&mut self, fd: c_int, readable: bool) + -> Result<~RtioTTY, IoError> { + match tty::TTY::new(self.uv_loop(), fd, readable) { + Ok(tty) => Ok(~UvTTY { + home: get_handle_to_current_scheduler!(), + tty: tty, + fd: fd, + } as ~RtioTTY), + Err(e) => Err(uv_error_to_io_error(e)) + } + } + + fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError> { + let mut pipe = UvUnboundPipe::new(self.uv_loop()); + match pipe.pipe.open(fd) { + Ok(()) => Ok(~UvPipeStream::new(pipe) as ~RtioPipe), + Err(e) => Err(uv_error_to_io_error(e)) + } + } + + fn signal(&mut self, signum: Signum, channel: SharedChan<Signum>) + -> Result<~RtioSignal, IoError> { + let watcher = SignalWatcher::new(self.uv_loop()); + let home = get_handle_to_current_scheduler!(); + let mut signal = ~UvSignal::new(watcher, home); + match signal.watcher.start(signum, |_, _| channel.send_deferred(signum)) { + Ok(()) => Ok(signal as ~RtioSignal), + Err(e) => Err(uv_error_to_io_error(e)), + } + } } pub struct UvTcpListener { @@ -841,11 +914,12 @@ impl RtioSocket for UvTcpListener { } impl RtioTcpListener for UvTcpListener { - fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError> { + fn listen(~self) -> Result<~RtioTcpAcceptor, IoError> { do self.home_for_io_consume |self_| { - let mut acceptor = ~UvTcpAcceptor::new(self_); + let acceptor = ~UvTcpAcceptor::new(self_); let incoming = Cell::new(acceptor.incoming.clone()); - do acceptor.listener.watcher.listen |mut server, status| { + let mut stream = acceptor.listener.watcher.as_stream(); + let res = do stream.listen |mut server, status| { do incoming.with_mut_ref |incoming| { let inc = match status { Some(_) => Err(standard_error(OtherIoError)), @@ -854,20 +928,24 @@ impl RtioTcpListener for UvTcpListener { // first accept call in the callback guarenteed to succeed server.accept(inc.as_stream()); let home = get_handle_to_current_scheduler!(); - Ok(~UvTcpStream { watcher: inc, home: home }) + Ok(~UvTcpStream { watcher: inc, home: home } + as ~RtioTcpStream) } }; incoming.send(inc); } }; - Ok(acceptor) + match res { + Ok(()) => Ok(acceptor as ~RtioTcpAcceptor), + Err(e) => Err(uv_error_to_io_error(e)), + } } } } pub struct UvTcpAcceptor { priv listener: UvTcpListener, - priv incoming: Tube<Result<~RtioTcpStreamObject, IoError>>, + priv incoming: Tube<Result<~RtioTcpStream, IoError>>, } impl HomingIO for UvTcpAcceptor { @@ -888,8 +966,19 @@ impl RtioSocket for UvTcpAcceptor { } } +fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> { + let r = unsafe { + uvll::tcp_simultaneous_accepts(stream.native_handle(), a as c_int) + }; + + match status_to_maybe_uv_error(r) { + Some(err) => Err(uv_error_to_io_error(err)), + None => Ok(()) + } +} + impl RtioTcpAcceptor for UvTcpAcceptor { - fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError> { + fn accept(&mut self) -> Result<~RtioTcpStream, IoError> { do self.home_for_io |self_| { self_.incoming.recv() } @@ -897,27 +986,13 @@ impl RtioTcpAcceptor for UvTcpAcceptor { fn accept_simultaneously(&mut self) -> Result<(), IoError> { do self.home_for_io |self_| { - let r = unsafe { - uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 1 as c_int) - }; - - match status_to_maybe_uv_error(r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) - } + accept_simultaneously(self_.listener.watcher.as_stream(), 1) } } fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { do self.home_for_io |self_| { - let r = unsafe { - uvll::tcp_simultaneous_accepts(self_.listener.watcher.native_handle(), 0 as c_int) - }; - - match status_to_maybe_uv_error(r) { - Some(err) => Err(uv_error_to_io_error(err)), - None => Ok(()) - } + accept_simultaneously(self_.listener.watcher.as_stream(), 0) } } } @@ -994,6 +1069,17 @@ pub struct UvUnboundPipe { priv home: SchedHandle, } +impl UvUnboundPipe { + /// Creates a new unbound pipe homed to the current scheduler, placed on the + /// specified event loop + pub fn new(loop_: &Loop) -> UvUnboundPipe { + UvUnboundPipe { + pipe: Pipe::new(loop_, false), + home: get_handle_to_current_scheduler!(), + } + } +} + impl HomingIO for UvUnboundPipe { fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } } @@ -1013,18 +1099,12 @@ impl Drop for UvUnboundPipe { } } -impl UvUnboundPipe { - pub unsafe fn bind(~self) -> UvPipeStream { - UvPipeStream { inner: self } - } -} - pub struct UvPipeStream { - priv inner: ~UvUnboundPipe, + priv inner: UvUnboundPipe, } impl UvPipeStream { - pub fn new(inner: ~UvUnboundPipe) -> UvPipeStream { + pub fn new(inner: UvUnboundPipe) -> UvPipeStream { UvPipeStream { inner: inner } } } @@ -1402,8 +1482,8 @@ impl RtioTimer for UvTimer { pub struct UvFileStream { priv loop_: Loop, priv fd: c_int, - priv close_on_drop: bool, - priv home: SchedHandle + priv close: CloseBehavior, + priv home: SchedHandle, } impl HomingIO for UvFileStream { @@ -1411,13 +1491,13 @@ impl HomingIO for UvFileStream { } impl UvFileStream { - fn new(loop_: Loop, fd: c_int, close_on_drop: bool, + fn new(loop_: Loop, fd: c_int, close: CloseBehavior, home: SchedHandle) -> UvFileStream { UvFileStream { loop_: loop_, fd: fd, - close_on_drop: close_on_drop, - home: home + close: close, + home: home, } } fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result<int, IoError> { @@ -1437,9 +1517,9 @@ impl UvFileStream { unsafe { (*result_cell_ptr).put_back(res); } let scheduler: ~Scheduler = Local::take(); scheduler.resume_blocked_task_immediately(task_cell.take()); - }; - }; - }; + } + } + } result_cell.take() } fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> { @@ -1459,9 +1539,9 @@ impl UvFileStream { unsafe { (*result_cell_ptr).put_back(res); } let scheduler: ~Scheduler = Local::take(); scheduler.resume_blocked_task_immediately(task_cell.take()); - }; - }; - }; + } + } + } result_cell.take() } fn seek_common(&mut self, pos: i64, whence: c_int) -> @@ -1484,16 +1564,23 @@ impl UvFileStream { impl Drop for UvFileStream { fn drop(&mut self) { - if self.close_on_drop { - do self.home_for_io_with_sched |self_, scheduler| { - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - let close_req = file::FsRequest::new(); - do close_req.close(&self_.loop_, self_.fd) |_,_| { - let scheduler: ~Scheduler = Local::take(); - scheduler.resume_blocked_task_immediately(task_cell.take()); - }; - }; + match self.close { + DontClose => {} + CloseAsynchronously => { + let close_req = file::FsRequest::new(); + do close_req.close(&self.loop_, self.fd) |_,_| {} + } + CloseSynchronously => { + do self.home_for_io_with_sched |self_, scheduler| { + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + let close_req = file::FsRequest::new(); + do close_req.close(&self_.loop_, self_.fd) |_,_| { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } } } } @@ -1612,13 +1699,185 @@ impl RtioProcess for UvProcess { } } +pub struct UvUnixListener { + priv inner: UvUnboundPipe +} + +impl HomingIO for UvUnixListener { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.inner.home() } +} + +impl UvUnixListener { + fn new(pipe: UvUnboundPipe) -> UvUnixListener { + UvUnixListener { inner: pipe } + } +} + +impl RtioUnixListener for UvUnixListener { + fn listen(~self) -> Result<~RtioUnixAcceptor, IoError> { + do self.home_for_io_consume |self_| { + let acceptor = ~UvUnixAcceptor::new(self_); + let incoming = Cell::new(acceptor.incoming.clone()); + let mut stream = acceptor.listener.inner.pipe.as_stream(); + let res = do stream.listen |mut server, status| { + do incoming.with_mut_ref |incoming| { + let inc = match status { + Some(e) => Err(uv_error_to_io_error(e)), + None => { + let pipe = UvUnboundPipe::new(&server.event_loop()); + server.accept(pipe.pipe.as_stream()); + Ok(~UvPipeStream::new(pipe) as ~RtioPipe) + } + }; + incoming.send(inc); + } + }; + match res { + Ok(()) => Ok(acceptor as ~RtioUnixAcceptor), + Err(e) => Err(uv_error_to_io_error(e)), + } + } + } +} + +pub struct UvTTY { + tty: tty::TTY, + home: SchedHandle, + fd: c_int, +} + +impl HomingIO for UvTTY { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl Drop for UvTTY { + fn drop(&mut self) { + // TTY handles are used for the logger in a task, so this destructor is + // run when a task is destroyed. When a task is being destroyed, a local + // scheduler isn't available, so we can't do the normal "take the + // scheduler and resume once close is done". Instead close operations on + // a TTY are asynchronous. + self.tty.close_async(); + } +} + +impl RtioTTY for UvTTY { + fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> { + do self.home_for_io_with_sched |self_, scheduler| { + read_stream(self_.tty.as_stream(), scheduler, buf) + } + } + + fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + do self.home_for_io_with_sched |self_, scheduler| { + write_stream(self_.tty.as_stream(), scheduler, buf) + } + } + + fn set_raw(&mut self, raw: bool) -> Result<(), IoError> { + do self.home_for_io |self_| { + match self_.tty.set_mode(raw) { + Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e)) + } + } + } + + fn get_winsize(&mut self) -> Result<(int, int), IoError> { + do self.home_for_io |self_| { + match self_.tty.get_winsize() { + Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e)) + } + } + } + + fn isatty(&self) -> bool { + unsafe { uvll::guess_handle(self.fd) == uvll::UV_TTY } + } +} + +pub struct UvUnixAcceptor { + listener: UvUnixListener, + incoming: Tube<Result<~RtioPipe, IoError>>, +} + +impl HomingIO for UvUnixAcceptor { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() } +} + +impl UvUnixAcceptor { + fn new(listener: UvUnixListener) -> UvUnixAcceptor { + UvUnixAcceptor { listener: listener, incoming: Tube::new() } + } +} + +impl RtioUnixAcceptor for UvUnixAcceptor { + fn accept(&mut self) -> Result<~RtioPipe, IoError> { + do self.home_for_io |self_| { + self_.incoming.recv() + } + } + + fn accept_simultaneously(&mut self) -> Result<(), IoError> { + do self.home_for_io |self_| { + accept_simultaneously(self_.listener.inner.pipe.as_stream(), 1) + } + } + + fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> { + do self.home_for_io |self_| { + accept_simultaneously(self_.listener.inner.pipe.as_stream(), 0) + } + } +} + +pub struct UvSignal { + watcher: signal::SignalWatcher, + home: SchedHandle, +} + +impl HomingIO for UvSignal { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } +} + +impl UvSignal { + fn new(w: signal::SignalWatcher, home: SchedHandle) -> UvSignal { + UvSignal { watcher: w, home: home } + } +} + +impl RtioSignal for UvSignal {} + +impl Drop for UvSignal { + fn drop(&mut self) { + do self.home_for_io_with_sched |self_, scheduler| { + rtdebug!("closing UvSignal"); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.watcher.close { + let scheduler: ~Scheduler = Local::take(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } + } +} + +// this function is full of lies +unsafe fn local_io() -> &'static mut IoFactory { + do Local::borrow |sched: &mut Scheduler| { + let mut io = None; + sched.event_loop.io(|i| io = Some(i)); + cast::transmute(io.unwrap()) + } +} + #[test] fn test_simple_io_no_connect() { do run_in_mt_newsched_task { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); + let io = local_io(); let addr = next_test_ip4(); - let maybe_chan = (*io).tcp_connect(addr); + let maybe_chan = io.tcp_connect(addr); assert!(maybe_chan.is_err()); } } @@ -1628,9 +1887,9 @@ fn test_simple_io_no_connect() { fn test_simple_udp_io_bind_only() { do run_in_mt_newsched_task { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); + let io = local_io(); let addr = next_test_ip4(); - let maybe_socket = (*io).udp_bind(addr); + let maybe_socket = io.udp_bind(addr); assert!(maybe_socket.is_ok()); } } @@ -1649,9 +1908,11 @@ fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() { let work_queue2 = WorkQueue::new(); let queues = ~[work_queue1.clone(), work_queue2.clone()]; - let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(), + let loop1 = ~UvEventLoop::new() as ~EventLoop; + let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(), sleepers.clone()); - let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(), + let loop2 = ~UvEventLoop::new() as ~EventLoop; + let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(), sleepers.clone()); let handle1 = Cell::new(sched1.make_handle()); @@ -1665,11 +1926,9 @@ fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() { }; let test_function: ~fn() = || { - let io: *mut IoFactoryObject = unsafe { - Local::unsafe_borrow() - }; + let io = unsafe { local_io() }; let addr = next_test_ip4(); - let maybe_socket = unsafe { (*io).udp_bind(addr) }; + let maybe_socket = io.udp_bind(addr); // this socket is bound to this event loop assert!(maybe_socket.is_ok()); @@ -1728,9 +1987,11 @@ fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() { let work_queue2 = WorkQueue::new(); let queues = ~[work_queue1.clone(), work_queue2.clone()]; - let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(), + let loop1 = ~UvEventLoop::new() as ~EventLoop; + let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(), sleepers.clone()); - let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(), + let loop2 = ~UvEventLoop::new() as ~EventLoop; + let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(), sleepers.clone()); let handle1 = Cell::new(sched1.make_handle()); @@ -1741,11 +2002,9 @@ fn test_simple_homed_udp_io_bind_then_move_handle_then_home_and_close() { let chan = Cell::new(chan); let body1: ~fn() = || { - let io: *mut IoFactoryObject = unsafe { - Local::unsafe_borrow() - }; + let io = unsafe { local_io() }; let addr = next_test_ip4(); - let socket = unsafe { (*io).udp_bind(addr) }; + let socket = io.udp_bind(addr); assert!(socket.is_ok()); chan.take().send(socket); }; @@ -1799,8 +2058,8 @@ fn test_simple_tcp_server_and_client() { // Start the server first so it's listening when we connect do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let listener = (*io).tcp_bind(addr).unwrap(); + let io = local_io(); + let listener = io.tcp_bind(addr).unwrap(); let mut acceptor = listener.listen().unwrap(); chan.take().send(()); let mut stream = acceptor.accept().unwrap(); @@ -1817,8 +2076,8 @@ fn test_simple_tcp_server_and_client() { do spawntask { unsafe { port.take().recv(); - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut stream = (*io).tcp_connect(addr).unwrap(); + let io = local_io(); + let mut stream = io.tcp_connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); } } @@ -1842,9 +2101,11 @@ fn test_simple_tcp_server_and_client_on_diff_threads() { let client_work_queue = WorkQueue::new(); let queues = ~[server_work_queue.clone(), client_work_queue.clone()]; - let mut server_sched = ~Scheduler::new(~UvEventLoop::new(), server_work_queue, + let sloop = ~UvEventLoop::new() as ~EventLoop; + let mut server_sched = ~Scheduler::new(sloop, server_work_queue, queues.clone(), sleepers.clone()); - let mut client_sched = ~Scheduler::new(~UvEventLoop::new(), client_work_queue, + let cloop = ~UvEventLoop::new() as ~EventLoop; + let mut client_sched = ~Scheduler::new(cloop, client_work_queue, queues.clone(), sleepers.clone()); let server_handle = Cell::new(server_sched.make_handle()); @@ -1861,8 +2122,8 @@ fn test_simple_tcp_server_and_client_on_diff_threads() { }; let server_fn: ~fn() = || { - let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() }; - let listener = unsafe { (*io).tcp_bind(server_addr).unwrap() }; + let io = unsafe { local_io() }; + let listener = io.tcp_bind(server_addr).unwrap(); let mut acceptor = listener.listen().unwrap(); let mut stream = acceptor.accept().unwrap(); let mut buf = [0, .. 2048]; @@ -1874,12 +2135,10 @@ fn test_simple_tcp_server_and_client_on_diff_threads() { }; let client_fn: ~fn() = || { - let io: *mut IoFactoryObject = unsafe { - Local::unsafe_borrow() - }; - let mut stream = unsafe { (*io).tcp_connect(client_addr) }; + let io = unsafe { local_io() }; + let mut stream = io.tcp_connect(client_addr); while stream.is_err() { - stream = unsafe { (*io).tcp_connect(client_addr) }; + stream = io.tcp_connect(client_addr); } stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]); }; @@ -1918,8 +2177,8 @@ fn test_simple_udp_server_and_client() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut server_socket = (*io).udp_bind(server_addr).unwrap(); + let io = local_io(); + let mut server_socket = io.udp_bind(server_addr).unwrap(); chan.take().send(()); let mut buf = [0, .. 2048]; let (nread,src) = server_socket.recvfrom(buf).unwrap(); @@ -1934,8 +2193,8 @@ fn test_simple_udp_server_and_client() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut client_socket = (*io).udp_bind(client_addr).unwrap(); + let io = local_io(); + let mut client_socket = io.udp_bind(client_addr).unwrap(); port.take().recv(); client_socket.sendto([0, 1, 2, 3, 4, 5, 6, 7], server_addr); } @@ -1952,8 +2211,8 @@ fn test_read_and_block() { let chan = Cell::new(chan); do spawntask { - let io: *mut IoFactoryObject = unsafe { Local::unsafe_borrow() }; - let listener = unsafe { (*io).tcp_bind(addr).unwrap() }; + let io = unsafe { local_io() }; + let listener = io.tcp_bind(addr).unwrap(); let mut acceptor = listener.listen().unwrap(); chan.take().send(()); let mut stream = acceptor.accept().unwrap(); @@ -1991,8 +2250,8 @@ fn test_read_and_block() { do spawntask { unsafe { port.take().recv(); - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut stream = (*io).tcp_connect(addr).unwrap(); + let io = local_io(); + let mut stream = io.tcp_connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); @@ -2014,8 +2273,8 @@ fn test_read_read_read() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let listener = (*io).tcp_bind(addr).unwrap(); + let io = local_io(); + let listener = io.tcp_bind(addr).unwrap(); let mut acceptor = listener.listen().unwrap(); chan.take().send(()); let mut stream = acceptor.accept().unwrap(); @@ -2031,8 +2290,8 @@ fn test_read_read_read() { do spawntask { unsafe { port.take().recv(); - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut stream = (*io).tcp_connect(addr).unwrap(); + let io = local_io(); + let mut stream = io.tcp_connect(addr).unwrap(); let mut buf = [0, .. 2048]; let mut total_bytes_read = 0; while total_bytes_read < MAX { @@ -2060,8 +2319,8 @@ fn test_udp_twice() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut client = (*io).udp_bind(client_addr).unwrap(); + let io = local_io(); + let mut client = io.udp_bind(client_addr).unwrap(); port.take().recv(); assert!(client.sendto([1], server_addr).is_ok()); assert!(client.sendto([2], server_addr).is_ok()); @@ -2070,8 +2329,8 @@ fn test_udp_twice() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut server = (*io).udp_bind(server_addr).unwrap(); + let io = local_io(); + let mut server = io.udp_bind(server_addr).unwrap(); chan.take().send(()); let mut buf1 = [0]; let mut buf2 = [0]; @@ -2105,9 +2364,9 @@ fn test_udp_many_read() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut server_out = (*io).udp_bind(server_out_addr).unwrap(); - let mut server_in = (*io).udp_bind(server_in_addr).unwrap(); + let io = local_io(); + let mut server_out = io.udp_bind(server_out_addr).unwrap(); + let mut server_in = io.udp_bind(server_in_addr).unwrap(); let (port, chan) = first.take(); chan.send(()); port.recv(); @@ -2131,9 +2390,9 @@ fn test_udp_many_read() { do spawntask { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let mut client_out = (*io).udp_bind(client_out_addr).unwrap(); - let mut client_in = (*io).udp_bind(client_in_addr).unwrap(); + let io = local_io(); + let mut client_out = io.udp_bind(client_out_addr).unwrap(); + let mut client_in = io.udp_bind(client_in_addr).unwrap(); let (port, chan) = second.take(); port.recv(); chan.send(()); @@ -2163,8 +2422,8 @@ fn test_udp_many_read() { fn test_timer_sleep_simple() { do run_in_mt_newsched_task { unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); - let timer = (*io).timer_init(); + let io = local_io(); + let timer = io.timer_init(); do timer.map_move |mut t| { t.sleep(1) }; } } @@ -2174,29 +2433,28 @@ fn file_test_uvio_full_simple_impl() { use str::StrSlice; // why does this have to be explicitly imported to work? // compiler was complaining about no trait for str that // does .as_bytes() .. - use path::Path; use rt::io::{Open, Create, ReadWrite, Read}; unsafe { - let io: *mut IoFactoryObject = Local::unsafe_borrow(); + let io = local_io(); let write_val = "hello uvio!"; let path = "./tmp/file_test_uvio_full.txt"; { let create_fm = Create; let create_fa = ReadWrite; - let mut fd = (*io).fs_open(&Path::new(path), create_fm, create_fa).unwrap(); + let mut fd = io.fs_open(&path.to_c_str(), create_fm, create_fa).unwrap(); let write_buf = write_val.as_bytes(); fd.write(write_buf); } { let ro_fm = Open; let ro_fa = Read; - let mut fd = (*io).fs_open(&Path::new(path), ro_fm, ro_fa).unwrap(); + let mut fd = io.fs_open(&path.to_c_str(), ro_fm, ro_fa).unwrap(); let mut read_vec = [0, .. 1028]; let nread = fd.read(read_vec).unwrap(); let read_val = str::from_utf8(read_vec.slice(0, nread as uint)); assert!(read_val == write_val.to_owned()); } - (*io).fs_unlink(&Path::new(path)); + io.fs_unlink(&path.to_c_str()); } } @@ -2211,9 +2469,9 @@ fn uvio_naive_print(input: &str) { use str::StrSlice; unsafe { use libc::{STDOUT_FILENO}; - let io: *mut IoFactoryObject = Local::unsafe_borrow(); + let io = local_io(); { - let mut fd = (*io).fs_from_raw_fd(STDOUT_FILENO, false); + let mut fd = io.fs_from_raw_fd(STDOUT_FILENO, DontClose); let write_buf = input.as_bytes(); fd.write(write_buf); } diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs index 367585b0f0e..75e6a0c6ca5 100644 --- a/src/libstd/rt/uv/uvll.rs +++ b/src/libstd/rt/uv/uvll.rs @@ -131,6 +131,8 @@ pub type uv_udp_send_t = c_void; pub type uv_getaddrinfo_t = c_void; pub type uv_process_t = c_void; pub type uv_pipe_t = c_void; +pub type uv_tty_t = c_void; +pub type uv_signal_t = c_void; pub struct uv_timespec_t { tv_sec: libc::c_long, @@ -218,6 +220,8 @@ pub type uv_getaddrinfo_cb = extern "C" fn(req: *uv_getaddrinfo_t, pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t, exit_status: c_int, term_signal: c_int); +pub type uv_signal_cb = extern "C" fn(handle: *uv_signal_t, + signum: c_int); pub type sockaddr = c_void; pub type sockaddr_in = c_void; @@ -231,37 +235,37 @@ pub type socklen_t = c_int; #[cfg(target_os = "android")] #[cfg(target_os = "linux")] pub struct addrinfo { - priv ai_flags: c_int, - priv ai_family: c_int, - priv ai_socktype: c_int, - priv ai_protocol: c_int, - priv ai_addrlen: socklen_t, + ai_flags: c_int, + ai_family: c_int, + ai_socktype: c_int, + ai_protocol: c_int, + ai_addrlen: socklen_t, ai_addr: *sockaddr, - priv ai_canonname: *char, + ai_canonname: *char, ai_next: *addrinfo } #[cfg(target_os = "macos")] #[cfg(target_os = "freebsd")] pub struct addrinfo { - priv ai_flags: c_int, - priv ai_family: c_int, - priv ai_socktype: c_int, - priv ai_protocol: c_int, - priv ai_addrlen: socklen_t, - priv ai_canonname: *char, + ai_flags: c_int, + ai_family: c_int, + ai_socktype: c_int, + ai_protocol: c_int, + ai_addrlen: socklen_t, + ai_canonname: *char, ai_addr: *sockaddr, ai_next: *addrinfo } #[cfg(windows)] pub struct addrinfo { - priv ai_flags: c_int, - priv ai_family: c_int, - priv ai_socktype: c_int, - priv ai_protocol: c_int, - priv ai_addrlen: size_t, - priv ai_canonname: *char, + ai_flags: c_int, + ai_family: c_int, + ai_socktype: c_int, + ai_protocol: c_int, + ai_addrlen: size_t, + ai_canonname: *char, ai_addr: *sockaddr, ai_next: *addrinfo } @@ -419,18 +423,6 @@ pub unsafe fn walk(loop_handle: *c_void, cb: uv_walk_cb, arg: *c_void) { rust_uv_walk(loop_handle, cb, arg); } -pub unsafe fn idle_new() -> *uv_idle_t { - #[fixed_stack_segment]; #[inline(never)]; - - rust_uv_idle_new() -} - -pub unsafe fn idle_delete(handle: *uv_idle_t) { - #[fixed_stack_segment]; #[inline(never)]; - - rust_uv_idle_delete(handle) -} - pub unsafe fn idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int { #[fixed_stack_segment]; #[inline(never)]; @@ -958,6 +950,52 @@ pub unsafe fn freeaddrinfo(ai: *addrinfo) { #[fixed_stack_segment]; #[inline(never)]; rust_uv_freeaddrinfo(ai); } +pub unsafe fn pipe_open(pipe: *uv_pipe_t, file: c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_pipe_open(pipe, file) +} +pub unsafe fn pipe_bind(pipe: *uv_pipe_t, name: *c_char) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_pipe_bind(pipe, name) +} +pub unsafe fn pipe_connect(req: *uv_connect_t, handle: *uv_pipe_t, + name: *c_char, cb: uv_connect_cb) { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_pipe_connect(req, handle, name, cb) +} +pub unsafe fn tty_init(loop_ptr: *uv_loop_t, tty: *uv_tty_t, fd: c_int, + readable: c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_tty_init(loop_ptr, tty, fd, readable) +} +pub unsafe fn tty_set_mode(tty: *uv_tty_t, mode: c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_tty_set_mode(tty, mode) +} +pub unsafe fn tty_get_winsize(tty: *uv_tty_t, width: *c_int, + height: *c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_tty_get_winsize(tty, width, height) +} +pub unsafe fn guess_handle(fd: c_int) -> uv_handle_type { + #[fixed_stack_segment]; #[inline(never)]; + rust_uv_guess_handle(fd) +} + +pub unsafe fn signal_init(loop_: *uv_loop_t, handle: *uv_signal_t) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_signal_init(loop_, handle); +} +pub unsafe fn signal_start(handle: *uv_signal_t, + signal_cb: uv_signal_cb, + signum: c_int) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_signal_start(handle, signal_cb, signum); +} +pub unsafe fn signal_stop(handle: *uv_signal_t) -> c_int { + #[fixed_stack_segment]; #[inline(never)]; + return rust_uv_signal_stop(handle); +} pub struct uv_err_data { priv err_name: ~str, @@ -978,8 +1016,6 @@ extern { fn rust_uv_close(handle: *c_void, cb: uv_close_cb); fn rust_uv_walk(loop_handle: *c_void, cb: uv_walk_cb, arg: *c_void); - fn rust_uv_idle_new() -> *uv_idle_t; - fn rust_uv_idle_delete(handle: *uv_idle_t); fn rust_uv_idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int; fn rust_uv_idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int; fn rust_uv_idle_stop(handle: *uv_idle_t) -> c_int; @@ -1102,4 +1138,36 @@ extern { fn rust_set_stdio_container_stream(c: *uv_stdio_container_t, stream: *uv_stream_t); fn rust_uv_pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int; + + fn rust_uv_pipe_open(pipe: *uv_pipe_t, file: c_int) -> c_int; + fn rust_uv_pipe_bind(pipe: *uv_pipe_t, name: *c_char) -> c_int; + fn rust_uv_pipe_connect(req: *uv_connect_t, handle: *uv_pipe_t, + name: *c_char, cb: uv_connect_cb); + fn rust_uv_tty_init(loop_ptr: *uv_loop_t, tty: *uv_tty_t, fd: c_int, + readable: c_int) -> c_int; + fn rust_uv_tty_set_mode(tty: *uv_tty_t, mode: c_int) -> c_int; + fn rust_uv_tty_get_winsize(tty: *uv_tty_t, width: *c_int, + height: *c_int) -> c_int; + fn rust_uv_guess_handle(fd: c_int) -> uv_handle_type; + + // XXX: see comments in addrinfo.rs + // These should all really be constants... + //#[rust_stack] pub fn rust_SOCK_STREAM() -> c_int; + //#[rust_stack] pub fn rust_SOCK_DGRAM() -> c_int; + //#[rust_stack] pub fn rust_SOCK_RAW() -> c_int; + //#[rust_stack] pub fn rust_IPPROTO_UDP() -> c_int; + //#[rust_stack] pub fn rust_IPPROTO_TCP() -> c_int; + //#[rust_stack] pub fn rust_AI_ADDRCONFIG() -> c_int; + //#[rust_stack] pub fn rust_AI_ALL() -> c_int; + //#[rust_stack] pub fn rust_AI_CANONNAME() -> c_int; + //#[rust_stack] pub fn rust_AI_NUMERICHOST() -> c_int; + //#[rust_stack] pub fn rust_AI_NUMERICSERV() -> c_int; + //#[rust_stack] pub fn rust_AI_PASSIVE() -> c_int; + //#[rust_stack] pub fn rust_AI_V4MAPPED() -> c_int; + + fn rust_uv_signal_init(loop_: *uv_loop_t, handle: *uv_signal_t) -> c_int; + fn rust_uv_signal_start(handle: *uv_signal_t, + signal_cb: uv_signal_cb, + signum: c_int) -> c_int; + fn rust_uv_signal_stop(handle: *uv_signal_t) -> c_int; } diff --git a/src/libstd/run.rs b/src/libstd/run.rs index c4cb8be2061..615ba60e066 100644 --- a/src/libstd/run.rs +++ b/src/libstd/run.rs @@ -17,8 +17,9 @@ use comm::{stream, SharedChan}; use libc::{pid_t, c_int}; use libc; use prelude::*; -use rt::io::native::process; +use rt::io::process; use rt::io; +use rt::io::extensions::ReaderUtil; use task; /** @@ -121,8 +122,24 @@ impl Process { */ pub fn new(prog: &str, args: &[~str], options: ProcessOptions) -> Process { let ProcessOptions { env, dir, in_fd, out_fd, err_fd } = options; - let inner = process::Process::new(prog, args, env, dir, - in_fd, out_fd, err_fd); + let env = env.as_ref().map(|a| a.as_slice()); + let cwd = dir.as_ref().map(|a| a.as_str().unwrap()); + fn rtify(fd: Option<c_int>, input: bool) -> process::StdioContainer { + match fd { + Some(fd) => process::InheritFd(fd), + None => process::CreatePipe(input, !input), + } + } + let rtio = [rtify(in_fd, true), rtify(out_fd, false), + rtify(err_fd, false)]; + let rtconfig = process::ProcessConfig { + program: prog, + args: args, + env: env, + cwd: cwd, + io: rtio, + }; + let inner = process::Process::new(rtconfig).unwrap(); Process { inner: inner } } @@ -135,7 +152,9 @@ impl Process { * Fails if there is no stdin available (it's already been removed by * take_input) */ - pub fn input<'a>(&'a mut self) -> &'a mut io::Writer { self.inner.input() } + pub fn input<'a>(&'a mut self) -> &'a mut io::Writer { + self.inner.io[0].get_mut_ref() as &mut io::Writer + } /** * Returns an io::Reader that can be used to read from this Process's stdout. @@ -143,7 +162,9 @@ impl Process { * Fails if there is no stdout available (it's already been removed by * take_output) */ - pub fn output<'a>(&'a mut self) -> &'a mut io::Reader { self.inner.output() } + pub fn output<'a>(&'a mut self) -> &'a mut io::Reader { + self.inner.io[1].get_mut_ref() as &mut io::Reader + } /** * Returns an io::Reader that can be used to read from this Process's stderr. @@ -151,18 +172,20 @@ impl Process { * Fails if there is no stderr available (it's already been removed by * take_error) */ - pub fn error<'a>(&'a mut self) -> &'a mut io::Reader { self.inner.error() } + pub fn error<'a>(&'a mut self) -> &'a mut io::Reader { + self.inner.io[2].get_mut_ref() as &mut io::Reader + } /** * Closes the handle to the child process's stdin. */ pub fn close_input(&mut self) { - self.inner.take_input(); + self.inner.io[0].take(); } fn close_outputs(&mut self) { - self.inner.take_output(); - self.inner.take_error(); + self.inner.io[1].take(); + self.inner.io[2].take(); } /** @@ -185,21 +208,9 @@ impl Process { * were redirected to existing file descriptors. */ pub fn finish_with_output(&mut self) -> ProcessOutput { - self.inner.take_input(); // close stdin - let output = Cell::new(self.inner.take_output()); - let error = Cell::new(self.inner.take_error()); - - fn read_everything(r: &mut io::Reader) -> ~[u8] { - let mut ret = ~[]; - let mut buf = [0, ..1024]; - loop { - match r.read(buf) { - Some(n) => { ret.push_all(buf.slice_to(n)); } - None => break - } - } - return ret; - } + self.close_input(); + let output = Cell::new(self.inner.io[1].take()); + let error = Cell::new(self.inner.io[2].take()); // Spawn two entire schedulers to read both stdout and sterr // in parallel so we don't deadlock while blocking on one @@ -208,16 +219,27 @@ impl Process { let (p, ch) = stream(); let ch = SharedChan::new(ch); let ch_clone = ch.clone(); - do task::spawn_sched(task::SingleThreaded) { - match error.take() { - Some(ref mut e) => ch.send((2, read_everything(*e))), - None => ch.send((2, ~[])) + + // FIXME(#910, #8674): right now I/O is incredibly brittle when it comes + // to linked failure, so these tasks must be spawn so they're not + // affected by linked failure. If these are removed, then the + // runtime may never exit because linked failure will cause some + // SchedHandle structures to not get destroyed, meaning that + // there's always an async watcher available. + do task::spawn_unlinked { + do io::ignore_io_error { + match error.take() { + Some(ref mut e) => ch.send((2, e.read_to_end())), + None => ch.send((2, ~[])) + } } } - do task::spawn_sched(task::SingleThreaded) { - match output.take() { - Some(ref mut e) => ch_clone.send((1, read_everything(*e))), - None => ch_clone.send((1, ~[])) + do task::spawn_unlinked { + do io::ignore_io_error { + match output.take() { + Some(ref mut e) => ch_clone.send((1, e.read_to_end())), + None => ch_clone.send((1, ~[])) + } } } @@ -311,6 +333,7 @@ mod tests { use path::Path; use run; use str; + use task::spawn; use unstable::running_on_valgrind; use rt::io::native::file; use rt::io::{Writer, Reader}; @@ -383,6 +406,7 @@ mod tests { } #[test] + #[ignore] // FIXME(#10016) cat never sees stdin close fn test_pipes() { let pipe_in = os::pipe(); @@ -401,13 +425,14 @@ mod tests { os::close(pipe_out.out); os::close(pipe_err.out); - let expected = ~"test"; - writeclose(pipe_in.out, expected); + do spawn { + writeclose(pipe_in.out, ~"test"); + } let actual = readclose(pipe_out.input); readclose(pipe_err.input); proc.finish(); - assert_eq!(expected, actual); + assert_eq!(~"test", actual); } fn writeclose(fd: c_int, s: &str) { diff --git a/src/libstd/std.rs b/src/libstd/std.rs index 8c72e083f88..12316cb5ead 100644 --- a/src/libstd/std.rs +++ b/src/libstd/std.rs @@ -148,7 +148,6 @@ pub mod iter; pub mod to_str; pub mod to_bytes; pub mod clone; -pub mod io; pub mod hash; pub mod container; pub mod default; diff --git a/src/libstd/str.rs b/src/libstd/str.rs index f134788942c..053076c5d89 100644 --- a/src/libstd/str.rs +++ b/src/libstd/str.rs @@ -1018,7 +1018,6 @@ static TAG_CONT_U8: u8 = 128u8; /// Unsafe operations pub mod raw { - use option::{Option, Some}; use cast; use libc; use ptr; @@ -1172,34 +1171,6 @@ pub mod raw { vec::raw::set_len(as_owned_vec(s), new_len) } - /// Parses a C "multistring", eg windows env values or - /// the req->ptr result in a uv_fs_readdir() call. - /// Optionally, a `count` can be passed in, limiting the - /// parsing to only being done `count`-times. - #[inline] - pub unsafe fn from_c_multistring(buf: *libc::c_char, count: Option<uint>) -> ~[~str] { - #[fixed_stack_segment]; #[inline(never)]; - - let mut curr_ptr: uint = buf as uint; - let mut result = ~[]; - let mut ctr = 0; - let (limited_count, limit) = match count { - Some(limit) => (true, limit), - None => (false, 0) - }; - while(((limited_count && ctr < limit) || !limited_count) - && *(curr_ptr as *libc::c_char) != 0 as libc::c_char) { - let env_pair = from_c_str( - curr_ptr as *libc::c_char); - result.push(env_pair); - curr_ptr += - libc::strlen(curr_ptr as *libc::c_char) as uint - + 1; - ctr += 1; - } - result - } - /// Sets the length of a string /// /// This will explicitly set the size of the string, without actually @@ -1214,26 +1185,6 @@ pub mod raw { assert_eq!(c, ~"AAA"); } } - - #[test] - fn test_str_multistring_parsing() { - use option::None; - unsafe { - let input = bytes!("zero", "\x00", "one", "\x00", "\x00"); - let ptr = vec::raw::to_ptr(input); - let result = from_c_multistring(ptr as *libc::c_char, None); - assert!(result.len() == 2); - let mut ctr = 0; - for x in result.iter() { - match ctr { - 0 => assert_eq!(x, &~"zero"), - 1 => assert_eq!(x, &~"one"), - _ => fail!("shouldn't happen!") - } - ctr += 1; - } - } - } } /* diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index dec13eded39..fbe2988f77c 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -89,7 +89,7 @@ use unstable::sync::Exclusive; use rt::in_green_task_context; use rt::local::Local; use rt::task::{Task, Sched}; -use rt::shouldnt_be_public::{Scheduler, KillHandle, WorkQueue, Thread}; +use rt::shouldnt_be_public::{Scheduler, KillHandle, WorkQueue, Thread, EventLoop}; use rt::uv::uvio::UvEventLoop; #[cfg(test)] use task::default_task_opts; @@ -607,7 +607,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: ~fn()) { let work_queue = WorkQueue::new(); // Create a new scheduler to hold the new task - let new_loop = ~UvEventLoop::new(); + let new_loop = ~UvEventLoop::new() as ~EventLoop; let mut new_sched = ~Scheduler::new_special(new_loop, work_queue, (*sched).work_queues.clone(), diff --git a/src/libstd/to_bytes.rs b/src/libstd/to_bytes.rs index 4d5da19dafd..8c78e34528b 100644 --- a/src/libstd/to_bytes.rs +++ b/src/libstd/to_bytes.rs @@ -16,8 +16,6 @@ The `ToBytes` and `IterBytes` traits use cast; use container::Container; -use io; -use io::Writer; use iter::Iterator; use option::{None, Option, Some}; use str::{Str, StrSlice}; @@ -360,7 +358,10 @@ pub trait ToBytes { impl<A:IterBytes> ToBytes for A { fn to_bytes(&self, lsb0: bool) -> ~[u8] { - do io::with_bytes_writer |wr| { + use rt::io::mem; + use rt::io::Writer; + + do mem::with_mem_writer |wr| { do self.iter_bytes(lsb0) |bytes| { wr.write(bytes); true |
