about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libnative/io/file.rs (renamed from src/libstd/io/native/file.rs)57
-rw-r--r--src/libnative/io/mod.rs (renamed from src/libstd/io/native/mod.rs)39
-rw-r--r--src/libnative/io/process.rs (renamed from src/libstd/io/native/process.rs)52
-rw-r--r--src/libnative/lib.rs61
-rw-r--r--src/libnative/task.rs257
-rw-r--r--src/libstd/io/mod.rs5
6 files changed, 386 insertions, 85 deletions
diff --git a/src/libstd/io/native/file.rs b/src/libnative/io/file.rs
index de2655303d6..eaa4403f7bf 100644
--- a/src/libstd/io/native/file.rs
+++ b/src/libnative/io/file.rs
@@ -10,28 +10,21 @@
 
 //! Blocking posix-based file I/O
 
-#[allow(non_camel_case_types)];
-
-use c_str::CString;
-use io::IoError;
-use io;
-use libc::c_int;
-use libc;
-use ops::Drop;
-use option::{Some, None, Option};
-use os;
-use path::{Path, GenericPath};
-use ptr::RawPtr;
-use result::{Result, Ok, Err};
-use rt::rtio;
+use std::c_str::CString;
+use std::io::IoError;
+use std::io;
+use std::libc::c_int;
+use std::libc;
+use std::os;
+use std::rt::rtio;
+use std::unstable::intrinsics;
+use std::vec;
+
 use super::IoResult;
-use unstable::intrinsics;
-use vec::ImmutableVector;
-use vec;
 
-#[cfg(windows)] use os::win32::{as_utf16_p, fill_utf16_buf_and_decode};
-#[cfg(windows)] use ptr;
-#[cfg(windows)] use str;
+#[cfg(windows)] use std::os::win32::{as_utf16_p, fill_utf16_buf_and_decode};
+#[cfg(windows)] use std::ptr;
+#[cfg(windows)] use std::str;
 
 fn keep_going(data: &[u8], f: |*u8, uint| -> i64) -> i64 {
     #[cfg(windows)] static eintr: int = 0; // doesn't matter
@@ -490,8 +483,8 @@ pub fn readdir(p: &CString) -> IoResult<~[Path]> {
     unsafe {
         #[cfg(not(windows))]
         unsafe fn get_list(p: &CString) -> IoResult<~[Path]> {
-            use libc::{dirent_t};
-            use libc::{opendir, readdir, closedir};
+            use std::libc::{dirent_t};
+            use std::libc::{opendir, readdir, closedir};
             extern {
                 fn rust_list_dir_val(ptr: *dirent_t) -> *libc::c_char;
             }
@@ -517,14 +510,14 @@ pub fn readdir(p: &CString) -> IoResult<~[Path]> {
 
         #[cfg(windows)]
         unsafe fn get_list(p: &CString) -> IoResult<~[Path]> {
-            use libc::consts::os::extra::INVALID_HANDLE_VALUE;
-            use libc::{wcslen, free};
-            use libc::funcs::extra::kernel32::{
+            use std::libc::consts::os::extra::INVALID_HANDLE_VALUE;
+            use std::libc::{wcslen, free};
+            use std::libc::funcs::extra::kernel32::{
                 FindFirstFileW,
                 FindNextFileW,
                 FindClose,
             };
-            use libc::types::os::arch::extra::HANDLE;
+            use std::libc::types::os::arch::extra::HANDLE;
             use os::win32::{
                 as_utf16_p
             };
@@ -906,12 +899,12 @@ pub fn utime(p: &CString, atime: u64, mtime: u64) -> IoResult<()> {
 
 #[cfg(test)]
 mod tests {
-    use io::native::file::{CFile, FileDesc};
-    use io;
-    use libc;
-    use os;
-    use result::Ok;
-    use rt::rtio::RtioFileStream;
+    use std::io::native::file::{CFile, FileDesc};
+    use std::io::fs;
+    use std::io;
+    use std::libc;
+    use std::os;
+    use std::rt::rtio::RtioFileStream;
 
     #[ignore(cfg(target_os = "freebsd"))] // hmm, maybe pipes have a tiny buffer
     #[test]
diff --git a/src/libstd/io/native/mod.rs b/src/libnative/io/mod.rs
index d9dccc84f1c..36e3f8af190 100644
--- a/src/libstd/io/native/mod.rs
+++ b/src/libnative/io/mod.rs
@@ -21,24 +21,21 @@
 //! play. The only dependencies of these modules are the normal system libraries
 //! that you would find on the respective platform.
 
-use c_str::CString;
-use comm::SharedChan;
-use libc::c_int;
-use libc;
-use option::{Option, None, Some};
-use os;
-use path::Path;
-use result::{Result, Ok, Err};
-use rt::rtio;
-use rt::rtio::{RtioTcpStream, RtioTcpListener, RtioUdpSocket, RtioUnixListener,
-               RtioPipe, RtioFileStream, RtioProcess, RtioSignal, RtioTTY,
-               CloseBehavior, RtioTimer};
-use io;
-use io::IoError;
-use io::net::ip::SocketAddr;
-use io::process::ProcessConfig;
-use io::signal::Signum;
-use ai = io::net::addrinfo;
+use std::c_str::CString;
+use std::comm::SharedChan;
+use std::libc::c_int;
+use std::libc;
+use std::os;
+use std::rt::rtio;
+use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioUdpSocket,
+                    RtioUnixListener, RtioPipe, RtioFileStream, RtioProcess,
+                    RtioSignal, RtioTTY, CloseBehavior, RtioTimer};
+use std::io;
+use std::io::IoError;
+use std::io::net::ip::SocketAddr;
+use std::io::process::ProcessConfig;
+use std::io::signal::Signum;
+use ai = std::io::net::addrinfo;
 
 // Local re-exports
 pub use self::file::FileDesc;
@@ -114,6 +111,9 @@ fn mkerr_winbool(ret: libc::c_int) -> IoResult<()> {
 pub struct IoFactory;
 
 impl rtio::IoFactory for IoFactory {
+    // all native io factories are the same
+    fn id(&self) -> uint { 0 }
+
     // networking
     fn tcp_connect(&mut self, _addr: SocketAddr) -> IoResult<~RtioTcpStream> {
         Err(unimpl())
@@ -223,6 +223,3 @@ impl rtio::IoFactory for IoFactory {
         Err(unimpl())
     }
 }
-
-pub static mut NATIVE_IO_FACTORY: IoFactory = IoFactory;
-
diff --git a/src/libstd/io/native/process.rs b/src/libnative/io/process.rs
index ef972dc4d0a..2277d408ee4 100644
--- a/src/libstd/io/native/process.rs
+++ b/src/libnative/io/process.rs
@@ -8,18 +8,16 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use io;
-use libc::{pid_t, c_void, c_int};
-use libc;
-use os;
-use prelude::*;
-use ptr;
-use rt::rtio;
-use super::file;
-#[cfg(windows)]
-use cast;
+use std::cast;
+use std::io;
+use std::libc::{pid_t, c_void, c_int};
+use std::libc;
+use std::os;
+use std::ptr;
+use std::rt::rtio;
+use p = std::io::process;
 
-use p = io::process;
+use super::file;
 
 /**
  * A value representing a child process.
@@ -179,22 +177,22 @@ fn spawn_process_os(prog: &str, args: &[~str],
                     env: Option<~[(~str, ~str)]>,
                     dir: Option<&Path>,
                     in_fd: c_int, out_fd: c_int, err_fd: c_int) -> SpawnProcessResult {
-    use libc::types::os::arch::extra::{DWORD, HANDLE, STARTUPINFO};
-    use libc::consts::os::extra::{
+    use std::libc::types::os::arch::extra::{DWORD, HANDLE, STARTUPINFO};
+    use std::libc::consts::os::extra::{
         TRUE, FALSE,
         STARTF_USESTDHANDLES,
         INVALID_HANDLE_VALUE,
         DUPLICATE_SAME_ACCESS
     };
-    use libc::funcs::extra::kernel32::{
+    use std::libc::funcs::extra::kernel32::{
         GetCurrentProcess,
         DuplicateHandle,
         CloseHandle,
         CreateProcessA
     };
-    use libc::funcs::extra::msvcrt::get_osfhandle;
+    use std::libc::funcs::extra::msvcrt::get_osfhandle;
 
-    use mem;
+    use std::mem;
 
     unsafe {
 
@@ -256,10 +254,10 @@ fn spawn_process_os(prog: &str, args: &[~str],
             fail!("failure in CreateProcess: {}", *msg);
         }
 
-        // We close the thread handle because we don't care about keeping the
+        // We close the thread handle because std::we don't care about keeping the
         // thread id valid, and we aren't keeping the thread handle around to be
         // able to close it later. We don't close the process handle however
-        // because we want the process id to stay valid at least until the
+        // because std::we want the process id to stay valid at least until the
         // calling code closes the process handle.
         CloseHandle(pi.hThread);
 
@@ -362,8 +360,8 @@ fn spawn_process_os(prog: &str, args: &[~str],
                     env: Option<~[(~str, ~str)]>,
                     dir: Option<&Path>,
                     in_fd: c_int, out_fd: c_int, err_fd: c_int) -> SpawnProcessResult {
-    use libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp};
-    use libc::funcs::bsd44::getdtablesize;
+    use std::libc::funcs::posix88::unistd::{fork, dup2, close, chdir, execvp};
+    use std::libc::funcs::bsd44::getdtablesize;
 
     mod rustrt {
         extern {
@@ -433,7 +431,7 @@ fn spawn_process_os(prog: &str, args: &[~str],
 
 #[cfg(unix)]
 fn with_argv<T>(prog: &str, args: &[~str], cb: |**libc::c_char| -> T) -> T {
-    use vec;
+    use std::vec;
 
     // We can't directly convert `str`s into `*char`s, as someone needs to hold
     // a reference to the intermediary byte buffers. So first build an array to
@@ -459,7 +457,7 @@ fn with_argv<T>(prog: &str, args: &[~str], cb: |**libc::c_char| -> T) -> T {
 
 #[cfg(unix)]
 fn with_envp<T>(env: Option<~[(~str, ~str)]>, cb: |*c_void| -> T) -> T {
-    use vec;
+    use std::vec;
 
     // On posixy systems we can pass a char** for envp, which is a
     // null-terminated array of "k=v\n" strings. Like `with_argv`, we have to
@@ -540,8 +538,8 @@ fn waitpid(pid: pid_t) -> int {
 
     #[cfg(windows)]
     fn waitpid_os(pid: pid_t) -> int {
-        use libc::types::os::arch::extra::DWORD;
-        use libc::consts::os::extra::{
+        use std::libc::types::os::arch::extra::DWORD;
+        use std::libc::consts::os::extra::{
             SYNCHRONIZE,
             PROCESS_QUERY_INFORMATION,
             FALSE,
@@ -549,7 +547,7 @@ fn waitpid(pid: pid_t) -> int {
             INFINITE,
             WAIT_FAILED
         };
-        use libc::funcs::extra::kernel32::{
+        use std::libc::funcs::extra::kernel32::{
             OpenProcess,
             GetExitCodeProcess,
             CloseHandle,
@@ -585,7 +583,7 @@ fn waitpid(pid: pid_t) -> int {
 
     #[cfg(unix)]
     fn waitpid_os(pid: pid_t) -> int {
-        use libc::funcs::posix01::wait::*;
+        use std::libc::funcs::posix01::wait;
 
         #[cfg(target_os = "linux")]
         #[cfg(target_os = "android")]
@@ -612,7 +610,7 @@ fn waitpid(pid: pid_t) -> int {
         }
 
         let mut status = 0 as c_int;
-        if unsafe { waitpid(pid, &mut status, 0) } == -1 {
+        if unsafe { wait::waitpid(pid, &mut status, 0) } == -1 {
             fail!("failure in waitpid: {}", os::last_os_error());
         }
 
diff --git a/src/libnative/lib.rs b/src/libnative/lib.rs
new file mode 100644
index 00000000000..4b32511dc42
--- /dev/null
+++ b/src/libnative/lib.rs
@@ -0,0 +1,61 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! The native runtime crate
+//!
+//! This crate contains an implementation of 1:1 scheduling for a "native"
+//! runtime. In addition, all I/O provided by this crate is the thread blocking
+//! version of I/O.
+
+#[link(name = "native",
+       package_id = "native",
+       vers = "0.9-pre",
+       uuid = "535344a7-890f-5a23-e1f3-e0d118805141",
+       url = "https://github.com/mozilla/rust/tree/master/src/native")];
+
+#[license = "MIT/ASL2"];
+#[crate_type = "rlib"];
+#[crate_type = "dylib"];
+
+// NB this crate explicitly does *not* allow glob imports, please seriously
+//    consider whether they're needed before adding that feature here.
+
+use std::cast;
+use std::os;
+use std::rt;
+use std::task::try;
+
+pub mod io;
+pub mod task;
+
+// XXX: this should not exist here
+#[cfg(stage0)]
+#[lang = "start"]
+pub fn start(main: *u8, argc: int, argv: **u8) -> int {
+    rt::init(argc, argv);
+
+    // Bootstrap ourselves by installing a local Task and then immediately
+    // spawning a thread to run 'main'. Always spawn a new thread for main so
+    // the stack size of 'main' is known (and the bounds can be set
+    // appropriately).
+    //
+    // Once the main task has completed, then we wait for everyone else to exit.
+    task::run(task::new(), proc() {
+        let main: extern "Rust" fn() = unsafe { cast::transmute(main) };
+        match do try { main() } {
+            Ok(()) => { os::set_exit_status(0); }
+            Err(..) => { os::set_exit_status(rt::DEFAULT_ERROR_CODE); }
+        }
+    });
+    task::wait_for_completion();
+
+    unsafe { rt::cleanup(); }
+    os::get_exit_status()
+}
diff --git a/src/libnative/task.rs b/src/libnative/task.rs
new file mode 100644
index 00000000000..fa7500ca85e
--- /dev/null
+++ b/src/libnative/task.rs
@@ -0,0 +1,257 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Tasks implemented on top of OS threads
+//!
+//! This module contains the implementation of the 1:1 threading module required
+//! by rust tasks. This implements the necessary API traits laid out by std::rt
+//! in order to spawn new tasks and deschedule the current task.
+
+use std::cast;
+use std::rt::env;
+use std::rt::local::Local;
+use std::rt::rtio;
+use std::rt::task::{Task, BlockedTask};
+use std::rt::thread::Thread;
+use std::rt;
+use std::sync::atomics::{AtomicUint, SeqCst, INIT_ATOMIC_UINT};
+use std::task::TaskOpts;
+use std::unstable::mutex::{Mutex, MUTEX_INIT};
+use std::unstable::stack;
+
+use io;
+use task;
+
+static mut THREAD_CNT: AtomicUint = INIT_ATOMIC_UINT;
+static mut LOCK: Mutex = MUTEX_INIT;
+
+/// Waits for all spawned threads to finish completion. This should only be used
+/// by the main task in order to wait for all other tasks to terminate.
+///
+/// This mirrors the same semantics as the green scheduling model.
+pub fn wait_for_completion() {
+    static mut someone_waited: bool = false;
+
+    unsafe {
+        LOCK.lock();
+        assert!(!someone_waited);
+        someone_waited = true;
+        while THREAD_CNT.load(SeqCst) > 0 {
+            LOCK.wait();
+        }
+        LOCK.unlock();
+        LOCK.destroy();
+    }
+
+}
+
+// Signal that a thread has finished execution, possibly waking up a blocker
+// waiting for all threads to have finished.
+fn signal_done() {
+    unsafe {
+        LOCK.lock();
+        if THREAD_CNT.fetch_sub(1, SeqCst) == 1 {
+            LOCK.signal();
+        }
+        LOCK.unlock();
+    }
+}
+
+/// Creates a new Task which is ready to execute as a 1:1 task.
+pub fn new() -> ~Task {
+    let mut task = ~Task::new();
+    task.put_runtime(~Ops {
+        lock: unsafe { Mutex::new() },
+    } as ~rt::Runtime);
+    return task;
+}
+
+/// Spawns a new task given the configuration options and a procedure to run
+/// inside the task.
+pub fn spawn(opts: TaskOpts, f: proc()) {
+    // must happen before the spawn, no need to synchronize with a lock.
+    unsafe { THREAD_CNT.fetch_add(1, SeqCst); }
+
+    let TaskOpts {
+        watched: _watched,
+        notify_chan, name, stack_size
+    } = opts;
+
+    let mut task = new();
+    task.name = name;
+    match notify_chan {
+        Some(chan) => {
+            let on_exit = proc(task_result) { chan.send(task_result) };
+            task.death.on_exit = Some(on_exit);
+        }
+        None => {}
+    }
+
+    let stack = stack_size.unwrap_or(env::min_stack());
+    let task = task;
+
+    // Spawning a new OS thread guarantees that __morestack will never get
+    // triggered, but we must manually set up the actual stack bounds once this
+    // function starts executing. This raises the lower limit by a bit because
+    // by the time that this function is executing we've already consumed at
+    // least a little bit of stack (we don't know the exact byte address at
+    // which our stack started).
+    Thread::spawn_stack(stack, proc() {
+        let something_around_the_top_of_the_stack = 1;
+        let addr = &something_around_the_top_of_the_stack as *int;
+        unsafe {
+            let my_stack = addr as uint;
+            stack::record_stack_bounds(my_stack - stack + 1024, my_stack);
+        }
+
+        run(task, f);
+        signal_done();
+    })
+}
+
+/// Runs a task once, consuming the task. The given procedure is run inside of
+/// the task.
+pub fn run(t: ~Task, f: proc()) {
+    let mut f = Some(f);
+    t.run(|| { f.take_unwrap()(); });
+}
+
+// This structure is the glue between channels and the 1:1 scheduling mode. This
+// structure is allocated once per task.
+struct Ops {
+    lock: Mutex, // native synchronization
+}
+
+impl rt::Runtime for Ops {
+    fn yield_now(~self, mut cur_task: ~Task) {
+        // put the task back in TLS and then invoke the OS thread yield
+        cur_task.put_runtime(self as ~rt::Runtime);
+        Local::put(cur_task);
+        Thread::yield_now();
+    }
+
+    fn maybe_yield(~self, mut cur_task: ~Task) {
+        // just put the task back in TLS, on OS threads we never need to
+        // opportunistically yield b/c the OS will do that for us (preemption)
+        cur_task.put_runtime(self as ~rt::Runtime);
+        Local::put(cur_task);
+    }
+
+    fn wrap(~self) -> ~Any {
+        self as ~Any
+    }
+
+    // This function gets a little interesting. There are a few safety and
+    // ownership violations going on here, but this is all done in the name of
+    // shared state. Additionally, all of the violations are protected with a
+    // mutex, so in theory there are no races.
+    //
+    // The first thing we need to do is to get a pointer to the task's internal
+    // mutex. This address will not be changing (because the task is allocated
+    // on the heap). We must have this handle separately because the task will
+    // have its ownership transferred to the given closure. We're guaranteed,
+    // however, that this memory will remain valid because *this* is the current
+    // task's execution thread.
+    //
+    // The next weird part is where ownership of the task actually goes. We
+    // relinquish it to the `f` blocking function, but upon returning this
+    // function needs to replace the task back in TLS. There is no communication
+    // from the wakeup thread back to this thread about the task pointer, and
+    // there's really no need to. In order to get around this, we cast the task
+    // to a `uint` which is then used at the end of this function to cast back
+    // to a `~Task` object. Naturally, this looks like it violates ownership
+    // semantics in that there may be two `~Task` objects.
+    //
+    // The fun part is that the wakeup half of this implementation knows to
+    // "forget" the task on the other end. This means that the awakening half of
+    // things silently relinquishes ownership back to this thread, but not in a
+    // way that the compiler can understand. The task's memory is always valid
+    // for both tasks because these operations are all done inside of a mutex.
+    //
+    // You'll also find that if blocking fails (the `f` function hands the
+    // BlockedTask back to us), we will `cast::forget` the handles. The
+    // reasoning for this is the same logic as above in that the task silently
+    // transfers ownership via the `uint`, not through normal compiler
+    // semantics.
+    fn deschedule(mut ~self, times: uint, mut cur_task: ~Task,
+                  f: |BlockedTask| -> Result<(), BlockedTask>) {
+        let my_lock: *mut Mutex = &mut self.lock as *mut Mutex;
+        cur_task.put_runtime(self as ~rt::Runtime);
+
+        unsafe {
+            let cur_task_dupe = *cast::transmute::<&~Task, &uint>(&cur_task);
+            let task = BlockedTask::block(cur_task);
+
+            if times == 1 {
+                (*my_lock).lock();
+                match f(task) {
+                    Ok(()) => (*my_lock).wait(),
+                    Err(task) => { cast::forget(task.wake()); }
+                }
+                (*my_lock).unlock();
+            } else {
+                let mut iter = task.make_selectable(times);
+                (*my_lock).lock();
+                let success = iter.all(|task| {
+                    match f(task) {
+                        Ok(()) => true,
+                        Err(task) => {
+                            cast::forget(task.wake());
+                            false
+                        }
+                    }
+                });
+                if success {
+                    (*my_lock).wait();
+                }
+                (*my_lock).unlock();
+            }
+            // re-acquire ownership of the task
+            cur_task = cast::transmute::<uint, ~Task>(cur_task_dupe);
+        }
+
+        // put the task back in TLS, and everything is as it once was.
+        Local::put(cur_task);
+    }
+
+    // See the comments on `deschedule` for why the task is forgotten here, and
+    // why it's valid to do so.
+    fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) {
+        unsafe {
+            let lock: *mut Mutex = &mut self.lock as *mut Mutex;
+            to_wake.put_runtime(self as ~rt::Runtime);
+            cast::forget(to_wake);
+            (*lock).lock();
+            (*lock).signal();
+            (*lock).unlock();
+        }
+    }
+
+    fn spawn_sibling(~self, mut cur_task: ~Task, opts: TaskOpts, f: proc()) {
+        cur_task.put_runtime(self as ~rt::Runtime);
+        Local::put(cur_task);
+
+        task::spawn(opts, f);
+    }
+
+    fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> {
+        static mut io: io::IoFactory = io::IoFactory;
+        // Unsafety is from accessing `io`, which is guaranteed to be safe
+        // because you can't do anything usable with this statically initialized
+        // unit struct.
+        Some(unsafe { rtio::LocalIo::new(&mut io as &mut rtio::IoFactory) })
+    }
+}
+
+impl Drop for Ops {
+    fn drop(&mut self) {
+        unsafe { self.lock.destroy() }
+    }
+}
diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs
index bd0b9e08b7c..0852c4cadb6 100644
--- a/src/libstd/io/mod.rs
+++ b/src/libstd/io/mod.rs
@@ -164,9 +164,6 @@ requests are implemented by descheduling the running task and
 performing an asynchronous request; the task is only resumed once the
 asynchronous request completes.
 
-For blocking (but possibly more efficient) implementations, look
-in the `io::native` module.
-
 # Error Handling
 
 I/O is an area where nearly every operation can result in unexpected
@@ -349,8 +346,6 @@ pub mod timer;
 /// Buffered I/O wrappers
 pub mod buffered;
 
-pub mod native;
-
 /// Signal handling
 pub mod signal;