diff options
| -rw-r--r-- | src/libnative/io/file.rs (renamed from src/libstd/io/native/file.rs) | 57 | ||||
| -rw-r--r-- | src/libnative/io/mod.rs (renamed from src/libstd/io/native/mod.rs) | 39 | ||||
| -rw-r--r-- | src/libnative/io/process.rs (renamed from src/libstd/io/native/process.rs) | 52 | ||||
| -rw-r--r-- | src/libnative/lib.rs | 61 | ||||
| -rw-r--r-- | src/libnative/task.rs | 257 | ||||
| -rw-r--r-- | src/libstd/io/mod.rs | 5 |
6 files changed, 386 insertions, 85 deletions
diff --git a/src/libstd/io/native/file.rs b/src/libnative/io/file.rs index de2655303d6..eaa4403f7bf 100644 --- a/src/libstd/io/native/file.rs +++ b/src/libnative/io/file.rs @@ -10,28 +10,21 @@ //! Blocking posix-based file I/O -#[allow(non_camel_case_types)]; - -use c_str::CString; -use io::IoError; -use io; -use libc::c_int; -use libc; -use ops::Drop; -use option::{Some, None, Option}; -use os; -use path::{Path, GenericPath}; -use ptr::RawPtr; -use result::{Result, Ok, Err}; -use rt::rtio; +use std::c_str::CString; +use std::io::IoError; +use std::io; +use std::libc::c_int; +use std::libc; +use std::os; +use std::rt::rtio; +use std::unstable::intrinsics; +use std::vec; + use super::IoResult; -use unstable::intrinsics; -use vec::ImmutableVector; -use vec; -#[cfg(windows)] use os::win32::{as_utf16_p, fill_utf16_buf_and_decode}; -#[cfg(windows)] use ptr; -#[cfg(windows)] use str; +#[cfg(windows)] use std::os::win32::{as_utf16_p, fill_utf16_buf_and_decode}; +#[cfg(windows)] use std::ptr; +#[cfg(windows)] use std::str; fn keep_going(data: &[u8], f: |*u8, uint| -> i64) -> i64 { #[cfg(windows)] static eintr: int = 0; // doesn't matter @@ -490,8 +483,8 @@ pub fn readdir(p: &CString) -> IoResult<~[Path]> { unsafe { #[cfg(not(windows))] unsafe fn get_list(p: &CString) -> IoResult<~[Path]> { - use libc::{dirent_t}; - use libc::{opendir, readdir, closedir}; + use std::libc::{dirent_t}; + use std::libc::{opendir, readdir, closedir}; extern { fn rust_list_dir_val(ptr: *dirent_t) -> *libc::c_char; } @@ -517,14 +510,14 @@ pub fn readdir(p: &CString) -> IoResult<~[Path]> { #[cfg(windows)] unsafe fn get_list(p: &CString) -> IoResult<~[Path]> { - use libc::consts::os::extra::INVALID_HANDLE_VALUE; - use libc::{wcslen, free}; - use libc::funcs::extra::kernel32::{ + use std::libc::consts::os::extra::INVALID_HANDLE_VALUE; + use std::libc::{wcslen, free}; + use std::libc::funcs::extra::kernel32::{ FindFirstFileW, FindNextFileW, FindClose, }; - use libc::types::os::arch::extra::HANDLE; + use std::libc::types::os::arch::extra::HANDLE; use os::win32::{ as_utf16_p }; @@ -906,12 +899,12 @@ pub fn utime(p: &CString, atime: u64, mtime: u64) -> IoResult<()> { #[cfg(test)] mod tests { - use io::native::file::{CFile, FileDesc}; - use io; - use libc; - use os; - use result::Ok; - use rt::rtio::RtioFileStream; + use std::io::native::file::{CFile, FileDesc}; + use std::io::fs; + use std::io; + use std::libc; + use std::os; + use std::rt::rtio::RtioFileStream; #[ignore(cfg(target_os = "freebsd"))] // hmm, maybe pipes have a tiny buffer #[test] diff --git a/src/libstd/io/native/mod.rs b/src/libnative/io/mod.rs index d9dccc84f1c..36e3f8af190 100644 --- a/src/libstd/io/native/mod.rs +++ b/src/libnative/io/mod.rs @@ -21,24 +21,21 @@ //! play. The only dependencies of these modules are the normal system libraries //! that you would find on the respective platform. -use c_str::CString; -use comm::SharedChan; -use libc::c_int; -use libc; -use option::{Option, None, Some}; -use os; -use path::Path; -use result::{Result, Ok, Err}; -use rt::rtio; -use rt::rtio::{RtioTcpStream, RtioTcpListener, RtioUdpSocket, RtioUnixListener, - RtioPipe, RtioFileStream, RtioProcess, RtioSignal, RtioTTY, - CloseBehavior, RtioTimer}; -use io; -use io::IoError; -use io::net::ip::SocketAddr; -use io::process::ProcessConfig; -use io::signal::Signum; -use ai = io::net::addrinfo; +use std::c_str::CString; +use std::comm::SharedChan; +use std::libc::c_int; +use std::libc; +use std::os; +use std::rt::rtio; +use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioUdpSocket, + RtioUnixListener, RtioPipe, RtioFileStream, RtioProcess, + RtioSignal, RtioTTY, CloseBehavior, RtioTimer}; +use std::io; +use std::io::IoError; +use std::io::net::ip::SocketAddr; +use std::io::process::ProcessConfig; +use std::io::signal::Signum; +use ai = std::io::net::addrinfo; // Local re-exports pub use self::file::FileDesc; @@ -114,6 +111,9 @@ fn mkerr_winbool(ret: libc::c_int) -> IoResult<()> { pub struct IoFactory; impl rtio::IoFactory for IoFactory { + // all native io factories are the same + fn id(&self) -> uint { 0 } + // networking fn tcp_connect(&mut self, _addr: SocketAddr) -> IoResult<~RtioTcpStream> { Err(unimpl()) @@ -223,6 +223,3 @@ impl rtio::IoFactory for IoFactory { Err(unimpl()) } } - -pub static mut NATIVE_IO_FACTORY: IoFactory = IoFactory; - diff --git a/src/libstd/io/native/process.rs b/src/libnative/io/process.rs index ef972dc4d0a..2277d408ee4 100644 --- a/src/libstd/io/native/process.rs +++ b/src/libnative/io/process.rs @@ -8,18 +8,16 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use io; -use libc::{pid_t, c_void, c_int}; -use libc; -use os; -use prelude::*; -use ptr; -use rt::rtio; -use super::file; -#[cfg(windows)] -use cast; +use std::cast; +use std::io; +use std::libc::{pid_t, c_void, c_int}; +use std::libc; +use std::os; +use std::ptr; +use std::rt::rtio; +use p = std::io::process; -use p = io::process; +use super::file; /** * A value representing a child process. @@ -179,22 +177,22 @@ fn spawn_process_os(prog: &str, args: &[~str], env: Option<~[(~str, ~str)]>, dir: Option<&Path>, in_fd: c_int, out_fd: c_int, err_fd: c_int) -> SpawnProcessResult { - use libc::types::os::arch::extra::{DWORD, HANDLE, STARTUPINFO}; - use libc::consts::os::extra::{ + use std::libc::types::os::arch::extra::{DWORD, HANDLE, STARTUPINFO}; + use std::libc::consts::os::extra::{ TRUE, FALSE, STARTF_USESTDHANDLES, INVALID_HANDLE_VALUE, DUPLICATE_SAME_ACCESS }; - use libc::funcs::extra::kernel32::{ + use std::libc::funcs::extra::kernel32::{ GetCurrentProcess, DuplicateHandle, CloseHandle, CreateProcessA }; - use libc::funcs::extra::msvcrt::get_osfhandle; + use std::libc::funcs::extra::msvcrt::get_osfhandle; - use mem; + use std::mem; unsafe { @@ -256,10 +254,10 @@ fn spawn_process_os(prog: &str, args: &[~str], fail!("failure in CreateProcess: {}", *msg); } - // We close the thread handle because we don't care about keeping the + // We close the thread handle because std::we don't care about keeping the // thread id valid, and we aren't keeping the thread handle around to be // able to close it later. We don't close the process handle however - // because we want the process id to stay valid at least until the + // because std::we want the process id to stay valid at least until the // calling code closes the process handle. CloseHandle(pi.hThread); @@ -362,8 +360,8 @@ fn spawn_process_os(prog: &str, args: &[~str], env: Option<~[(~str, ~str)]>, dir: Option<&Path>, in_fd: c_int, out_fd: c_int, err_fd: c_int) -> SpawnProcessResult { - use libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp}; - use libc::funcs::bsd44::getdtablesize; + use std::libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp}; + use std::libc::funcs::bsd44::getdtablesize; mod rustrt { extern { @@ -433,7 +431,7 @@ fn spawn_process_os(prog: &str, args: &[~str], #[cfg(unix)] fn with_argv<T>(prog: &str, args: &[~str], cb: |**libc::c_char| -> T) -> T { - use vec; + use std::vec; // We can't directly convert `str`s into `*char`s, as someone needs to hold // a reference to the intermediary byte buffers. So first build an array to @@ -459,7 +457,7 @@ fn with_argv<T>(prog: &str, args: &[~str], cb: |**libc::c_char| -> T) -> T { #[cfg(unix)] fn with_envp<T>(env: Option<~[(~str, ~str)]>, cb: |*c_void| -> T) -> T { - use vec; + use std::vec; // On posixy systems we can pass a char** for envp, which is a // null-terminated array of "k=v\n" strings. Like `with_argv`, we have to @@ -540,8 +538,8 @@ fn waitpid(pid: pid_t) -> int { #[cfg(windows)] fn waitpid_os(pid: pid_t) -> int { - use libc::types::os::arch::extra::DWORD; - use libc::consts::os::extra::{ + use std::libc::types::os::arch::extra::DWORD; + use std::libc::consts::os::extra::{ SYNCHRONIZE, PROCESS_QUERY_INFORMATION, FALSE, @@ -549,7 +547,7 @@ fn waitpid(pid: pid_t) -> int { INFINITE, WAIT_FAILED }; - use libc::funcs::extra::kernel32::{ + use std::libc::funcs::extra::kernel32::{ OpenProcess, GetExitCodeProcess, CloseHandle, @@ -585,7 +583,7 @@ fn waitpid(pid: pid_t) -> int { #[cfg(unix)] fn waitpid_os(pid: pid_t) -> int { - use libc::funcs::posix01::wait::*; + use std::libc::funcs::posix01::wait; #[cfg(target_os = "linux")] #[cfg(target_os = "android")] @@ -612,7 +610,7 @@ fn waitpid(pid: pid_t) -> int { } let mut status = 0 as c_int; - if unsafe { waitpid(pid, &mut status, 0) } == -1 { + if unsafe { wait::waitpid(pid, &mut status, 0) } == -1 { fail!("failure in waitpid: {}", os::last_os_error()); } diff --git a/src/libnative/lib.rs b/src/libnative/lib.rs new file mode 100644 index 00000000000..4b32511dc42 --- /dev/null +++ b/src/libnative/lib.rs @@ -0,0 +1,61 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! The native runtime crate +//! +//! This crate contains an implementation of 1:1 scheduling for a "native" +//! runtime. In addition, all I/O provided by this crate is the thread blocking +//! version of I/O. + +#[link(name = "native", + package_id = "native", + vers = "0.9-pre", + uuid = "535344a7-890f-5a23-e1f3-e0d118805141", + url = "https://github.com/mozilla/rust/tree/master/src/native")]; + +#[license = "MIT/ASL2"]; +#[crate_type = "rlib"]; +#[crate_type = "dylib"]; + +// NB this crate explicitly does *not* allow glob imports, please seriously +// consider whether they're needed before adding that feature here. + +use std::cast; +use std::os; +use std::rt; +use std::task::try; + +pub mod io; +pub mod task; + +// XXX: this should not exist here +#[cfg(stage0)] +#[lang = "start"] +pub fn start(main: *u8, argc: int, argv: **u8) -> int { + rt::init(argc, argv); + + // Bootstrap ourselves by installing a local Task and then immediately + // spawning a thread to run 'main'. Always spawn a new thread for main so + // the stack size of 'main' is known (and the bounds can be set + // appropriately). + // + // Once the main task has completed, then we wait for everyone else to exit. + task::run(task::new(), proc() { + let main: extern "Rust" fn() = unsafe { cast::transmute(main) }; + match do try { main() } { + Ok(()) => { os::set_exit_status(0); } + Err(..) => { os::set_exit_status(rt::DEFAULT_ERROR_CODE); } + } + }); + task::wait_for_completion(); + + unsafe { rt::cleanup(); } + os::get_exit_status() +} diff --git a/src/libnative/task.rs b/src/libnative/task.rs new file mode 100644 index 00000000000..fa7500ca85e --- /dev/null +++ b/src/libnative/task.rs @@ -0,0 +1,257 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Tasks implemented on top of OS threads +//! +//! This module contains the implementation of the 1:1 threading module required +//! by rust tasks. This implements the necessary API traits laid out by std::rt +//! in order to spawn new tasks and deschedule the current task. + +use std::cast; +use std::rt::env; +use std::rt::local::Local; +use std::rt::rtio; +use std::rt::task::{Task, BlockedTask}; +use std::rt::thread::Thread; +use std::rt; +use std::sync::atomics::{AtomicUint, SeqCst, INIT_ATOMIC_UINT}; +use std::task::TaskOpts; +use std::unstable::mutex::{Mutex, MUTEX_INIT}; +use std::unstable::stack; + +use io; +use task; + +static mut THREAD_CNT: AtomicUint = INIT_ATOMIC_UINT; +static mut LOCK: Mutex = MUTEX_INIT; + +/// Waits for all spawned threads to finish completion. This should only be used +/// by the main task in order to wait for all other tasks to terminate. +/// +/// This mirrors the same semantics as the green scheduling model. +pub fn wait_for_completion() { + static mut someone_waited: bool = false; + + unsafe { + LOCK.lock(); + assert!(!someone_waited); + someone_waited = true; + while THREAD_CNT.load(SeqCst) > 0 { + LOCK.wait(); + } + LOCK.unlock(); + LOCK.destroy(); + } + +} + +// Signal that a thread has finished execution, possibly waking up a blocker +// waiting for all threads to have finished. +fn signal_done() { + unsafe { + LOCK.lock(); + if THREAD_CNT.fetch_sub(1, SeqCst) == 1 { + LOCK.signal(); + } + LOCK.unlock(); + } +} + +/// Creates a new Task which is ready to execute as a 1:1 task. +pub fn new() -> ~Task { + let mut task = ~Task::new(); + task.put_runtime(~Ops { + lock: unsafe { Mutex::new() }, + } as ~rt::Runtime); + return task; +} + +/// Spawns a new task given the configuration options and a procedure to run +/// inside the task. +pub fn spawn(opts: TaskOpts, f: proc()) { + // must happen before the spawn, no need to synchronize with a lock. + unsafe { THREAD_CNT.fetch_add(1, SeqCst); } + + let TaskOpts { + watched: _watched, + notify_chan, name, stack_size + } = opts; + + let mut task = new(); + task.name = name; + match notify_chan { + Some(chan) => { + let on_exit = proc(task_result) { chan.send(task_result) }; + task.death.on_exit = Some(on_exit); + } + None => {} + } + + let stack = stack_size.unwrap_or(env::min_stack()); + let task = task; + + // Spawning a new OS thread guarantees that __morestack will never get + // triggered, but we must manually set up the actual stack bounds once this + // function starts executing. This raises the lower limit by a bit because + // by the time that this function is executing we've already consumed at + // least a little bit of stack (we don't know the exact byte address at + // which our stack started). + Thread::spawn_stack(stack, proc() { + let something_around_the_top_of_the_stack = 1; + let addr = &something_around_the_top_of_the_stack as *int; + unsafe { + let my_stack = addr as uint; + stack::record_stack_bounds(my_stack - stack + 1024, my_stack); + } + + run(task, f); + signal_done(); + }) +} + +/// Runs a task once, consuming the task. The given procedure is run inside of +/// the task. +pub fn run(t: ~Task, f: proc()) { + let mut f = Some(f); + t.run(|| { f.take_unwrap()(); }); +} + +// This structure is the glue between channels and the 1:1 scheduling mode. This +// structure is allocated once per task. +struct Ops { + lock: Mutex, // native synchronization +} + +impl rt::Runtime for Ops { + fn yield_now(~self, mut cur_task: ~Task) { + // put the task back in TLS and then invoke the OS thread yield + cur_task.put_runtime(self as ~rt::Runtime); + Local::put(cur_task); + Thread::yield_now(); + } + + fn maybe_yield(~self, mut cur_task: ~Task) { + // just put the task back in TLS, on OS threads we never need to + // opportunistically yield b/c the OS will do that for us (preemption) + cur_task.put_runtime(self as ~rt::Runtime); + Local::put(cur_task); + } + + fn wrap(~self) -> ~Any { + self as ~Any + } + + // This function gets a little interesting. There are a few safety and + // ownership violations going on here, but this is all done in the name of + // shared state. Additionally, all of the violations are protected with a + // mutex, so in theory there are no races. + // + // The first thing we need to do is to get a pointer to the task's internal + // mutex. This address will not be changing (because the task is allocated + // on the heap). We must have this handle separately because the task will + // have its ownership transferred to the given closure. We're guaranteed, + // however, that this memory will remain valid because *this* is the current + // task's execution thread. + // + // The next weird part is where ownership of the task actually goes. We + // relinquish it to the `f` blocking function, but upon returning this + // function needs to replace the task back in TLS. There is no communication + // from the wakeup thread back to this thread about the task pointer, and + // there's really no need to. In order to get around this, we cast the task + // to a `uint` which is then used at the end of this function to cast back + // to a `~Task` object. Naturally, this looks like it violates ownership + // semantics in that there may be two `~Task` objects. + // + // The fun part is that the wakeup half of this implementation knows to + // "forget" the task on the other end. This means that the awakening half of + // things silently relinquishes ownership back to this thread, but not in a + // way that the compiler can understand. The task's memory is always valid + // for both tasks because these operations are all done inside of a mutex. + // + // You'll also find that if blocking fails (the `f` function hands the + // BlockedTask back to us), we will `cast::forget` the handles. The + // reasoning for this is the same logic as above in that the task silently + // transfers ownership via the `uint`, not through normal compiler + // semantics. + fn deschedule(mut ~self, times: uint, mut cur_task: ~Task, + f: |BlockedTask| -> Result<(), BlockedTask>) { + let my_lock: *mut Mutex = &mut self.lock as *mut Mutex; + cur_task.put_runtime(self as ~rt::Runtime); + + unsafe { + let cur_task_dupe = *cast::transmute::<&~Task, &uint>(&cur_task); + let task = BlockedTask::block(cur_task); + + if times == 1 { + (*my_lock).lock(); + match f(task) { + Ok(()) => (*my_lock).wait(), + Err(task) => { cast::forget(task.wake()); } + } + (*my_lock).unlock(); + } else { + let mut iter = task.make_selectable(times); + (*my_lock).lock(); + let success = iter.all(|task| { + match f(task) { + Ok(()) => true, + Err(task) => { + cast::forget(task.wake()); + false + } + } + }); + if success { + (*my_lock).wait(); + } + (*my_lock).unlock(); + } + // re-acquire ownership of the task + cur_task = cast::transmute::<uint, ~Task>(cur_task_dupe); + } + + // put the task back in TLS, and everything is as it once was. + Local::put(cur_task); + } + + // See the comments on `deschedule` for why the task is forgotten here, and + // why it's valid to do so. + fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) { + unsafe { + let lock: *mut Mutex = &mut self.lock as *mut Mutex; + to_wake.put_runtime(self as ~rt::Runtime); + cast::forget(to_wake); + (*lock).lock(); + (*lock).signal(); + (*lock).unlock(); + } + } + + fn spawn_sibling(~self, mut cur_task: ~Task, opts: TaskOpts, f: proc()) { + cur_task.put_runtime(self as ~rt::Runtime); + Local::put(cur_task); + + task::spawn(opts, f); + } + + fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> { + static mut io: io::IoFactory = io::IoFactory; + // Unsafety is from accessing `io`, which is guaranteed to be safe + // because you can't do anything usable with this statically initialized + // unit struct. + Some(unsafe { rtio::LocalIo::new(&mut io as &mut rtio::IoFactory) }) + } +} + +impl Drop for Ops { + fn drop(&mut self) { + unsafe { self.lock.destroy() } + } +} diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs index bd0b9e08b7c..0852c4cadb6 100644 --- a/src/libstd/io/mod.rs +++ b/src/libstd/io/mod.rs @@ -164,9 +164,6 @@ requests are implemented by descheduling the running task and performing an asynchronous request; the task is only resumed once the asynchronous request completes. -For blocking (but possibly more efficient) implementations, look -in the `io::native` module. - # Error Handling I/O is an area where nearly every operation can result in unexpected @@ -349,8 +346,6 @@ pub mod timer; /// Buffered I/O wrappers pub mod buffered; -pub mod native; - /// Signal handling pub mod signal; |
