diff options
| author | bors <bors@rust-lang.org> | 2013-07-31 02:10:24 -0700 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2013-07-31 02:10:24 -0700 |
| commit | 8b7e241e02bb9f82d7b931033afde477d03ff4f2 (patch) | |
| tree | 84f3b2d5a4f8f81f86b7a04ab937d4a0d18a8366 /src/libstd | |
| parent | 8a737b502067b1896686bd1f9df7a1446296d80b (diff) | |
| parent | 33df9fc1d04c224a0c7ecb8d91b75feed75b412c (diff) | |
| download | rust-8b7e241e02bb9f82d7b931033afde477d03ff4f2.tar.gz rust-8b7e241e02bb9f82d7b931033afde477d03ff4f2.zip | |
auto merge of #8139 : brson/rust/rm-old-task-apis, r=pcwalton
This removes a bunch of options from the task builder interface that are irrelevant to the new scheduler and were generally unused anyway. It also bumps the stack size of new scheduler tasks so that there's enough room to run rustc and changes the interface to `Thread` to not implicitly join threads on destruction, but instead require an explicit, and mandatory, call to `join`.
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/rt/comm.rs | 21 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 4 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 13 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 2 | ||||
| -rw-r--r-- | src/libstd/rt/test.rs | 4 | ||||
| -rw-r--r-- | src/libstd/rt/thread.rs | 19 | ||||
| -rw-r--r-- | src/libstd/rt/uv/async.rs | 3 | ||||
| -rw-r--r-- | src/libstd/rt/uv/net.rs | 12 | ||||
| -rw-r--r-- | src/libstd/rt/uv/uvio.rs | 3 | ||||
| -rw-r--r-- | src/libstd/task/mod.rs | 95 | ||||
| -rw-r--r-- | src/libstd/task/rt.rs | 2 | ||||
| -rw-r--r-- | src/libstd/task/spawn.rs | 37 | ||||
| -rw-r--r-- | src/libstd/unstable/mod.rs | 25 |
13 files changed, 67 insertions, 173 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 6528835c52c..a27ff559b2b 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -773,10 +773,11 @@ mod test { do run_in_newsched_task { let (port, chan) = oneshot::<int>(); let port_cell = Cell::new(port); - let _thread = do spawntask_thread { + let thread = do spawntask_thread { let _p = port_cell.take(); }; let _chan = chan; + thread.join(); } } } @@ -788,13 +789,15 @@ mod test { let (port, chan) = oneshot::<int>(); let chan_cell = Cell::new(chan); let port_cell = Cell::new(port); - let _thread1 = do spawntask_thread { + let thread1 = do spawntask_thread { let _p = port_cell.take(); }; - let _thread2 = do spawntask_thread { + let thread2 = do spawntask_thread { let c = chan_cell.take(); c.send(1); }; + thread1.join(); + thread2.join(); } } } @@ -806,19 +809,21 @@ mod test { let (port, chan) = oneshot::<int>(); let chan_cell = Cell::new(chan); let port_cell = Cell::new(port); - let _thread1 = do spawntask_thread { + let thread1 = do spawntask_thread { let port_cell = Cell::new(port_cell.take()); let res = do spawntask_try { port_cell.take().recv(); }; assert!(res.is_err()); }; - let _thread2 = do spawntask_thread { + let thread2 = do spawntask_thread { let chan_cell = Cell::new(chan_cell.take()); do spawntask { chan_cell.take(); } }; + thread1.join(); + thread2.join(); } } } @@ -830,12 +835,14 @@ mod test { let (port, chan) = oneshot::<~int>(); let chan_cell = Cell::new(chan); let port_cell = Cell::new(port); - let _thread1 = do spawntask_thread { + let thread1 = do spawntask_thread { chan_cell.take().send(~10); }; - let _thread2 = do spawntask_thread { + let thread2 = do spawntask_thread { assert!(port_cell.take().recv() == ~10); }; + thread1.join(); + thread2.join(); } } } diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 2ca7d01da49..dc8669b9264 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -346,7 +346,9 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int { } // Wait for schedulers - { let _threads = threads; } + for threads.consume_iter().advance() |thread| { + thread.join(); + } // Return the exit code unsafe { diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 33cfd69fcd2..98df38f9b1d 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -901,10 +901,8 @@ mod test { sched.run(); }; - // wait for the end - let _thread1 = normal_thread; - let _thread2 = special_thread; - + normal_thread.join(); + special_thread.join(); } } @@ -1074,16 +1072,19 @@ mod test { sched2.enqueue_task(task2); let sched1_cell = Cell::new(sched1); - let _thread1 = do Thread::start { + let thread1 = do Thread::start { let sched1 = sched1_cell.take(); sched1.run(); }; let sched2_cell = Cell::new(sched2); - let _thread2 = do Thread::start { + let thread2 = do Thread::start { let sched2 = sched2_cell.take(); sched2.run(); }; + + thread1.join(); + thread2.join(); } } diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 8cf864b9222..82d4f8fcc04 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -219,7 +219,7 @@ impl Drop for Task { impl Coroutine { pub fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { - static MIN_STACK_SIZE: uint = 100000; // XXX: Too much stack + static MIN_STACK_SIZE: uint = 2000000; // XXX: Too much stack let start = Coroutine::build_start_wrapper(start); let mut stack = stack_pool.take_segment(MIN_STACK_SIZE); diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index feae8b5ffd8..260903cbcbf 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -125,7 +125,9 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { } // Wait for schedulers - let _threads = threads; + for threads.consume_iter().advance() |thread| { + thread.join(); + } } } diff --git a/src/libstd/rt/thread.rs b/src/libstd/rt/thread.rs index 98d08c060e0..ed0137d3b0f 100644 --- a/src/libstd/rt/thread.rs +++ b/src/libstd/rt/thread.rs @@ -16,7 +16,8 @@ type raw_thread = libc::c_void; pub struct Thread { main: ~fn(), - raw_thread: *raw_thread + raw_thread: *raw_thread, + joined: bool } impl Thread { @@ -27,18 +28,28 @@ impl Thread { let raw = substart(&main); Thread { main: main, - raw_thread: raw + raw_thread: raw, + joined: false } } + + pub fn join(self) { + assert!(!self.joined); + let mut this = self; + unsafe { rust_raw_thread_join(this.raw_thread); } + this.joined = true; + } } impl Drop for Thread { fn drop(&self) { - unsafe { rust_raw_thread_join_delete(self.raw_thread) } + assert!(self.joined); + unsafe { rust_raw_thread_delete(self.raw_thread) } } } extern { pub unsafe fn rust_raw_thread_start(f: &(~fn())) -> *raw_thread; - pub unsafe fn rust_raw_thread_join_delete(thread: *raw_thread); + pub unsafe fn rust_raw_thread_join(thread: *raw_thread); + pub unsafe fn rust_raw_thread_delete(thread: *raw_thread); } diff --git a/src/libstd/rt/uv/async.rs b/src/libstd/rt/uv/async.rs index 47e0a240e45..d0ca38317cb 100644 --- a/src/libstd/rt/uv/async.rs +++ b/src/libstd/rt/uv/async.rs @@ -94,12 +94,13 @@ mod test { let mut loop_ = Loop::new(); let watcher = AsyncWatcher::new(&mut loop_, |w, _| w.close(||()) ); let watcher_cell = Cell::new(watcher); - let _thread = do Thread::start { + let thread = do Thread::start { let mut watcher = watcher_cell.take(); watcher.send(); }; loop_.run(); loop_.close(); + thread.join(); } } } diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs index a039f3ab7ed..8ea4a197269 100644 --- a/src/libstd/rt/uv/net.rs +++ b/src/libstd/rt/uv/net.rs @@ -715,7 +715,7 @@ mod test { } } - let _client_thread = do Thread::start { + let client_thread = do Thread::start { rtdebug!("starting client thread"); let mut loop_ = Loop::new(); let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; @@ -739,6 +739,7 @@ mod test { let mut loop_ = loop_; loop_.run(); loop_.close(); + client_thread.join(); } } @@ -790,7 +791,7 @@ mod test { } } - let _client_thread = do Thread::start { + let client_thread = do Thread::start { rtdebug!("starting client thread"); let mut loop_ = Loop::new(); let mut tcp_watcher = { TcpWatcher::new(&mut loop_) }; @@ -814,6 +815,7 @@ mod test { let mut loop_ = loop_; loop_.run(); loop_.close(); + client_thread.join(); } } @@ -855,7 +857,7 @@ mod test { server.close(||{}); } - do Thread::start { + let thread = do Thread::start { let mut loop_ = Loop::new(); let mut client = UdpWatcher::new(&loop_); assert!(client.bind(client_addr).is_ok()); @@ -873,6 +875,7 @@ mod test { loop_.run(); loop_.close(); + thread.join(); } } @@ -914,7 +917,7 @@ mod test { server.close(||{}); } - do Thread::start { + let thread = do Thread::start { let mut loop_ = Loop::new(); let mut client = UdpWatcher::new(&loop_); assert!(client.bind(client_addr).is_ok()); @@ -932,6 +935,7 @@ mod test { loop_.run(); loop_.close(); + thread.join(); } } } diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 5397b5f2c5c..53ccd20186d 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -222,11 +222,12 @@ mod test_remote { }; remote_cell.put_back(remote); } - let _thread = do Thread::start { + let thread = do Thread::start { remote_cell.take().fire(); }; assert!(tube.recv() == 1); + thread.join(); } } } diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index c26349b220d..6de0c78d00b 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -44,7 +44,6 @@ use result::Result; use result; use rt::{context, OldTaskContext, TaskContext}; use rt::local::Local; -use task::rt::{task_id, sched_id}; use unstable::finally::Finally; use util; @@ -58,18 +57,6 @@ mod local_data_priv; pub mod rt; pub mod spawn; -/// A handle to a scheduler -#[deriving(Eq)] -pub enum Scheduler { - SchedulerHandle(sched_id) -} - -/// A handle to a task -#[deriving(Eq)] -pub enum Task { - TaskHandle(task_id) -} - /** * Indicates the manner in which a task exited. * @@ -92,23 +79,8 @@ pub enum TaskResult { pub enum SchedMode { /// Run task on the default scheduler DefaultScheduler, - /// Run task on the current scheduler - CurrentScheduler, - /// Run task on a specific scheduler - ExistingScheduler(Scheduler), - /** - * Tasks are scheduled on the main OS thread - * - * The main OS thread is the thread used to launch the runtime which, - * in most cases, is the process's initial thread as created by the OS. - */ - PlatformThread, /// All tasks run in the same OS thread SingleThreaded, - /// Tasks are distributed among available CPUs - ThreadPerTask, - /// Tasks are distributed among a fixed number of OS threads - ManualThreads(uint), } /** @@ -118,17 +90,9 @@ pub enum SchedMode { * * * sched_mode - The operating mode of the scheduler * - * * foreign_stack_size - The size of the foreign stack, in bytes - * - * Rust code runs on Rust-specific stacks. When Rust code calls foreign - * code (via functions in foreign modules) it switches to a typical, large - * stack appropriate for running code written in languages like C. By - * default these foreign stacks have unspecified size, but with this - * option their size can be precisely specified. */ pub struct SchedOpts { mode: SchedMode, - foreign_stack_size: Option<uint>, } /** @@ -446,7 +410,6 @@ pub fn default_task_opts() -> TaskOpts { notify_chan: None, sched: SchedOpts { mode: DefaultScheduler, - foreign_stack_size: None } } } @@ -591,18 +554,6 @@ pub fn failing() -> bool { } } -pub fn get_task() -> Task { - //! Get a handle to the running task - - unsafe { - TaskHandle(rt::get_task_id()) - } -} - -pub fn get_scheduler() -> Scheduler { - SchedulerHandle(unsafe { rt::rust_get_sched_id() }) -} - /** * Temporarily make the task unkillable * @@ -935,13 +886,6 @@ fn test_try_fail() { } #[test] -#[should_fail] -#[ignore(cfg(windows))] -fn test_spawn_sched_no_threads() { - do spawn_sched(ManualThreads(0u)) { } -} - -#[test] fn test_spawn_sched() { let (po, ch) = stream::<()>(); let ch = SharedChan::new(ch); @@ -1108,17 +1052,6 @@ fn test_avoid_copying_the_body_unlinked() { } #[test] -fn test_platform_thread() { - let (po, ch) = stream(); - let mut builder = task(); - builder.sched_mode(PlatformThread); - do builder.spawn { - ch.send(()); - } - po.recv(); -} - -#[test] #[ignore(cfg(windows))] #[should_fail] fn test_unkillable() { @@ -1222,34 +1155,6 @@ fn test_child_doesnt_ref_parent() { } #[test] -fn test_spawn_thread_on_demand() { - let (port, chan) = comm::stream(); - - do spawn_sched(ManualThreads(2)) || { - unsafe { - let max_threads = rt::rust_sched_threads(); - assert_eq!(max_threads as int, 2); - let running_threads = rt::rust_sched_current_nonlazy_threads(); - assert_eq!(running_threads as int, 1); - - let (port2, chan2) = comm::stream(); - - do spawn_sched(CurrentScheduler) || { - chan2.send(()); - } - - let running_threads2 = rt::rust_sched_current_nonlazy_threads(); - assert_eq!(running_threads2 as int, 2); - - port2.recv(); - chan.send(()); - } - } - - port.recv(); -} - -#[test] fn test_simple_newsched_spawn() { use rt::test::run_in_newsched_task; diff --git a/src/libstd/task/rt.rs b/src/libstd/task/rt.rs index 3720bc585cc..13c51230dc2 100644 --- a/src/libstd/task/rt.rs +++ b/src/libstd/task/rt.rs @@ -36,8 +36,6 @@ extern { pub fn rust_get_sched_id() -> sched_id; pub fn rust_new_sched(num_threads: libc::uintptr_t) -> sched_id; - pub fn rust_sched_threads() -> libc::size_t; - pub fn rust_sched_current_nonlazy_threads() -> libc::size_t; pub fn get_task_id() -> task_id; #[rust_stack] diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 749db307012..54e46826976 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -84,9 +84,8 @@ use local_data; use task::local_data_priv::{local_get, local_set, OldHandle}; use task::rt::rust_task; use task::rt; -use task::{Failure, ManualThreads, PlatformThread, SchedOpts, SingleThreaded}; -use task::{Success, TaskOpts, TaskResult, ThreadPerTask}; -use task::{ExistingScheduler, SchedulerHandle}; +use task::{Failure}; +use task::{Success, TaskOpts, TaskResult}; use task::unkillable; use to_bytes::IterBytes; use uint; @@ -747,7 +746,7 @@ fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) { // Create child task. let new_task = match opts.sched.mode { DefaultScheduler => rt::new_task(), - _ => new_task_in_sched(opts.sched) + _ => new_task_in_sched() }; assert!(!new_task.is_null()); // Getting killed after here would leak the task. @@ -805,35 +804,9 @@ fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) { return result; } - fn new_task_in_sched(opts: SchedOpts) -> *rust_task { - if opts.foreign_stack_size != None { - fail!("foreign_stack_size scheduler option unimplemented"); - } - - let num_threads = match opts.mode { - DefaultScheduler - | CurrentScheduler - | ExistingScheduler(*) - | PlatformThread => 0u, /* Won't be used */ - SingleThreaded => 1u, - ThreadPerTask => { - fail!("ThreadPerTask scheduling mode unimplemented") - } - ManualThreads(threads) => { - if threads == 0u { - fail!("can not create a scheduler with no threads"); - } - threads - } - }; - + fn new_task_in_sched() -> *rust_task { unsafe { - let sched_id = match opts.mode { - CurrentScheduler => rt::rust_get_sched_id(), - ExistingScheduler(SchedulerHandle(id)) => id, - PlatformThread => rt::rust_osmain_sched_id(), - _ => rt::rust_new_sched(num_threads) - }; + let sched_id = rt::rust_new_sched(1); rt::rust_new_task_in_sched(sched_id) } } diff --git a/src/libstd/unstable/mod.rs b/src/libstd/unstable/mod.rs index 0d8cb1e8f74..313567d1248 100644 --- a/src/libstd/unstable/mod.rs +++ b/src/libstd/unstable/mod.rs @@ -12,7 +12,6 @@ use comm::{GenericChan, GenericPort}; use comm; -use libc; use prelude::*; use task; @@ -37,18 +36,16 @@ The executing thread has no access to a task pointer and will be using a normal large stack. */ pub fn run_in_bare_thread(f: ~fn()) { + use cell::Cell; + use rt::thread::Thread; + + let f_cell = Cell::new(f); let (port, chan) = comm::stream(); // FIXME #4525: Unfortunate that this creates an extra scheduler but it's - // necessary since rust_raw_thread_join_delete is blocking + // necessary since rust_raw_thread_join is blocking do task::spawn_sched(task::SingleThreaded) { - unsafe { - let closure: &fn() = || { - f() - }; - let thread = rust_raw_thread_start(&closure); - rust_raw_thread_join_delete(thread); - chan.send(()); - } + Thread::start(f_cell.take()).join(); + chan.send(()); } port.recv(); } @@ -70,14 +67,6 @@ fn test_run_in_bare_thread_exchange() { } } -#[allow(non_camel_case_types)] // runtime type -pub type raw_thread = libc::c_void; - -extern { - fn rust_raw_thread_start(f: &(&fn())) -> *raw_thread; - fn rust_raw_thread_join_delete(thread: *raw_thread); -} - /// Changes the current working directory to the specified /// path while acquiring a global lock, then calls `action`. |
