diff options
| author | Niko Matsakis <niko@alum.mit.edu> | 2014-11-26 10:10:52 -0500 |
|---|---|---|
| committer | Niko Matsakis <niko@alum.mit.edu> | 2014-12-14 04:21:56 -0500 |
| commit | d61338172fa110fcf9e5f2df0e1e83635d0fde3f (patch) | |
| tree | 5a022b681067ce30acf73e06aef9896f3cfd4be2 /src | |
| parent | 10ac5b72f1974775bed499105c2a3cf18da98f32 (diff) | |
| download | rust-d61338172fa110fcf9e5f2df0e1e83635d0fde3f.tar.gz rust-d61338172fa110fcf9e5f2df0e1e83635d0fde3f.zip | |
Rewrite threading infrastructure, introducing `Thunk` to represent
boxed `FnOnce` closures.
Diffstat (limited to 'src')
| -rw-r--r-- | src/librustrt/at_exit_imp.rs | 7 | ||||
| -rw-r--r-- | src/librustrt/lib.rs | 5 | ||||
| -rw-r--r-- | src/librustrt/task.rs | 25 | ||||
| -rw-r--r-- | src/librustrt/thread.rs | 58 | ||||
| -rw-r--r-- | src/librustrt/thunk.rs | 52 | ||||
| -rw-r--r-- | src/libstd/lib.rs | 2 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 9 | ||||
| -rw-r--r-- | src/libstd/sync/future.rs | 31 | ||||
| -rw-r--r-- | src/libstd/sync/task_pool.rs | 16 | ||||
| -rw-r--r-- | src/libstd/sys/unix/process.rs | 16 | ||||
| -rw-r--r-- | src/libstd/task.rs | 128 |
11 files changed, 225 insertions, 124 deletions
diff --git a/src/librustrt/at_exit_imp.rs b/src/librustrt/at_exit_imp.rs index ce27decb136..8be77d9b34d 100644 --- a/src/librustrt/at_exit_imp.rs +++ b/src/librustrt/at_exit_imp.rs @@ -18,10 +18,11 @@ use alloc::boxed::Box; use collections::vec::Vec; use core::atomic; use core::mem; +use thunk::{Thunk}; use exclusive::Exclusive; -type Queue = Exclusive<Vec<proc():Send>>; +type Queue = Exclusive<Vec<Thunk>>; static QUEUE: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; static RUNNING: atomic::AtomicBool = atomic::INIT_ATOMIC_BOOL; @@ -34,7 +35,7 @@ pub fn init() { } } -pub fn push(f: proc():Send) { +pub fn push(f: Thunk) { unsafe { // Note that the check against 0 for the queue pointer is not atomic at // all with respect to `run`, meaning that this could theoretically be a @@ -59,6 +60,6 @@ pub fn run() { }; for to_run in cur.into_iter() { - to_run(); + to_run.invoke(()); } } diff --git a/src/librustrt/lib.rs b/src/librustrt/lib.rs index c2ee91d6acc..f12f8e49801 100644 --- a/src/librustrt/lib.rs +++ b/src/librustrt/lib.rs @@ -46,6 +46,7 @@ mod thread_local_storage; mod util; mod libunwind; mod stack_overflow; +pub mod thunk; pub mod args; pub mod bookkeeping; @@ -95,8 +96,8 @@ pub fn init(argc: int, argv: *const *const u8) { /// /// It is forbidden for procedures to register more `at_exit` handlers when they /// are running, and doing so will lead to a process abort. -pub fn at_exit(f: proc():Send) { - at_exit_imp::push(f); +pub fn at_exit<F:FnOnce()+Send>(f: F) { + at_exit_imp::push(thunk::Thunk::new(f)); } /// One-time runtime cleanup. diff --git a/src/librustrt/task.rs b/src/librustrt/task.rs index 7e657d3aef3..37632f509c1 100644 --- a/src/librustrt/task.rs +++ b/src/librustrt/task.rs @@ -21,6 +21,7 @@ use core::any::Any; use core::atomic::{AtomicUint, SeqCst}; use core::iter::{IteratorExt, Take}; use core::kinds::marker; +use core::ops::FnOnce; use core::mem; use core::ops::FnMut; use core::prelude::{Clone, Drop, Err, Iterator, None, Ok, Option, Send, Some}; @@ -34,6 +35,7 @@ use stack; use unwind; use unwind::Unwinder; use collections::str::SendStr; +use thunk::Thunk; /// State associated with Rust tasks. /// @@ -67,7 +69,7 @@ enum TaskState { pub struct TaskOpts { /// Invoke this procedure with the result of the task when it finishes. - pub on_exit: Option<proc(Result): Send>, + pub on_exit: Option<Thunk<Result>>, /// A name for the task-to-be, for identification in panic messages pub name: Option<SendStr>, /// The size of the stack for the spawned task @@ -92,7 +94,7 @@ pub enum BlockedTask { /// Per-task state related to task death, killing, panic, etc. pub struct Death { - pub on_exit: Option<proc(Result):Send>, + pub on_exit: Option<Thunk<Result>>, marker: marker::NoCopy, } @@ -116,7 +118,13 @@ impl Task { } } - pub fn spawn(opts: TaskOpts, f: proc():Send) { + pub fn spawn<F>(opts: TaskOpts, f: F) + where F : FnOnce(), F : Send + { + Task::spawn_thunk(opts, Thunk::new(f)) + } + + fn spawn_thunk(opts: TaskOpts, f: Thunk) { let TaskOpts { name, stack_size, on_exit } = opts; let mut task = box Task::new(None, None); @@ -138,7 +146,7 @@ impl Task { // because by the time that this function is executing we've already // consumed at least a little bit of stack (we don't know the exact byte // address at which our stack started). - Thread::spawn_stack(stack, proc() { + Thread::spawn_stack(stack, move|| { let something_around_the_top_of_the_stack = 1; let addr = &something_around_the_top_of_the_stack as *const int; let my_stack = addr as uint; @@ -150,7 +158,7 @@ impl Task { task.stack_bounds = (my_stack - stack + 1024, my_stack); let mut f = Some(f); - drop(task.run(|| { f.take().unwrap()() }).destroy()); + drop(task.run(|| { f.take().unwrap().invoke(()) }).destroy()); drop(token); }) } @@ -241,7 +249,7 @@ impl Task { // reconsideration to whether it's a reasonable thing to let a // task to do or not. match what_to_do { - Some(f) => { f(result) } + Some(f) => { f.invoke(result) } None => { drop(result) } } @@ -500,14 +508,13 @@ mod test { use super::*; use std::prelude::*; use std::task; - use unwind; #[test] fn unwind() { - let result = task::try(proc()()); + let result = task::try(move|| ()); rtdebug!("trying first assert"); assert!(result.is_ok()); - let result = task::try::<()>(proc() panic!()); + let result = task::try(move|| -> () panic!()); rtdebug!("trying second assert"); assert!(result.is_err()); } diff --git a/src/librustrt/thread.rs b/src/librustrt/thread.rs index 9f3f45ba098..175e057c22f 100644 --- a/src/librustrt/thread.rs +++ b/src/librustrt/thread.rs @@ -22,6 +22,7 @@ use alloc::boxed::Box; use core::mem; use core::uint; use libc; +use thunk::{Thunk}; use stack; use stack_overflow; @@ -60,8 +61,8 @@ fn start_thread(main: *mut libc::c_void) -> imp::rust_thread_return { unsafe { stack::record_os_managed_stack_bounds(0, uint::MAX); let handler = stack_overflow::Handler::new(); - let f: Box<proc()> = mem::transmute(main); - (*f)(); + let f: Box<Thunk> = mem::transmute(main); + f.invoke(()); drop(handler); mem::transmute(0 as imp::rust_thread_return) } @@ -113,14 +114,17 @@ impl Thread<()> { /// to finish executing. This means that even if `join` is not explicitly /// called, when the `Thread` falls out of scope its destructor will block /// waiting for the OS thread. - pub fn start<T: Send>(main: proc():Send -> T) -> Thread<T> { + pub fn start<T,F>(main: F) -> Thread<T> + where T:Send, F:FnOnce() -> T, F:Send + { Thread::start_stack(DEFAULT_STACK_SIZE, main) } /// Performs the same functionality as `start`, but specifies an explicit /// stack size for the new thread. - pub fn start_stack<T: Send>(stack: uint, main: proc():Send -> T) -> Thread<T> { - + pub fn start_stack<T, F>(stack: uint, main: F) -> Thread<T> + where T:Send, F:FnOnce() -> T, F:Send + { // We need the address of the packet to fill in to be stable so when // `main` fills it in it's still valid, so allocate an extra box to do // so. @@ -128,8 +132,11 @@ impl Thread<()> { let packet2: *mut Option<T> = unsafe { *mem::transmute::<&Box<Option<T>>, *const *mut Option<T>>(&packet) }; - let main = proc() unsafe { *packet2 = Some(main()); }; - let native = unsafe { imp::create(stack, box main) }; + let native = unsafe { + imp::create(stack, Thunk::new(move |:| { + *packet2 = Some(main.call_once(())); + })) + }; Thread { native: native, @@ -144,15 +151,19 @@ impl Thread<()> { /// This corresponds to creating threads in the 'detached' state on unix /// systems. Note that platforms may not keep the main program alive even if /// there are detached thread still running around. - pub fn spawn(main: proc():Send) { + pub fn spawn<F>(main: F) + where F : FnOnce() + Send + { Thread::spawn_stack(DEFAULT_STACK_SIZE, main) } /// Performs the same functionality as `spawn`, but explicitly specifies a /// stack size for the new thread. - pub fn spawn_stack(stack: uint, main: proc():Send) { + pub fn spawn_stack<F>(stack: uint, main: F) + where F : FnOnce() + Send + { unsafe { - let handle = imp::create(stack, box main); + let handle = imp::create(stack, Thunk::new(main)); imp::detach(handle); } } @@ -190,8 +201,6 @@ impl<T: Send> Drop for Thread<T> { #[cfg(windows)] #[allow(non_snake_case)] mod imp { - use core::prelude::*; - use alloc::boxed::Box; use core::cmp; use core::mem; @@ -200,6 +209,7 @@ mod imp { use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, BOOL, LPVOID, DWORD, LPDWORD, HANDLE}; use stack::RED_ZONE; + use thunk::Thunk; pub type rust_thread = HANDLE; pub type rust_thread_return = DWORD; @@ -217,8 +227,9 @@ mod imp { } } - pub unsafe fn create(stack: uint, p: Box<proc():Send>) -> rust_thread { - let arg: *mut libc::c_void = mem::transmute(p); + pub unsafe fn create(stack: uint, p: Thunk) -> rust_thread { + let arg: *mut libc::c_void = mem::transmute(box p); + // FIXME On UNIX, we guard against stack sizes that are too small but // that's because pthreads enforces that stacks are at least // PTHREAD_STACK_MIN bytes big. Windows has no such lower limit, it's @@ -234,7 +245,7 @@ mod imp { if ret as uint == 0 { // be sure to not leak the closure - let _p: Box<proc():Send> = mem::transmute(arg); + let _p: Box<Thunk> = mem::transmute(arg); panic!("failed to spawn native thread: {}", ret); } return ret; @@ -279,6 +290,7 @@ mod imp { use core::ptr; use libc::consts::os::posix01::{PTHREAD_CREATE_JOINABLE, PTHREAD_STACK_MIN}; use libc; + use thunk::Thunk; use stack::RED_ZONE; @@ -409,7 +421,7 @@ mod imp { } } - pub unsafe fn create(stack: uint, p: Box<proc():Send>) -> rust_thread { + pub unsafe fn create(stack: uint, p: Thunk) -> rust_thread { let mut native: libc::pthread_t = mem::zeroed(); let mut attr: libc::pthread_attr_t = mem::zeroed(); assert_eq!(pthread_attr_init(&mut attr), 0); @@ -437,13 +449,13 @@ mod imp { }, }; - let arg: *mut libc::c_void = mem::transmute(p); + let arg: *mut libc::c_void = mem::transmute(box p); // must box since sizeof(p)=2*uint let ret = pthread_create(&mut native, &attr, super::thread_start, arg); assert_eq!(pthread_attr_destroy(&mut attr), 0); if ret != 0 { // be sure to not leak the closure - let _p: Box<proc():Send> = mem::transmute(arg); + let _p: Box<Box<FnOnce()+Send>> = mem::transmute(arg); panic!("failed to spawn native thread: {}", ret); } native @@ -531,17 +543,17 @@ mod tests { use super::Thread; #[test] - fn smoke() { Thread::start(proc (){}).join(); } + fn smoke() { Thread::start(move|| {}).join(); } #[test] - fn data() { assert_eq!(Thread::start(proc () { 1i }).join(), 1); } + fn data() { assert_eq!(Thread::start(move|| { 1i }).join(), 1); } #[test] - fn detached() { Thread::spawn(proc () {}) } + fn detached() { Thread::spawn(move|| {}) } #[test] fn small_stacks() { - assert_eq!(42i, Thread::start_stack(0, proc () 42i).join()); - assert_eq!(42i, Thread::start_stack(1, proc () 42i).join()); + assert_eq!(42i, Thread::start_stack(0, move|| 42i).join()); + assert_eq!(42i, Thread::start_stack(1, move|| 42i).join()); } } diff --git a/src/librustrt/thunk.rs b/src/librustrt/thunk.rs new file mode 100644 index 00000000000..42e78495990 --- /dev/null +++ b/src/librustrt/thunk.rs @@ -0,0 +1,52 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use alloc::boxed::Box; +use core::kinds::Send; +use core::ops::FnOnce; + +pub struct Thunk<A=(),R=()> { + invoke: Box<Invoke<A,R>+Send> +} + +impl<R> Thunk<(),R> { + pub fn new<F>(func: F) -> Thunk<(),R> + where F : FnOnce() -> R, F : Send + { + Thunk::with_arg(move|: ()| func()) + } +} + +impl<A,R> Thunk<A,R> { + pub fn with_arg<F>(func: F) -> Thunk<A,R> + where F : FnOnce(A) -> R, F : Send + { + Thunk { + invoke: box func + } + } + + pub fn invoke(self, arg: A) -> R { + self.invoke.invoke(arg) + } +} + +pub trait Invoke<A=(),R=()> { + fn invoke(self: Box<Self>, arg: A) -> R; +} + +impl<A,R,F> Invoke<A,R> for F + where F : FnOnce(A) -> R +{ + fn invoke(self: Box<F>, arg: A) -> R { + let f = *self; + f(arg) + } +} diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs index c2363c9946a..e99aba9b673 100644 --- a/src/libstd/lib.rs +++ b/src/libstd/lib.rs @@ -171,6 +171,8 @@ pub use rustrt::c_str; pub use unicode::char; +pub use rustrt::thunk; + /* Exported macros */ pub mod macros; diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 5ecd3ff04f1..eb517047ddc 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -52,6 +52,7 @@ use borrow::IntoCow; use failure; use rustrt; use os; +use thunk::Thunk; // Reexport some of our utilities which are expected by other crates. pub use self::util::{default_sched_threads, min_stack, running_on_valgrind}; @@ -87,10 +88,10 @@ static OS_DEFAULT_STACK_ESTIMATE: uint = 2 * (1 << 20); #[lang = "start"] fn lang_start(main: *const u8, argc: int, argv: *const *const u8) -> int { use mem; - start(argc, argv, proc() { + start(argc, argv, Thunk::new(move|| { let main: extern "Rust" fn() = unsafe { mem::transmute(main) }; main(); - }) + })) } /// Executes the given procedure after initializing the runtime with the given @@ -102,7 +103,7 @@ fn lang_start(main: *const u8, argc: int, argv: *const *const u8) -> int { /// /// This function will only return once *all* native threads in the system have /// exited. -pub fn start(argc: int, argv: *const *const u8, main: proc()) -> int { +pub fn start(argc: int, argv: *const *const u8, main: Thunk) -> int { use prelude::*; use rt; use rustrt::task::Task; @@ -144,7 +145,7 @@ pub fn start(argc: int, argv: *const *const u8, main: proc()) -> int { unsafe { rustrt::stack::record_os_managed_stack_bounds(my_stack_bottom, my_stack_top); } - (main.take().unwrap())(); + (main.take().unwrap()).invoke(()); exit_code = Some(os::get_exit_status()); }).destroy()); unsafe { rt::cleanup(); } diff --git a/src/libstd/sync/future.rs b/src/libstd/sync/future.rs index a8c9983e5aa..e5a1e09967c 100644 --- a/src/libstd/sync/future.rs +++ b/src/libstd/sync/future.rs @@ -17,7 +17,7 @@ //! use std::sync::Future; //! # fn fib(n: uint) -> uint {42}; //! # fn make_a_sandwich() {}; -//! let mut delayed_fib = Future::spawn(proc() { fib(5000) }); +//! let mut delayed_fib = Future::spawn(move|| { fib(5000) }); //! make_a_sandwich(); //! println!("fib(5000) = {}", delayed_fib.get()) //! ``` @@ -30,6 +30,7 @@ use core::mem::replace; use self::FutureState::*; use comm::{Receiver, channel}; use task::spawn; +use thunk::{Thunk}; /// A type encapsulating the result of a computation which may not be complete pub struct Future<A> { @@ -37,7 +38,7 @@ pub struct Future<A> { } enum FutureState<A> { - Pending(proc():Send -> A), + Pending(Thunk<(),A>), Evaluating, Forced(A) } @@ -78,7 +79,7 @@ impl<A> Future<A> { match replace(&mut self.state, Evaluating) { Forced(_) | Evaluating => panic!("Logic error."), Pending(f) => { - self.state = Forced(f()); + self.state = Forced(f.invoke(())); self.get_ref() } } @@ -97,7 +98,9 @@ impl<A> Future<A> { Future {state: Forced(val)} } - pub fn from_fn(f: proc():Send -> A) -> Future<A> { + pub fn from_fn<F>(f: F) -> Future<A> + where F : FnOnce() -> A, F : Send + { /*! * Create a future from a function. * @@ -106,7 +109,7 @@ impl<A> Future<A> { * function. It is not spawned into another task. */ - Future {state: Pending(f)} + Future {state: Pending(Thunk::new(f))} } } @@ -119,12 +122,14 @@ impl<A:Send> Future<A> { * waiting for the result to be received on the port. */ - Future::from_fn(proc() { + Future::from_fn(move|:| { rx.recv() }) } - pub fn spawn(blk: proc():Send -> A) -> Future<A> { + pub fn spawn<F>(blk: F) -> Future<A> + where F : FnOnce() -> A, F : Send + { /*! * Create a future from a unique closure. * @@ -134,7 +139,7 @@ impl<A:Send> Future<A> { let (tx, rx) = channel(); - spawn(proc() { + spawn(move |:| { // Don't panic if the other end has hung up let _ = tx.send_opt(blk()); }); @@ -166,7 +171,7 @@ mod test { #[test] fn test_from_fn() { - let mut f = Future::from_fn(proc() "brail".to_string()); + let mut f = Future::from_fn(move|| "brail".to_string()); assert_eq!(f.get(), "brail"); } @@ -190,14 +195,14 @@ mod test { #[test] fn test_spawn() { - let mut f = Future::spawn(proc() "bale".to_string()); + let mut f = Future::spawn(move|| "bale".to_string()); assert_eq!(f.get(), "bale"); } #[test] #[should_fail] fn test_future_panic() { - let mut f = Future::spawn(proc() panic!()); + let mut f = Future::spawn(move|| panic!()); let _x: String = f.get(); } @@ -205,8 +210,8 @@ mod test { fn test_sendable_future() { let expected = "schlorf"; let (tx, rx) = channel(); - let f = Future::spawn(proc() { expected }); - task::spawn(proc() { + let f = Future::spawn(move|| { expected }); + task::spawn(move|| { let mut f = f; tx.send(f.get()); }); diff --git a/src/libstd/sync/task_pool.rs b/src/libstd/sync/task_pool.rs index 4ae5cd054f6..a684c6502ae 100644 --- a/src/libstd/sync/task_pool.rs +++ b/src/libstd/sync/task_pool.rs @@ -72,7 +72,7 @@ pub struct TaskPool { // // This is the only such Sender, so when it is dropped all subtasks will // quit. - jobs: Sender<proc(): Send> + jobs: Sender<Thunk> } impl TaskPool { @@ -84,7 +84,7 @@ impl TaskPool { pub fn new(tasks: uint) -> TaskPool { assert!(tasks >= 1); - let (tx, rx) = channel::<proc(): Send>(); + let (tx, rx) = channel::<Thunk>(); let rx = Arc::new(Mutex::new(rx)); // Taskpool tasks. @@ -96,13 +96,15 @@ impl TaskPool { } /// Executes the function `job` on a task in the pool. - pub fn execute(&self, job: proc():Send) { - self.jobs.send(job); + pub fn execute<F>(&self, job: F) + where F : FnOnce(), F : Send + { + self.jobs.send(Thunk::new(job)); } } -fn spawn_in_pool(jobs: Arc<Mutex<Receiver<proc(): Send>>>) { - spawn(proc() { +fn spawn_in_pool(jobs: Arc<Mutex<Receiver<Thunk>>>) { + spawn(move |:| { // Will spawn a new task on panic unless it is cancelled. let sentinel = Sentinel::new(&jobs); @@ -115,7 +117,7 @@ fn spawn_in_pool(jobs: Arc<Mutex<Receiver<proc(): Send>>>) { }; match message { - Ok(job) => job(), + Ok(job) => job.invoke(()), // The Taskpool was dropped. Err(..) => break diff --git a/src/libstd/sys/unix/process.rs b/src/libstd/sys/unix/process.rs index dfbba0f335c..f71b34304ab 100644 --- a/src/libstd/sys/unix/process.rs +++ b/src/libstd/sys/unix/process.rs @@ -531,8 +531,11 @@ impl Process { } } -fn with_argv<T>(prog: &CString, args: &[CString], - cb: proc(*const *const libc::c_char) -> T) -> T { +fn with_argv<T,F>(prog: &CString, args: &[CString], + cb: F) + -> T + where F : FnOnce(*const *const libc::c_char) -> T +{ let mut ptrs: Vec<*const libc::c_char> = Vec::with_capacity(args.len()+1); // Convert the CStrings into an array of pointers. Note: the @@ -549,9 +552,12 @@ fn with_argv<T>(prog: &CString, args: &[CString], cb(ptrs.as_ptr()) } -fn with_envp<K, V, T>(env: Option<&collections::HashMap<K, V>>, - cb: proc(*const c_void) -> T) -> T - where K: BytesContainer + Eq + Hash, V: BytesContainer +fn with_envp<K,V,T,F>(env: Option<&collections::HashMap<K, V>>, + cb: F) + -> T + where F : FnOnce(*const c_void) -> T, + K : BytesContainer + Eq + Hash, + V : BytesContainer { // On posixy systems we can pass a char** for envp, which is a // null-terminated array of "k=v\0" strings. Since we must create diff --git a/src/libstd/task.rs b/src/libstd/task.rs index 5a1a5b4fb7a..340e283708a 100644 --- a/src/libstd/task.rs +++ b/src/libstd/task.rs @@ -35,7 +35,7 @@ //! ## Example //! //! ```rust -//! spawn(proc() { +//! spawn(move|| { //! println!("Hello, World!"); //! }) //! ``` @@ -47,6 +47,7 @@ use any::Any; use borrow::IntoCow; use boxed::Box; use comm::channel; +use core::ops::FnOnce; use io::{Writer, stdio}; use kinds::{Send, marker}; use option::Option; @@ -57,6 +58,7 @@ use rustrt::task::Task; use rustrt::task; use str::SendStr; use string::{String, ToString}; +use thunk::{Thunk}; use sync::Future; /// The task builder type. @@ -80,7 +82,7 @@ pub struct TaskBuilder { // Task-local stderr stderr: Option<Box<Writer + Send>>, // Optionally wrap the eventual task body - gen_body: Option<proc(v: proc():Send):Send -> proc():Send>, + gen_body: Option<Thunk<Thunk, Thunk>>, nocopy: marker::NoCopy, } @@ -129,41 +131,46 @@ impl TaskBuilder { } // Where spawning actually happens (whether yielding a future or not) - fn spawn_internal(self, f: proc():Send, - on_exit: Option<proc(Result<(), Box<Any + Send>>):Send>) { + fn spawn_internal( + self, + f: Thunk, + on_exit: Option<Thunk<task::Result>>) + { let TaskBuilder { name, stack_size, stdout, stderr, mut gen_body, nocopy: _ } = self; + let f = match gen_body.take() { - Some(gen) => gen(f), + Some(gen) => gen.invoke(f), None => f }; + let opts = task::TaskOpts { on_exit: on_exit, name: name, stack_size: stack_size, }; if stdout.is_some() || stderr.is_some() { - Task::spawn(opts, proc() { + Task::spawn(opts, move|:| { let _ = stdout.map(stdio::set_stdout); let _ = stderr.map(stdio::set_stderr); - f(); - }) + f.invoke(()); + }); } else { - Task::spawn(opts, f) + Task::spawn(opts, move|:| f.invoke(())) } } /// Creates and executes a new child task. /// /// Sets up a new task with its own call stack and schedules it to run - /// the provided proc. The task has the properties and behavior + /// the provided function. The task has the properties and behavior /// specified by the `TaskBuilder`. - pub fn spawn(self, f: proc():Send) { - self.spawn_internal(f, None) + pub fn spawn<F:FnOnce()+Send>(self, f: F) { + self.spawn_internal(Thunk::new(f), None) } - /// Execute a proc in a newly-spawned task and return a future representing + /// Execute a function in a newly-spawned task and return a future representing /// the task's result. The task has the properties and behavior /// specified by the `TaskBuilder`. /// @@ -178,20 +185,22 @@ impl TaskBuilder { /// `result::Result::Err` containing the argument to `panic!(...)` as an /// `Any` trait object. #[experimental = "Futures are experimental."] - pub fn try_future<T:Send>(self, f: proc():Send -> T) - -> Future<Result<T, Box<Any + Send>>> { - // currently, the on_exit proc provided by librustrt only works for unit + pub fn try_future<T:Send,F:FnOnce()->(T)+Send>(self, f: F) + -> Future<Result<T, Box<Any + Send>>> { + // currently, the on_exit fn provided by librustrt only works for unit // results, so we use an additional side-channel to communicate the // result. let (tx_done, rx_done) = channel(); // signal that task has exited let (tx_retv, rx_retv) = channel(); // return value from task - let on_exit = proc(res) { let _ = tx_done.send_opt(res); }; - self.spawn_internal(proc() { let _ = tx_retv.send_opt(f()); }, + let on_exit: Thunk<task::Result> = Thunk::with_arg(move |: res: task::Result| { + let _ = tx_done.send_opt(res); + }); + self.spawn_internal(Thunk::new(move |:| { let _ = tx_retv.send_opt(f()); }), Some(on_exit)); - Future::from_fn(proc() { + Future::from_fn(move|:| { rx_done.recv().map(|_| rx_retv.recv()) }) } @@ -199,7 +208,9 @@ impl TaskBuilder { /// Execute a function in a newly-spawnedtask and block until the task /// completes or panics. Equivalent to `.try_future(f).unwrap()`. #[unstable = "Error type may change."] - pub fn try<T:Send>(self, f: proc():Send -> T) -> Result<T, Box<Any + Send>> { + pub fn try<T,F>(self, f: F) -> Result<T, Box<Any + Send>> + where F : FnOnce() -> T, F : Send, T : Send + { self.try_future(f).into_inner() } } @@ -212,7 +223,7 @@ impl TaskBuilder { /// the provided unique closure. /// /// This function is equivalent to `TaskBuilder::new().spawn(f)`. -pub fn spawn(f: proc(): Send) { +pub fn spawn<F:FnOnce()+Send>(f: F) { TaskBuilder::new().spawn(f) } @@ -221,7 +232,9 @@ pub fn spawn(f: proc(): Send) { /// /// This is equivalent to `TaskBuilder::new().try`. #[unstable = "Error type may change."] -pub fn try<T: Send>(f: proc(): Send -> T) -> Result<T, Box<Any + Send>> { +pub fn try<T,F>(f: F) -> Result<T, Box<Any + Send>> + where T : Send, F : FnOnce() -> T, F : Send +{ TaskBuilder::new().try(f) } @@ -230,11 +243,12 @@ pub fn try<T: Send>(f: proc(): Send -> T) -> Result<T, Box<Any + Send>> { /// /// This is equivalent to `TaskBuilder::new().try_future`. #[experimental = "Futures are experimental."] -pub fn try_future<T:Send>(f: proc():Send -> T) -> Future<Result<T, Box<Any + Send>>> { +pub fn try_future<T,F>(f: F) -> Future<Result<T, Box<Any + Send>>> + where T:Send, F:FnOnce()->T, F:Send +{ TaskBuilder::new().try_future(f) } - /* Lifecycle functions */ /// Read the name of the current task. @@ -274,6 +288,8 @@ mod test { use result; use std::io::{ChanReader, ChanWriter}; use string::String; + use thunk::Thunk; + use prelude::*; use super::*; // !!! These tests are dangerous. If something is buggy, they will hang, !!! @@ -281,28 +297,28 @@ mod test { #[test] fn test_unnamed_task() { - try(proc() { + try(move|| { assert!(name().is_none()); }).map_err(|_| ()).unwrap(); } #[test] fn test_owned_named_task() { - TaskBuilder::new().named("ada lovelace".to_string()).try(proc() { + TaskBuilder::new().named("ada lovelace".to_string()).try(move|| { assert!(name().unwrap() == "ada lovelace"); }).map_err(|_| ()).unwrap(); } #[test] fn test_static_named_task() { - TaskBuilder::new().named("ada lovelace").try(proc() { + TaskBuilder::new().named("ada lovelace").try(move|| { assert!(name().unwrap() == "ada lovelace"); }).map_err(|_| ()).unwrap(); } #[test] fn test_send_named_task() { - TaskBuilder::new().named("ada lovelace".into_cow()).try(proc() { + TaskBuilder::new().named("ada lovelace".into_cow()).try(move|| { assert!(name().unwrap() == "ada lovelace"); }).map_err(|_| ()).unwrap(); } @@ -310,7 +326,7 @@ mod test { #[test] fn test_run_basic() { let (tx, rx) = channel(); - TaskBuilder::new().spawn(proc() { + TaskBuilder::new().spawn(move|| { tx.send(()); }); rx.recv(); @@ -318,10 +334,10 @@ mod test { #[test] fn test_try_future() { - let result = TaskBuilder::new().try_future(proc() {}); + let result = TaskBuilder::new().try_future(move|| {}); assert!(result.unwrap().is_ok()); - let result = TaskBuilder::new().try_future(proc() -> () { + let result = TaskBuilder::new().try_future(move|| -> () { panic!(); }); assert!(result.unwrap().is_err()); @@ -329,7 +345,7 @@ mod test { #[test] fn test_try_success() { - match try(proc() { + match try(move|| { "Success!".to_string() }).as_ref().map(|s| s.as_slice()) { result::Result::Ok("Success!") => (), @@ -339,7 +355,7 @@ mod test { #[test] fn test_try_panic() { - match try(proc() { + match try(move|| { panic!() }) { result::Result::Err(_) => (), @@ -355,7 +371,7 @@ mod test { fn f(i: int, tx: Sender<()>) { let tx = tx.clone(); - spawn(proc() { + spawn(move|| { if i == 0 { tx.send(()); } else { @@ -372,8 +388,8 @@ mod test { fn test_spawn_sched_childs_on_default_sched() { let (tx, rx) = channel(); - spawn(proc() { - spawn(proc() { + spawn(move|| { + spawn(move|| { tx.send(()); }); }); @@ -382,17 +398,17 @@ mod test { } fn avoid_copying_the_body<F>(spawnfn: F) where - F: FnOnce(proc():Send), + F: FnOnce(Thunk), { let (tx, rx) = channel::<uint>(); let x = box 1; let x_in_parent = (&*x) as *const int as uint; - spawnfn(proc() { + spawnfn(Thunk::new(move|| { let x_in_child = (&*x) as *const int as uint; tx.send(x_in_child); - }); + })); let x_in_child = rx.recv(); assert_eq!(x_in_parent, x_in_child); @@ -400,25 +416,21 @@ mod test { #[test] fn test_avoid_copying_the_body_spawn() { - avoid_copying_the_body(spawn); + avoid_copying_the_body(|t| spawn(move|| t.invoke(()))); } #[test] fn test_avoid_copying_the_body_task_spawn() { avoid_copying_the_body(|f| { let builder = TaskBuilder::new(); - builder.spawn(proc() { - f(); - }); + builder.spawn(move|| f.invoke(())); }) } #[test] fn test_avoid_copying_the_body_try() { avoid_copying_the_body(|f| { - let _ = try(proc() { - f() - }); + let _ = try(move|| f.invoke(())); }) } @@ -429,24 +441,24 @@ mod test { // (well, it would if the constant were 8000+ - I lowered it to be more // valgrind-friendly. try this at home, instead..!) static GENERATIONS: uint = 16; - fn child_no(x: uint) -> proc(): Send { - return proc() { + fn child_no(x: uint) -> Thunk { + return Thunk::new(move|| { if x < GENERATIONS { - TaskBuilder::new().spawn(child_no(x+1)); + TaskBuilder::new().spawn(move|| child_no(x+1).invoke(())); } - } + }); } - TaskBuilder::new().spawn(child_no(0)); + TaskBuilder::new().spawn(|| child_no(0).invoke(())); } #[test] fn test_simple_newsched_spawn() { - spawn(proc()()) + spawn(move|| ()) } #[test] fn test_try_panic_message_static_str() { - match try(proc() { + match try(move|| { panic!("static string"); }) { Err(e) => { @@ -460,7 +472,7 @@ mod test { #[test] fn test_try_panic_message_owned_str() { - match try(proc() { + match try(move|| { panic!("owned string".to_string()); }) { Err(e) => { @@ -474,7 +486,7 @@ mod test { #[test] fn test_try_panic_message_any() { - match try(proc() { + match try(move|| { panic!(box 413u16 as Box<Any + Send>); }) { Err(e) => { @@ -492,7 +504,7 @@ mod test { fn test_try_panic_message_unit_struct() { struct Juju; - match try(proc() { + match try(move|| { panic!(Juju) }) { Err(ref e) if e.is::<Juju>() => {} @@ -507,7 +519,7 @@ mod test { let stdout = ChanWriter::new(tx); let r = TaskBuilder::new().stdout(box stdout as Box<Writer + Send>) - .try(proc() { + .try(move|| { print!("Hello, world!"); }); assert!(r.is_ok()); @@ -527,7 +539,7 @@ fn task_abort_no_kill_runtime() { use mem; let tb = TaskBuilder::new(); - let rx = tb.try_future(proc() {}); + let rx = tb.try_future(move|| {}); mem::drop(rx); timer::sleep(Duration::milliseconds(1000)); } |
