diff options
| author | bors <bors@rust-lang.org> | 2013-12-26 01:01:54 -0800 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2013-12-26 01:01:54 -0800 |
| commit | 9477c49a7b4eec2c2a3e0d9a28c4129e3d1fa6ec (patch) | |
| tree | bd57f2b50c352a4a63d0ae75ef52419e19ebf994 /src/libstd/rt | |
| parent | d975060de6c944ca12ce5205fbc9fc7726948ae1 (diff) | |
| parent | 6cad8f4f14da1dd529100779db74b03d6db20faf (diff) | |
| download | rust-9477c49a7b4eec2c2a3e0d9a28c4129e3d1fa6ec.tar.gz rust-9477c49a7b4eec2c2a3e0d9a28c4129e3d1fa6ec.zip | |
auto merge of #10965 : alexcrichton/rust/libgreen, r=brson
This pull request extracts all scheduling functionality from libstd, moving it into its own separate crates. The new libnative and libgreen will be the new way in which 1:1 and M:N scheduling is implemented. The standard library still requires an interface to the runtime, however, (think of things like `std::comm` and `io::println`). The interface is now defined by the `Runtime` trait inside of `std::rt`. The booting process is now that libgreen defines the start lang-item and that's it. I want to extend this soon to have libnative also have a "start lang item" but also allow libgreen and libnative to be linked together in the same process. For now though, only libgreen can be used to start a program (unless you define the start lang item yourself). Again though, I want to change this soon, I just figured that this pull request is large enough as-is. This certainly wasn't a smooth transition, certain functionality has no equivalent in this new separation, and some functionality is now better enabled through this new system. I did my best to separate all of the commits by topic and keep things fairly bite-sized, although are indeed larger than others. As a note, this is currently rebased on top of my `std::comm` rewrite (or at least an old copy of it), but none of those commits need reviewing (that will all happen in another pull request).
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/basic.rs | 230 | ||||
| -rw-r--r-- | src/libstd/rt/borrowck.rs | 11 | ||||
| -rw-r--r-- | src/libstd/rt/context.rs | 463 | ||||
| -rw-r--r-- | src/libstd/rt/crate_map.rs | 2 | ||||
| -rw-r--r-- | src/libstd/rt/deque.rs | 658 | ||||
| -rw-r--r-- | src/libstd/rt/env.rs | 2 | ||||
| -rw-r--r-- | src/libstd/rt/kill.rs | 318 | ||||
| -rw-r--r-- | src/libstd/rt/local.rs | 98 | ||||
| -rw-r--r-- | src/libstd/rt/local_ptr.rs | 8 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 359 | ||||
| -rw-r--r-- | src/libstd/rt/mpmc_bounded_queue.rs | 209 | ||||
| -rw-r--r-- | src/libstd/rt/mpsc_queue.rs | 215 | ||||
| -rw-r--r-- | src/libstd/rt/rc.rs | 139 | ||||
| -rw-r--r-- | src/libstd/rt/rtio.rs | 71 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 1395 | ||||
| -rw-r--r-- | src/libstd/rt/sleeper_list.rs | 47 | ||||
| -rw-r--r-- | src/libstd/rt/spsc_queue.rs | 296 | ||||
| -rw-r--r-- | src/libstd/rt/stack.rs | 78 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 800 | ||||
| -rw-r--r-- | src/libstd/rt/test.rs | 440 | ||||
| -rw-r--r-- | src/libstd/rt/thread.rs | 34 | ||||
| -rw-r--r-- | src/libstd/rt/tube.rs | 170 | ||||
| -rw-r--r-- | src/libstd/rt/unwind.rs | 270 | ||||
| -rw-r--r-- | src/libstd/rt/util.rs | 27 |
24 files changed, 625 insertions, 5715 deletions
diff --git a/src/libstd/rt/basic.rs b/src/libstd/rt/basic.rs deleted file mode 100644 index 3589582357c..00000000000 --- a/src/libstd/rt/basic.rs +++ /dev/null @@ -1,230 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! This is a basic event loop implementation not meant for any "real purposes" -//! other than testing the scheduler and proving that it's possible to have a -//! pluggable event loop. - -use prelude::*; - -use cast; -use rt::rtio::{EventLoop, IoFactory, RemoteCallback, PausableIdleCallback, - Callback}; -use unstable::sync::Exclusive; -use io::native; -use util; - -/// This is the only exported function from this module. -pub fn event_loop() -> ~EventLoop { - ~BasicLoop::new() as ~EventLoop -} - -struct BasicLoop { - work: ~[proc()], // pending work - idle: Option<*mut BasicPausable>, // only one is allowed - remotes: ~[(uint, ~Callback)], - next_remote: uint, - messages: Exclusive<~[Message]>, - io: ~IoFactory, -} - -enum Message { RunRemote(uint), RemoveRemote(uint) } - -impl BasicLoop { - fn new() -> BasicLoop { - BasicLoop { - work: ~[], - idle: None, - next_remote: 0, - remotes: ~[], - messages: Exclusive::new(~[]), - io: ~native::IoFactory as ~IoFactory, - } - } - - /// Process everything in the work queue (continually) - fn work(&mut self) { - while self.work.len() > 0 { - for work in util::replace(&mut self.work, ~[]).move_iter() { - work(); - } - } - } - - fn remote_work(&mut self) { - let messages = unsafe { - self.messages.with(|messages| { - if messages.len() > 0 { - Some(util::replace(messages, ~[])) - } else { - None - } - }) - }; - let messages = match messages { - Some(m) => m, None => return - }; - for message in messages.iter() { - self.message(*message); - } - } - - fn message(&mut self, message: Message) { - match message { - RunRemote(i) => { - match self.remotes.mut_iter().find(|& &(id, _)| id == i) { - Some(&(_, ref mut f)) => f.call(), - None => unreachable!() - } - } - RemoveRemote(i) => { - match self.remotes.iter().position(|&(id, _)| id == i) { - Some(i) => { self.remotes.remove(i); } - None => unreachable!() - } - } - } - } - - /// Run the idle callback if one is registered - fn idle(&mut self) { - unsafe { - match self.idle { - Some(idle) => { - if (*idle).active { - (*idle).work.call(); - } - } - None => {} - } - } - } - - fn has_idle(&self) -> bool { - unsafe { self.idle.is_some() && (**self.idle.get_ref()).active } - } -} - -impl EventLoop for BasicLoop { - fn run(&mut self) { - // Not exactly efficient, but it gets the job done. - while self.remotes.len() > 0 || self.work.len() > 0 || self.has_idle() { - - self.work(); - self.remote_work(); - - if self.has_idle() { - self.idle(); - continue - } - - unsafe { - // We block here if we have no messages to process and we may - // receive a message at a later date - self.messages.hold_and_wait(|messages| { - self.remotes.len() > 0 && - messages.len() == 0 && - self.work.len() == 0 - }) - } - } - } - - fn callback(&mut self, f: proc()) { - self.work.push(f); - } - - // XXX: Seems like a really weird requirement to have an event loop provide. - fn pausable_idle_callback(&mut self, cb: ~Callback) -> ~PausableIdleCallback { - let callback = ~BasicPausable::new(self, cb); - rtassert!(self.idle.is_none()); - unsafe { - let cb_ptr: &*mut BasicPausable = cast::transmute(&callback); - self.idle = Some(*cb_ptr); - } - return callback as ~PausableIdleCallback; - } - - fn remote_callback(&mut self, f: ~Callback) -> ~RemoteCallback { - let id = self.next_remote; - self.next_remote += 1; - self.remotes.push((id, f)); - ~BasicRemote::new(self.messages.clone(), id) as ~RemoteCallback - } - - fn io<'a>(&'a mut self) -> Option<&'a mut IoFactory> { - let factory: &mut IoFactory = self.io; - Some(factory) - } -} - -struct BasicRemote { - queue: Exclusive<~[Message]>, - id: uint, -} - -impl BasicRemote { - fn new(queue: Exclusive<~[Message]>, id: uint) -> BasicRemote { - BasicRemote { queue: queue, id: id } - } -} - -impl RemoteCallback for BasicRemote { - fn fire(&mut self) { - unsafe { - self.queue.hold_and_signal(|queue| { - queue.push(RunRemote(self.id)); - }) - } - } -} - -impl Drop for BasicRemote { - fn drop(&mut self) { - unsafe { - self.queue.hold_and_signal(|queue| { - queue.push(RemoveRemote(self.id)); - }) - } - } -} - -struct BasicPausable { - eloop: *mut BasicLoop, - work: ~Callback, - active: bool, -} - -impl BasicPausable { - fn new(eloop: &mut BasicLoop, cb: ~Callback) -> BasicPausable { - BasicPausable { - active: false, - work: cb, - eloop: eloop, - } - } -} - -impl PausableIdleCallback for BasicPausable { - fn pause(&mut self) { - self.active = false; - } - fn resume(&mut self) { - self.active = true; - } -} - -impl Drop for BasicPausable { - fn drop(&mut self) { - unsafe { - (*self.eloop).idle = None; - } - } -} diff --git a/src/libstd/rt/borrowck.rs b/src/libstd/rt/borrowck.rs index 423981d9e91..d1e97cb6ec0 100644 --- a/src/libstd/rt/borrowck.rs +++ b/src/libstd/rt/borrowck.rs @@ -12,9 +12,8 @@ use c_str::{ToCStr, CString}; use libc::{c_char, size_t}; use option::{Option, None, Some}; use ptr::RawPtr; -use rt::env; +use rt; use rt::local::Local; -use rt::task; use rt::task::Task; use str::OwnedStr; use str; @@ -62,7 +61,7 @@ unsafe fn fail_borrowed(alloc: *mut raw::Box<()>, file: *c_char, line: size_t) match try_take_task_borrow_list() { None => { // not recording borrows let msg = "borrowed"; - msg.with_c_str(|msg_p| task::begin_unwind_raw(msg_p, file, line)) + msg.with_c_str(|msg_p| rt::begin_unwind_raw(msg_p, file, line)) } Some(borrow_list) => { // recording borrows let mut msg = ~"borrowed"; @@ -76,7 +75,7 @@ unsafe fn fail_borrowed(alloc: *mut raw::Box<()>, file: *c_char, line: size_t) sep = " and at "; } } - msg.with_c_str(|msg_p| task::begin_unwind_raw(msg_p, file, line)) + msg.with_c_str(|msg_p| rt::begin_unwind_raw(msg_p, file, line)) } } } @@ -95,7 +94,7 @@ unsafe fn debug_borrow<T,P:RawPtr<T>>(tag: &'static str, //! A useful debugging function that prints a pointer + tag + newline //! without allocating memory. - if ENABLE_DEBUG && env::debug_borrow() { + if ENABLE_DEBUG && rt::env::debug_borrow() { debug_borrow_slow(tag, p, old_bits, new_bits, filename, line); } @@ -180,7 +179,7 @@ pub unsafe fn unrecord_borrow(a: *u8, if br.alloc != a || br.file != file || br.line != line { let err = format!("wrong borrow found, br={:?}", br); err.with_c_str(|msg_p| { - task::begin_unwind_raw(msg_p, file, line) + rt::begin_unwind_raw(msg_p, file, line) }) } borrow_list diff --git a/src/libstd/rt/context.rs b/src/libstd/rt/context.rs deleted file mode 100644 index 31cf0696881..00000000000 --- a/src/libstd/rt/context.rs +++ /dev/null @@ -1,463 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use option::*; -use super::stack::StackSegment; -use libc::c_void; -use uint; -use cast::{transmute, transmute_mut_unsafe, - transmute_region, transmute_mut_region}; - -pub static RED_ZONE: uint = 20 * 1024; - -// FIXME #7761: Registers is boxed so that it is 16-byte aligned, for storing -// SSE regs. It would be marginally better not to do this. In C++ we -// use an attribute on a struct. -// FIXME #7761: It would be nice to define regs as `~Option<Registers>` since -// the registers are sometimes empty, but the discriminant would -// then misalign the regs again. -pub struct Context { - /// The context entry point, saved here for later destruction - priv start: Option<~proc()>, - /// Hold the registers while the task or scheduler is suspended - priv regs: ~Registers, - /// Lower bound and upper bound for the stack - priv stack_bounds: Option<(uint, uint)>, -} - -impl Context { - pub fn empty() -> Context { - Context { - start: None, - regs: new_regs(), - stack_bounds: None, - } - } - - /// Create a new context that will resume execution by running proc() - pub fn new(start: proc(), stack: &mut StackSegment) -> Context { - // FIXME #7767: Putting main into a ~ so it's a thin pointer and can - // be passed to the spawn function. Another unfortunate - // allocation - let start = ~start; - - // The C-ABI function that is the task entry point - extern fn task_start_wrapper(f: &proc()) { - // XXX(pcwalton): This may be sketchy. - unsafe { - let f: &|| = transmute(f); - (*f)() - } - } - - let fp: *c_void = task_start_wrapper as *c_void; - let argp: *c_void = unsafe { transmute::<&proc(), *c_void>(&*start) }; - let sp: *uint = stack.end(); - let sp: *mut uint = unsafe { transmute_mut_unsafe(sp) }; - // Save and then immediately load the current context, - // which we will then modify to call the given function when restored - let mut regs = new_regs(); - unsafe { - rust_swap_registers(transmute_mut_region(&mut *regs), transmute_region(&*regs)); - }; - - initialize_call_frame(&mut *regs, fp, argp, sp); - - // Scheduler tasks don't have a stack in the "we allocated it" sense, - // but rather they run on pthreads stacks. We have complete control over - // them in terms of the code running on them (and hopefully they don't - // overflow). Additionally, their coroutine stacks are listed as being - // zero-length, so that's how we detect what's what here. - let stack_base: *uint = stack.start(); - let bounds = if sp as uint == stack_base as uint { - None - } else { - Some((stack_base as uint, sp as uint)) - }; - return Context { - start: Some(start), - regs: regs, - stack_bounds: bounds, - } - } - - /* Switch contexts - - Suspend the current execution context and resume another by - saving the registers values of the executing thread to a Context - then loading the registers from a previously saved Context. - */ - pub fn swap(out_context: &mut Context, in_context: &Context) { - rtdebug!("swapping contexts"); - let out_regs: &mut Registers = match out_context { - &Context { regs: ~ref mut r, .. } => r - }; - let in_regs: &Registers = match in_context { - &Context { regs: ~ref r, .. } => r - }; - - rtdebug!("noting the stack limit and doing raw swap"); - - unsafe { - // Right before we switch to the new context, set the new context's - // stack limit in the OS-specified TLS slot. This also means that - // we cannot call any more rust functions after record_stack_bounds - // returns because they would all likely fail due to the limit being - // invalid for the current task. Lucky for us `rust_swap_registers` - // is a C function so we don't have to worry about that! - match in_context.stack_bounds { - Some((lo, hi)) => record_stack_bounds(lo, hi), - // If we're going back to one of the original contexts or - // something that's possibly not a "normal task", then reset - // the stack limit to 0 to make morestack never fail - None => record_stack_bounds(0, uint::max_value), - } - rust_swap_registers(out_regs, in_regs) - } - } -} - -extern { - fn rust_swap_registers(out_regs: *mut Registers, in_regs: *Registers); -} - -// Register contexts used in various architectures -// -// These structures all represent a context of one task throughout its -// execution. Each struct is a representation of the architecture's register -// set. When swapping between tasks, these register sets are used to save off -// the current registers into one struct, and load them all from another. -// -// Note that this is only used for context switching, which means that some of -// the registers may go unused. For example, for architectures with -// callee/caller saved registers, the context will only reflect the callee-saved -// registers. This is because the caller saved registers are already stored -// elsewhere on the stack (if it was necessary anyway). -// -// Additionally, there may be fields on various architectures which are unused -// entirely because they only reflect what is theoretically possible for a -// "complete register set" to show, but user-space cannot alter these registers. -// An example of this would be the segment selectors for x86. -// -// These structures/functions are roughly in-sync with the source files inside -// of src/rt/arch/$arch. The only currently used function from those folders is -// the `rust_swap_registers` function, but that's only because for now segmented -// stacks are disabled. - -#[cfg(target_arch = "x86")] -struct Registers { - eax: u32, ebx: u32, ecx: u32, edx: u32, - ebp: u32, esi: u32, edi: u32, esp: u32, - cs: u16, ds: u16, ss: u16, es: u16, fs: u16, gs: u16, - eflags: u32, eip: u32 -} - -#[cfg(target_arch = "x86")] -fn new_regs() -> ~Registers { - ~Registers { - eax: 0, ebx: 0, ecx: 0, edx: 0, - ebp: 0, esi: 0, edi: 0, esp: 0, - cs: 0, ds: 0, ss: 0, es: 0, fs: 0, gs: 0, - eflags: 0, eip: 0 - } -} - -#[cfg(target_arch = "x86")] -fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, - sp: *mut uint) { - - let sp = align_down(sp); - let sp = mut_offset(sp, -4); - - unsafe { *sp = arg as uint }; - let sp = mut_offset(sp, -1); - unsafe { *sp = 0 }; // The final return address - - regs.esp = sp as u32; - regs.eip = fptr as u32; - - // Last base pointer on the stack is 0 - regs.ebp = 0; -} - -// windows requires saving more registers (both general and XMM), so the windows -// register context must be larger. -#[cfg(windows, target_arch = "x86_64")] -type Registers = [uint, ..34]; -#[cfg(not(windows), target_arch = "x86_64")] -type Registers = [uint, ..22]; - -#[cfg(windows, target_arch = "x86_64")] -fn new_regs() -> ~Registers { ~([0, .. 34]) } -#[cfg(not(windows), target_arch = "x86_64")] -fn new_regs() -> ~Registers { ~([0, .. 22]) } - -#[cfg(target_arch = "x86_64")] -fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, - sp: *mut uint) { - - // Redefinitions from rt/arch/x86_64/regs.h - static RUSTRT_ARG0: uint = 3; - static RUSTRT_RSP: uint = 1; - static RUSTRT_IP: uint = 8; - static RUSTRT_RBP: uint = 2; - - let sp = align_down(sp); - let sp = mut_offset(sp, -1); - - // The final return address. 0 indicates the bottom of the stack - unsafe { *sp = 0; } - - rtdebug!("creating call frame"); - rtdebug!("fptr {}", fptr); - rtdebug!("arg {}", arg); - rtdebug!("sp {}", sp); - - regs[RUSTRT_ARG0] = arg as uint; - regs[RUSTRT_RSP] = sp as uint; - regs[RUSTRT_IP] = fptr as uint; - - // Last base pointer on the stack should be 0 - regs[RUSTRT_RBP] = 0; -} - -#[cfg(target_arch = "arm")] -type Registers = [uint, ..32]; - -#[cfg(target_arch = "arm")] -fn new_regs() -> ~Registers { ~([0, .. 32]) } - -#[cfg(target_arch = "arm")] -fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, - sp: *mut uint) { - let sp = align_down(sp); - // sp of arm eabi is 8-byte aligned - let sp = mut_offset(sp, -2); - - // The final return address. 0 indicates the bottom of the stack - unsafe { *sp = 0; } - - regs[0] = arg as uint; // r0 - regs[13] = sp as uint; // #53 sp, r13 - regs[14] = fptr as uint; // #60 pc, r15 --> lr -} - -#[cfg(target_arch = "mips")] -type Registers = [uint, ..32]; - -#[cfg(target_arch = "mips")] -fn new_regs() -> ~Registers { ~([0, .. 32]) } - -#[cfg(target_arch = "mips")] -fn initialize_call_frame(regs: &mut Registers, fptr: *c_void, arg: *c_void, - sp: *mut uint) { - let sp = align_down(sp); - // sp of mips o32 is 8-byte aligned - let sp = mut_offset(sp, -2); - - // The final return address. 0 indicates the bottom of the stack - unsafe { *sp = 0; } - - regs[4] = arg as uint; - regs[29] = sp as uint; - regs[25] = fptr as uint; - regs[31] = fptr as uint; -} - -fn align_down(sp: *mut uint) -> *mut uint { - unsafe { - let sp: uint = transmute(sp); - let sp = sp & !(16 - 1); - transmute::<uint, *mut uint>(sp) - } -} - -// ptr::mut_offset is positive ints only -#[inline] -pub fn mut_offset<T>(ptr: *mut T, count: int) -> *mut T { - use mem::size_of; - (ptr as int + count * (size_of::<T>() as int)) as *mut T -} - -#[inline(always)] -pub unsafe fn record_stack_bounds(stack_lo: uint, stack_hi: uint) { - // When the old runtime had segmented stacks, it used a calculation that was - // "limit + RED_ZONE + FUDGE". The red zone was for things like dynamic - // symbol resolution, llvm function calls, etc. In theory this red zone - // value is 0, but it matters far less when we have gigantic stacks because - // we don't need to be so exact about our stack budget. The "fudge factor" - // was because LLVM doesn't emit a stack check for functions < 256 bytes in - // size. Again though, we have giant stacks, so we round all these - // calculations up to the nice round number of 20k. - record_sp_limit(stack_lo + RED_ZONE); - - return target_record_stack_bounds(stack_lo, stack_hi); - - #[cfg(not(windows))] #[cfg(not(target_arch = "x86_64"))] #[inline(always)] - unsafe fn target_record_stack_bounds(_stack_lo: uint, _stack_hi: uint) {} - #[cfg(windows, target_arch = "x86_64")] #[inline(always)] - unsafe fn target_record_stack_bounds(stack_lo: uint, stack_hi: uint) { - // Windows compiles C functions which may check the stack bounds. This - // means that if we want to perform valid FFI on windows, then we need - // to ensure that the stack bounds are what they truly are for this - // task. More info can be found at: - // https://github.com/mozilla/rust/issues/3445#issuecomment-26114839 - // - // stack range is at TIB: %gs:0x08 (top) and %gs:0x10 (bottom) - asm!("mov $0, %gs:0x08" :: "r"(stack_hi) :: "volatile"); - asm!("mov $0, %gs:0x10" :: "r"(stack_lo) :: "volatile"); - } -} - -/// Records the current limit of the stack as specified by `end`. -/// -/// This is stored in an OS-dependent location, likely inside of the thread -/// local storage. The location that the limit is stored is a pre-ordained -/// location because it's where LLVM has emitted code to check. -/// -/// Note that this cannot be called under normal circumstances. This function is -/// changing the stack limit, so upon returning any further function calls will -/// possibly be triggering the morestack logic if you're not careful. -/// -/// Also note that this and all of the inside functions are all flagged as -/// "inline(always)" because they're messing around with the stack limits. This -/// would be unfortunate for the functions themselves to trigger a morestack -/// invocation (if they were an actual function call). -#[inline(always)] -pub unsafe fn record_sp_limit(limit: uint) { - return target_record_sp_limit(limit); - - // x86-64 - #[cfg(target_arch = "x86_64", target_os = "macos")] #[inline(always)] - unsafe fn target_record_sp_limit(limit: uint) { - asm!("movq $$0x60+90*8, %rsi - movq $0, %gs:(%rsi)" :: "r"(limit) : "rsi" : "volatile") - } - #[cfg(target_arch = "x86_64", target_os = "linux")] #[inline(always)] - unsafe fn target_record_sp_limit(limit: uint) { - asm!("movq $0, %fs:112" :: "r"(limit) :: "volatile") - } - #[cfg(target_arch = "x86_64", target_os = "win32")] #[inline(always)] - unsafe fn target_record_sp_limit(limit: uint) { - // see: http://en.wikipedia.org/wiki/Win32_Thread_Information_Block - // store this inside of the "arbitrary data slot", but double the size - // because this is 64 bit instead of 32 bit - asm!("movq $0, %gs:0x28" :: "r"(limit) :: "volatile") - } - #[cfg(target_arch = "x86_64", target_os = "freebsd")] #[inline(always)] - unsafe fn target_record_sp_limit(limit: uint) { - asm!("movq $0, %fs:24" :: "r"(limit) :: "volatile") - } - - // x86 - #[cfg(target_arch = "x86", target_os = "macos")] #[inline(always)] - unsafe fn target_record_sp_limit(limit: uint) { - asm!("movl $$0x48+90*4, %eax - movl $0, %gs:(%eax)" :: "r"(limit) : "eax" : "volatile") - } - #[cfg(target_arch = "x86", target_os = "linux")] - #[cfg(target_arch = "x86", target_os = "freebsd")] #[inline(always)] - unsafe fn target_record_sp_limit(limit: uint) { - asm!("movl $0, %gs:48" :: "r"(limit) :: "volatile") - } - #[cfg(target_arch = "x86", target_os = "win32")] #[inline(always)] - unsafe fn target_record_sp_limit(limit: uint) { - // see: http://en.wikipedia.org/wiki/Win32_Thread_Information_Block - // store this inside of the "arbitrary data slot" - asm!("movl $0, %fs:0x14" :: "r"(limit) :: "volatile") - } - - // mips, arm - Some brave soul can port these to inline asm, but it's over - // my head personally - #[cfg(target_arch = "mips")] - #[cfg(target_arch = "arm")] #[inline(always)] - unsafe fn target_record_sp_limit(limit: uint) { - return record_sp_limit(limit as *c_void); - extern { - fn record_sp_limit(limit: *c_void); - } - } -} - -/// The counterpart of the function above, this function will fetch the current -/// stack limit stored in TLS. -/// -/// Note that all of these functions are meant to be exact counterparts of their -/// brethren above, except that the operands are reversed. -/// -/// As with the setter, this function does not have a __morestack header and can -/// therefore be called in a "we're out of stack" situation. -#[inline(always)] -// currently only called by `rust_stack_exhausted`, which doesn't -// exist in a test build. -#[cfg(not(test))] -pub unsafe fn get_sp_limit() -> uint { - return target_get_sp_limit(); - - // x86-64 - #[cfg(target_arch = "x86_64", target_os = "macos")] #[inline(always)] - unsafe fn target_get_sp_limit() -> uint { - let limit; - asm!("movq $$0x60+90*8, %rsi - movq %gs:(%rsi), $0" : "=r"(limit) :: "rsi" : "volatile"); - return limit; - } - #[cfg(target_arch = "x86_64", target_os = "linux")] #[inline(always)] - unsafe fn target_get_sp_limit() -> uint { - let limit; - asm!("movq %fs:112, $0" : "=r"(limit) ::: "volatile"); - return limit; - } - #[cfg(target_arch = "x86_64", target_os = "win32")] #[inline(always)] - unsafe fn target_get_sp_limit() -> uint { - let limit; - asm!("movq %gs:0x28, $0" : "=r"(limit) ::: "volatile"); - return limit; - } - #[cfg(target_arch = "x86_64", target_os = "freebsd")] #[inline(always)] - unsafe fn target_get_sp_limit() -> uint { - let limit; - asm!("movq %fs:24, $0" : "=r"(limit) ::: "volatile"); - return limit; - } - - // x86 - #[cfg(target_arch = "x86", target_os = "macos")] #[inline(always)] - unsafe fn target_get_sp_limit() -> uint { - let limit; - asm!("movl $$0x48+90*4, %eax - movl %gs:(%eax), $0" : "=r"(limit) :: "eax" : "volatile"); - return limit; - } - #[cfg(target_arch = "x86", target_os = "linux")] - #[cfg(target_arch = "x86", target_os = "freebsd")] #[inline(always)] - unsafe fn target_get_sp_limit() -> uint { - let limit; - asm!("movl %gs:48, $0" : "=r"(limit) ::: "volatile"); - return limit; - } - #[cfg(target_arch = "x86", target_os = "win32")] #[inline(always)] - unsafe fn target_get_sp_limit() -> uint { - let limit; - asm!("movl %fs:0x14, $0" : "=r"(limit) ::: "volatile"); - return limit; - } - - // mips, arm - Some brave soul can port these to inline asm, but it's over - // my head personally - #[cfg(target_arch = "mips")] - #[cfg(target_arch = "arm")] #[inline(always)] - unsafe fn target_get_sp_limit() -> uint { - return get_sp_limit() as uint; - extern { - fn get_sp_limit() -> *c_void; - } - } -} diff --git a/src/libstd/rt/crate_map.rs b/src/libstd/rt/crate_map.rs index 22fc3f0ab56..d9b40cfbb6e 100644 --- a/src/libstd/rt/crate_map.rs +++ b/src/libstd/rt/crate_map.rs @@ -30,7 +30,7 @@ pub struct CrateMap<'a> { version: i32, entries: &'a [ModEntry<'a>], children: &'a [&'a CrateMap<'a>], - event_loop_factory: Option<extern "C" fn() -> ~EventLoop>, + event_loop_factory: Option<fn() -> ~EventLoop>, } #[cfg(not(windows))] diff --git a/src/libstd/rt/deque.rs b/src/libstd/rt/deque.rs deleted file mode 100644 index 770fc9ffa12..00000000000 --- a/src/libstd/rt/deque.rs +++ /dev/null @@ -1,658 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! A (mostly) lock-free concurrent work-stealing deque -//! -//! This module contains an implementation of the Chase-Lev work stealing deque -//! described in "Dynamic Circular Work-Stealing Deque". The implementation is -//! heavily based on the pseudocode found in the paper. -//! -//! This implementation does not want to have the restriction of a garbage -//! collector for reclamation of buffers, and instead it uses a shared pool of -//! buffers. This shared pool is required for correctness in this -//! implementation. -//! -//! The only lock-synchronized portions of this deque are the buffer allocation -//! and deallocation portions. Otherwise all operations are lock-free. -//! -//! # Example -//! -//! use std::rt::deque::BufferPool; -//! -//! let mut pool = BufferPool::new(); -//! let (mut worker, mut stealer) = pool.deque(); -//! -//! // Only the worker may push/pop -//! worker.push(1); -//! worker.pop(); -//! -//! // Stealers take data from the other end of the deque -//! worker.push(1); -//! stealer.steal(); -//! -//! // Stealers can be cloned to have many stealers stealing in parallel -//! worker.push(1); -//! let mut stealer2 = stealer.clone(); -//! stealer2.steal(); - -// NB: the "buffer pool" strategy is not done for speed, but rather for -// correctness. For more info, see the comment on `swap_buffer` - -// XXX: all atomic operations in this module use a SeqCst ordering. That is -// probably overkill - -use cast; -use clone::Clone; -use iter::range; -use kinds::Send; -use libc; -use mem; -use ops::Drop; -use option::{Option, Some, None}; -use ptr; -use unstable::atomics::{AtomicInt, AtomicPtr, SeqCst}; -use unstable::sync::{UnsafeArc, Exclusive}; - -// Once the queue is less than 1/K full, then it will be downsized. Note that -// the deque requires that this number be less than 2. -static K: int = 4; - -// Minimum number of bits that a buffer size should be. No buffer will resize to -// under this value, and all deques will initially contain a buffer of this -// size. -// -// The size in question is 1 << MIN_BITS -static MIN_BITS: int = 7; - -struct Deque<T> { - bottom: AtomicInt, - top: AtomicInt, - array: AtomicPtr<Buffer<T>>, - pool: BufferPool<T>, -} - -/// Worker half of the work-stealing deque. This worker has exclusive access to -/// one side of the deque, and uses `push` and `pop` method to manipulate it. -/// -/// There may only be one worker per deque. -pub struct Worker<T> { - priv deque: UnsafeArc<Deque<T>>, -} - -/// The stealing half of the work-stealing deque. Stealers have access to the -/// opposite end of the deque from the worker, and they only have access to the -/// `steal` method. -pub struct Stealer<T> { - priv deque: UnsafeArc<Deque<T>>, -} - -/// When stealing some data, this is an enumeration of the possible outcomes. -#[deriving(Eq)] -pub enum Stolen<T> { - /// The deque was empty at the time of stealing - Empty, - /// The stealer lost the race for stealing data, and a retry may return more - /// data. - Abort, - /// The stealer has successfully stolen some data. - Data(T), -} - -/// The allocation pool for buffers used by work-stealing deques. Right now this -/// structure is used for reclamation of memory after it is no longer in use by -/// deques. -/// -/// This data structure is protected by a mutex, but it is rarely used. Deques -/// will only use this structure when allocating a new buffer or deallocating a -/// previous one. -pub struct BufferPool<T> { - priv pool: Exclusive<~[~Buffer<T>]>, -} - -/// An internal buffer used by the chase-lev deque. This structure is actually -/// implemented as a circular buffer, and is used as the intermediate storage of -/// the data in the deque. -/// -/// This type is implemented with *T instead of ~[T] for two reasons: -/// -/// 1. There is nothing safe about using this buffer. This easily allows the -/// same value to be read twice in to rust, and there is nothing to -/// prevent this. The usage by the deque must ensure that one of the -/// values is forgotten. Furthermore, we only ever want to manually run -/// destructors for values in this buffer (on drop) because the bounds -/// are defined by the deque it's owned by. -/// -/// 2. We can certainly avoid bounds checks using *T instead of ~[T], although -/// LLVM is probably pretty good at doing this already. -struct Buffer<T> { - storage: *T, - log_size: int, -} - -impl<T: Send> BufferPool<T> { - /// Allocates a new buffer pool which in turn can be used to allocate new - /// deques. - pub fn new() -> BufferPool<T> { - BufferPool { pool: Exclusive::new(~[]) } - } - - /// Allocates a new work-stealing deque which will send/receiving memory to - /// and from this buffer pool. - pub fn deque(&mut self) -> (Worker<T>, Stealer<T>) { - let (a, b) = UnsafeArc::new2(Deque::new(self.clone())); - (Worker { deque: a }, Stealer { deque: b }) - } - - fn alloc(&mut self, bits: int) -> ~Buffer<T> { - unsafe { - self.pool.with(|pool| { - match pool.iter().position(|x| x.size() >= (1 << bits)) { - Some(i) => pool.remove(i), - None => ~Buffer::new(bits) - } - }) - } - } - - fn free(&mut self, buf: ~Buffer<T>) { - unsafe { - let mut buf = Some(buf); - self.pool.with(|pool| { - let buf = buf.take_unwrap(); - match pool.iter().position(|v| v.size() > buf.size()) { - Some(i) => pool.insert(i, buf), - None => pool.push(buf), - } - }) - } - } -} - -impl<T: Send> Clone for BufferPool<T> { - fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } } -} - -impl<T: Send> Worker<T> { - /// Pushes data onto the front of this work queue. - pub fn push(&mut self, t: T) { - unsafe { (*self.deque.get()).push(t) } - } - /// Pops data off the front of the work queue, returning `None` on an empty - /// queue. - pub fn pop(&mut self) -> Option<T> { - unsafe { (*self.deque.get()).pop() } - } - - /// Gets access to the buffer pool that this worker is attached to. This can - /// be used to create more deques which share the same buffer pool as this - /// deque. - pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> { - unsafe { &mut (*self.deque.get()).pool } - } -} - -impl<T: Send> Stealer<T> { - /// Steals work off the end of the queue (opposite of the worker's end) - pub fn steal(&mut self) -> Stolen<T> { - unsafe { (*self.deque.get()).steal() } - } - - /// Gets access to the buffer pool that this stealer is attached to. This - /// can be used to create more deques which share the same buffer pool as - /// this deque. - pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> { - unsafe { &mut (*self.deque.get()).pool } - } -} - -impl<T: Send> Clone for Stealer<T> { - fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } } -} - -// Almost all of this code can be found directly in the paper so I'm not -// personally going to heavily comment what's going on here. - -impl<T: Send> Deque<T> { - fn new(mut pool: BufferPool<T>) -> Deque<T> { - let buf = pool.alloc(MIN_BITS); - Deque { - bottom: AtomicInt::new(0), - top: AtomicInt::new(0), - array: AtomicPtr::new(unsafe { cast::transmute(buf) }), - pool: pool, - } - } - - unsafe fn push(&mut self, data: T) { - let mut b = self.bottom.load(SeqCst); - let t = self.top.load(SeqCst); - let mut a = self.array.load(SeqCst); - let size = b - t; - if size >= (*a).size() - 1 { - // You won't find this code in the chase-lev deque paper. This is - // alluded to in a small footnote, however. We always free a buffer - // when growing in order to prevent leaks. - a = self.swap_buffer(b, a, (*a).resize(b, t, 1)); - b = self.bottom.load(SeqCst); - } - (*a).put(b, data); - self.bottom.store(b + 1, SeqCst); - } - - unsafe fn pop(&mut self) -> Option<T> { - let b = self.bottom.load(SeqCst); - let a = self.array.load(SeqCst); - let b = b - 1; - self.bottom.store(b, SeqCst); - let t = self.top.load(SeqCst); - let size = b - t; - if size < 0 { - self.bottom.store(t, SeqCst); - return None; - } - let data = (*a).get(b); - if size > 0 { - self.maybe_shrink(b, t); - return Some(data); - } - if self.top.compare_and_swap(t, t + 1, SeqCst) == t { - self.bottom.store(t + 1, SeqCst); - return Some(data); - } else { - self.bottom.store(t + 1, SeqCst); - cast::forget(data); // someone else stole this value - return None; - } - } - - unsafe fn steal(&mut self) -> Stolen<T> { - let t = self.top.load(SeqCst); - let old = self.array.load(SeqCst); - let b = self.bottom.load(SeqCst); - let a = self.array.load(SeqCst); - let size = b - t; - if size <= 0 { return Empty } - if size % (*a).size() == 0 { - if a == old && t == self.top.load(SeqCst) { - return Empty - } - return Abort - } - let data = (*a).get(t); - if self.top.compare_and_swap(t, t + 1, SeqCst) == t { - Data(data) - } else { - cast::forget(data); // someone else stole this value - Abort - } - } - - unsafe fn maybe_shrink(&mut self, b: int, t: int) { - let a = self.array.load(SeqCst); - if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) { - self.swap_buffer(b, a, (*a).resize(b, t, -1)); - } - } - - // Helper routine not mentioned in the paper which is used in growing and - // shrinking buffers to swap in a new buffer into place. As a bit of a - // recap, the whole point that we need a buffer pool rather than just - // calling malloc/free directly is that stealers can continue using buffers - // after this method has called 'free' on it. The continued usage is simply - // a read followed by a forget, but we must make sure that the memory can - // continue to be read after we flag this buffer for reclamation. - unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer<T>, - buf: Buffer<T>) -> *mut Buffer<T> { - let newbuf: *mut Buffer<T> = cast::transmute(~buf); - self.array.store(newbuf, SeqCst); - let ss = (*newbuf).size(); - self.bottom.store(b + ss, SeqCst); - let t = self.top.load(SeqCst); - if self.top.compare_and_swap(t, t + ss, SeqCst) != t { - self.bottom.store(b, SeqCst); - } - self.pool.free(cast::transmute(old)); - return newbuf; - } -} - - -#[unsafe_destructor] -impl<T: Send> Drop for Deque<T> { - fn drop(&mut self) { - let t = self.top.load(SeqCst); - let b = self.bottom.load(SeqCst); - let a = self.array.load(SeqCst); - // Free whatever is leftover in the dequeue, and then move the buffer - // back into the pool. - for i in range(t, b) { - let _: T = unsafe { (*a).get(i) }; - } - self.pool.free(unsafe { cast::transmute(a) }); - } -} - -impl<T: Send> Buffer<T> { - unsafe fn new(log_size: int) -> Buffer<T> { - let size = (1 << log_size) * mem::size_of::<T>(); - let buffer = libc::malloc(size as libc::size_t); - assert!(!buffer.is_null()); - Buffer { - storage: buffer as *T, - log_size: log_size, - } - } - - fn size(&self) -> int { 1 << self.log_size } - - // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly - fn mask(&self) -> int { (1 << self.log_size) - 1 } - - // This does not protect against loading duplicate values of the same cell, - // nor does this clear out the contents contained within. Hence, this is a - // very unsafe method which the caller needs to treat specially in case a - // race is lost. - unsafe fn get(&self, i: int) -> T { - ptr::read_ptr(self.storage.offset(i & self.mask())) - } - - // Unsafe because this unsafely overwrites possibly uninitialized or - // initialized data. - unsafe fn put(&mut self, i: int, t: T) { - let ptr = self.storage.offset(i & self.mask()); - ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1); - cast::forget(t); - } - - // Again, unsafe because this has incredibly dubious ownership violations. - // It is assumed that this buffer is immediately dropped. - unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> { - let mut buf = Buffer::new(self.log_size + delta); - for i in range(t, b) { - buf.put(i, self.get(i)); - } - return buf; - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Buffer<T> { - fn drop(&mut self) { - // It is assumed that all buffers are empty on drop. - unsafe { libc::free(self.storage as *libc::c_void) } - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - use super::{Data, BufferPool, Abort, Empty, Worker, Stealer}; - - use cast; - use rt::thread::Thread; - use rand; - use rand::Rng; - use unstable::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst, - AtomicUint, INIT_ATOMIC_UINT}; - use vec; - - #[test] - fn smoke() { - let mut pool = BufferPool::new(); - let (mut w, mut s) = pool.deque(); - assert_eq!(w.pop(), None); - assert_eq!(s.steal(), Empty); - w.push(1); - assert_eq!(w.pop(), Some(1)); - w.push(1); - assert_eq!(s.steal(), Data(1)); - w.push(1); - assert_eq!(s.clone().steal(), Data(1)); - } - - #[test] - fn stealpush() { - static AMT: int = 100000; - let mut pool = BufferPool::<int>::new(); - let (mut w, s) = pool.deque(); - let t = do Thread::start { - let mut s = s; - let mut left = AMT; - while left > 0 { - match s.steal() { - Data(i) => { - assert_eq!(i, 1); - left -= 1; - } - Abort | Empty => {} - } - } - }; - - for _ in range(0, AMT) { - w.push(1); - } - - t.join(); - } - - #[test] - fn stealpush_large() { - static AMT: int = 100000; - let mut pool = BufferPool::<(int, int)>::new(); - let (mut w, s) = pool.deque(); - let t = do Thread::start { - let mut s = s; - let mut left = AMT; - while left > 0 { - match s.steal() { - Data((1, 10)) => { left -= 1; } - Data(..) => fail!(), - Abort | Empty => {} - } - } - }; - - for _ in range(0, AMT) { - w.push((1, 10)); - } - - t.join(); - } - - fn stampede(mut w: Worker<~int>, s: Stealer<~int>, - nthreads: int, amt: uint) { - for _ in range(0, amt) { - w.push(~20); - } - let mut remaining = AtomicUint::new(amt); - let unsafe_remaining: *mut AtomicUint = &mut remaining; - - let threads = range(0, nthreads).map(|_| { - let s = s.clone(); - do Thread::start { - unsafe { - let mut s = s; - while (*unsafe_remaining).load(SeqCst) > 0 { - match s.steal() { - Data(~20) => { - (*unsafe_remaining).fetch_sub(1, SeqCst); - } - Data(..) => fail!(), - Abort | Empty => {} - } - } - } - } - }).to_owned_vec(); - - while remaining.load(SeqCst) > 0 { - match w.pop() { - Some(~20) => { remaining.fetch_sub(1, SeqCst); } - Some(..) => fail!(), - None => {} - } - } - - for thread in threads.move_iter() { - thread.join(); - } - } - - #[test] - fn run_stampede() { - let mut pool = BufferPool::<~int>::new(); - let (w, s) = pool.deque(); - stampede(w, s, 8, 10000); - } - - #[test] - fn many_stampede() { - static AMT: uint = 4; - let mut pool = BufferPool::<~int>::new(); - let threads = range(0, AMT).map(|_| { - let (w, s) = pool.deque(); - do Thread::start { - stampede(w, s, 4, 10000); - } - }).to_owned_vec(); - - for thread in threads.move_iter() { - thread.join(); - } - } - - #[test] - fn stress() { - static AMT: int = 100000; - static NTHREADS: int = 8; - static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; - static mut HITS: AtomicUint = INIT_ATOMIC_UINT; - let mut pool = BufferPool::<int>::new(); - let (mut w, s) = pool.deque(); - - let threads = range(0, NTHREADS).map(|_| { - let s = s.clone(); - do Thread::start { - unsafe { - let mut s = s; - loop { - match s.steal() { - Data(2) => { HITS.fetch_add(1, SeqCst); } - Data(..) => fail!(), - _ if DONE.load(SeqCst) => break, - _ => {} - } - } - } - } - }).to_owned_vec(); - - let mut rng = rand::task_rng(); - let mut expected = 0; - while expected < AMT { - if rng.gen_range(0, 3) == 2 { - match w.pop() { - None => {} - Some(2) => unsafe { HITS.fetch_add(1, SeqCst); }, - Some(_) => fail!(), - } - } else { - expected += 1; - w.push(2); - } - } - - unsafe { - while HITS.load(SeqCst) < AMT as uint { - match w.pop() { - None => {} - Some(2) => { HITS.fetch_add(1, SeqCst); }, - Some(_) => fail!(), - } - } - DONE.store(true, SeqCst); - } - - for thread in threads.move_iter() { - thread.join(); - } - - assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint); - } - - #[test] - #[ignore(cfg(windows))] // apparently windows scheduling is weird? - fn no_starvation() { - static AMT: int = 10000; - static NTHREADS: int = 4; - static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; - let mut pool = BufferPool::<(int, uint)>::new(); - let (mut w, s) = pool.deque(); - - let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| { - let s = s.clone(); - let unique_box = ~AtomicUint::new(0); - let thread_box = unsafe { - *cast::transmute::<&~AtomicUint,**mut AtomicUint>(&unique_box) - }; - (do Thread::start { - unsafe { - let mut s = s; - loop { - match s.steal() { - Data((1, 2)) => { - (*thread_box).fetch_add(1, SeqCst); - } - Data(..) => fail!(), - _ if DONE.load(SeqCst) => break, - _ => {} - } - } - } - }, unique_box) - })); - - let mut rng = rand::task_rng(); - let mut myhit = false; - let mut iter = 0; - 'outer: loop { - for _ in range(0, rng.gen_range(0, AMT)) { - if !myhit && rng.gen_range(0, 3) == 2 { - match w.pop() { - None => {} - Some((1, 2)) => myhit = true, - Some(_) => fail!(), - } - } else { - w.push((1, 2)); - } - } - iter += 1; - - debug!("loop iteration {}", iter); - for (i, slot) in hits.iter().enumerate() { - let amt = slot.load(SeqCst); - debug!("thread {}: {}", i, amt); - if amt == 0 { continue 'outer; } - } - if myhit { - break - } - } - - unsafe { DONE.store(true, SeqCst); } - - for thread in threads.move_iter() { - thread.join(); - } - } -} - diff --git a/src/libstd/rt/env.rs b/src/libstd/rt/env.rs index d1bd450afe2..f3fa482b18c 100644 --- a/src/libstd/rt/env.rs +++ b/src/libstd/rt/env.rs @@ -17,7 +17,7 @@ use os; // Note that these are all accessed without any synchronization. // They are expected to be initialized once then left alone. -static mut MIN_STACK: uint = 2000000; +static mut MIN_STACK: uint = 2 * 1024 * 1024; static mut DEBUG_BORROW: bool = false; static mut POISON_ON_FREE: bool = false; diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs deleted file mode 100644 index f4f128cf5aa..00000000000 --- a/src/libstd/rt/kill.rs +++ /dev/null @@ -1,318 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/*! - -Task death: asynchronous killing, linked failure, exit code propagation. - -This file implements two orthogonal building-blocks for communicating failure -between tasks. One is 'linked failure' or 'task killing', that is, a failing -task causing other tasks to fail promptly (even those that are blocked on -pipes or I/O). The other is 'exit code propagation', which affects the result -observed by the parent of a task::try task that itself spawns child tasks -(such as any #[test] function). In both cases the data structures live in -KillHandle. - - -I. Task killing. - -The model for killing involves two atomic flags, the "kill flag" and the -"unkillable flag". Operations on the kill flag include: - -- In the taskgroup code (task/spawn.rs), tasks store a clone of their - KillHandle in their shared taskgroup. Another task in the group that fails - will use that handle to call kill(). -- When a task blocks, it turns its ~Task into a BlockedTask by storing a - the transmuted ~Task pointer inside the KillHandle's kill flag. A task - trying to block and a task trying to kill it can simultaneously access the - kill flag, after which the task will get scheduled and fail (no matter who - wins the race). Likewise, a task trying to wake a blocked task normally and - a task trying to kill it can simultaneously access the flag; only one will - get the task to reschedule it. - -Operations on the unkillable flag include: - -- When a task becomes unkillable, it swaps on the flag to forbid any killer - from waking it up while it's blocked inside the unkillable section. If a - kill was already pending, the task fails instead of becoming unkillable. -- When a task is done being unkillable, it restores the flag to the normal - running state. If a kill was received-but-blocked during the unkillable - section, the task fails at this later point. -- When a task tries to kill another task, before swapping on the kill flag, it - first swaps on the unkillable flag, to see if it's "allowed" to wake up the - task. If it isn't, the killed task will receive the signal when it becomes - killable again. (Of course, a task trying to wake the task normally (e.g. - sending on a channel) does not access the unkillable flag at all.) - -Why do we not need acquire/release barriers on any of the kill flag swaps? -This is because barriers establish orderings between accesses on different -memory locations, but each kill-related operation is only a swap on a single -location, so atomicity is all that matters. The exception is kill(), which -does a swap on both flags in sequence. kill() needs no barriers because it -does not matter if its two accesses are seen reordered on another CPU: if a -killer does perform both writes, it means it saw a KILL_RUNNING in the -unkillable flag, which means an unkillable task will see KILL_KILLED and fail -immediately (rendering the subsequent write to the kill flag unnecessary). - - -II. Exit code propagation. - -The basic model for exit code propagation, which is used with the "watched" -spawn mode (on by default for linked spawns, off for supervised and unlinked -spawns), is that a parent will wait for all its watched children to exit -before reporting whether it succeeded or failed. A watching parent will only -report success if it succeeded and all its children also reported success; -otherwise, it will report failure. This is most useful for writing test cases: - - ``` -#[test] -fn test_something_in_another_task { - do spawn { - assert!(collatz_conjecture_is_false()); - } -} - ``` - -Here, as the child task will certainly outlive the parent task, we might miss -the failure of the child when deciding whether or not the test case passed. -The watched spawn mode avoids this problem. - -In order to propagate exit codes from children to their parents, any -'watching' parent must wait for all of its children to exit before it can -report its final exit status. We achieve this by using an UnsafeArc, using the -reference counting to track how many children are still alive, and using the -unwrap() operation in the parent's exit path to wait for all children to exit. -The UnsafeArc referred to here is actually the KillHandle itself. - -This also works transitively, as if a "middle" watched child task is itself -watching a grandchild task, the "middle" task will do unwrap() on its own -KillHandle (thereby waiting for the grandchild to exit) before dropping its -reference to its watching parent (which will alert the parent). - -While UnsafeArc::unwrap() accomplishes the synchronization, there remains the -matter of reporting the exit codes themselves. This is easiest when an exiting -watched task has no watched children of its own: - -- If the task with no watched children exits successfully, it need do nothing. -- If the task with no watched children has failed, it sets a flag in the - parent's KillHandle ("any_child_failed") to false. It then stays false forever. - -However, if a "middle" watched task with watched children of its own exits -before its child exits, we need to ensure that the grandparent task may still -see a failure from the grandchild task. While we could achieve this by having -each intermediate task block on its handle, this keeps around the other resources -the task was using. To be more efficient, this is accomplished via "tombstones". - -A tombstone is a closure, proc() -> bool, which will perform any waiting necessary -to collect the exit code of descendant tasks. In its environment is captured -the KillHandle of whichever task created the tombstone, and perhaps also any -tombstones that that task itself had, and finally also another tombstone, -effectively creating a lazy-list of heap closures. - -When a child wishes to exit early and leave tombstones behind for its parent, -it must use a LittleLock (pthread mutex) to synchronize with any possible -sibling tasks which are trying to do the same thing with the same parent. -However, on the other side, when the parent is ready to pull on the tombstones, -it need not use this lock, because the unwrap() serves as a barrier that ensures -no children will remain with references to the handle. - -The main logic for creating and assigning tombstones can be found in the -function reparent_children_to() in the impl for KillHandle. - - -IIA. Issues with exit code propagation. - -There are two known issues with the current scheme for exit code propagation. - -- As documented in issue #8136, the structure mandates the possibility for stack - overflow when collecting tombstones that are very deeply nested. This cannot - be avoided with the closure representation, as tombstones end up structured in - a sort of tree. However, notably, the tombstones do not actually need to be - collected in any particular order, and so a doubly-linked list may be used. - However we do not do this yet because DList is in libextra. - -- A discussion with Graydon made me realize that if we decoupled the exit code - propagation from the parents-waiting action, this could result in a simpler - implementation as the exit codes themselves would not have to be propagated, - and could instead be propagated implicitly through the taskgroup mechanism - that we already have. The tombstoning scheme would still be required. I have - not implemented this because currently we can't receive a linked failure kill - signal during the task cleanup activity, as that is currently "unkillable", - and occurs outside the task's unwinder's "try" block, so would require some - restructuring. - -*/ - -use cast; -use option::{Option, Some, None}; -use prelude::*; -use iter; -use task::TaskResult; -use rt::task::Task; -use unstable::atomics::{AtomicUint, SeqCst}; -use unstable::sync::UnsafeArc; - -/// A handle to a blocked task. Usually this means having the ~Task pointer by -/// ownership, but if the task is killable, a killer can steal it at any time. -pub enum BlockedTask { - Owned(~Task), - Shared(UnsafeArc<AtomicUint>), -} - -/// Per-task state related to task death, killing, failure, etc. -pub struct Death { - // Action to be done with the exit code. If set, also makes the task wait - // until all its watched children exit before collecting the status. - on_exit: Option<proc(TaskResult)>, - // nesting level counter for unstable::atomically calls (0 == can deschedule). - priv wont_sleep: int, -} - -pub struct BlockedTaskIterator { - priv inner: UnsafeArc<AtomicUint>, -} - -impl Iterator<BlockedTask> for BlockedTaskIterator { - fn next(&mut self) -> Option<BlockedTask> { - Some(Shared(self.inner.clone())) - } -} - -impl BlockedTask { - /// Returns Some if the task was successfully woken; None if already killed. - pub fn wake(self) -> Option<~Task> { - match self { - Owned(task) => Some(task), - Shared(arc) => unsafe { - match (*arc.get()).swap(0, SeqCst) { - 0 => None, - n => cast::transmute(n), - } - } - } - } - - /// Create a blocked task, unless the task was already killed. - pub fn block(task: ~Task) -> BlockedTask { - Owned(task) - } - - /// Converts one blocked task handle to a list of many handles to the same. - pub fn make_selectable(self, num_handles: uint) - -> iter::Take<BlockedTaskIterator> - { - let arc = match self { - Owned(task) => { - let flag = unsafe { AtomicUint::new(cast::transmute(task)) }; - UnsafeArc::new(flag) - } - Shared(arc) => arc.clone(), - }; - BlockedTaskIterator{ inner: arc }.take(num_handles) - } - - // This assertion has two flavours because the wake involves an atomic op. - // In the faster version, destructors will fail dramatically instead. - #[inline] #[cfg(not(test))] - pub fn assert_already_awake(self) { } - #[inline] #[cfg(test)] - pub fn assert_already_awake(self) { assert!(self.wake().is_none()); } - - /// Convert to an unsafe uint value. Useful for storing in a pipe's state flag. - #[inline] - pub unsafe fn cast_to_uint(self) -> uint { - match self { - Owned(task) => { - let blocked_task_ptr: uint = cast::transmute(task); - rtassert!(blocked_task_ptr & 0x1 == 0); - blocked_task_ptr - } - Shared(arc) => { - let blocked_task_ptr: uint = cast::transmute(~arc); - rtassert!(blocked_task_ptr & 0x1 == 0); - blocked_task_ptr | 0x1 - } - } - } - - /// Convert from an unsafe uint value. Useful for retrieving a pipe's state flag. - #[inline] - pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask { - if blocked_task_ptr & 0x1 == 0 { - Owned(cast::transmute(blocked_task_ptr)) - } else { - let ptr: ~UnsafeArc<AtomicUint> = cast::transmute(blocked_task_ptr & !1); - Shared(*ptr) - } - } -} - -impl Death { - pub fn new() -> Death { - Death { - on_exit: None, - wont_sleep: 0, - } - } - - /// Collect failure exit codes from children and propagate them to a parent. - pub fn collect_failure(&mut self, result: TaskResult) { - match self.on_exit.take() { - Some(f) => f(result), - None => {} - } - } - - /// Enter a possibly-nested "atomic" section of code. Just for assertions. - /// All calls must be paired with a subsequent call to allow_deschedule. - #[inline] - pub fn inhibit_deschedule(&mut self) { - self.wont_sleep += 1; - } - - /// Exit a possibly-nested "atomic" section of code. Just for assertions. - /// All calls must be paired with a preceding call to inhibit_deschedule. - #[inline] - pub fn allow_deschedule(&mut self) { - rtassert!(self.wont_sleep != 0); - self.wont_sleep -= 1; - } - - /// Ensure that the task is allowed to become descheduled. - #[inline] - pub fn assert_may_sleep(&self) { - if self.wont_sleep != 0 { - rtabort!("illegal atomic-sleep: attempt to reschedule while \ - using an Exclusive or LittleLock"); - } - } -} - -impl Drop for Death { - fn drop(&mut self) { - // Mustn't be in an atomic or unkillable section at task death. - rtassert!(self.wont_sleep == 0); - } -} - -#[cfg(test)] -mod test { - use rt::test::*; - use super::*; - - // Task blocking tests - - #[test] - fn block_and_wake() { - do with_test_task |task| { - BlockedTask::block(task).wake().unwrap() - } - } -} diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index d73ad98a25b..1c04b6b43ce 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -8,8 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use option::{Option, Some, None}; -use rt::sched::Scheduler; +use option::Option; use rt::task::Task; use rt::local_ptr; @@ -46,87 +45,10 @@ impl Local<local_ptr::Borrowed<Task>> for Task { } } -/// Encapsulates a temporarily-borrowed scheduler. -pub struct BorrowedScheduler { - priv task: local_ptr::Borrowed<Task>, -} - -impl BorrowedScheduler { - fn new(mut task: local_ptr::Borrowed<Task>) -> BorrowedScheduler { - if task.get().sched.is_none() { - rtabort!("no scheduler") - } else { - BorrowedScheduler { - task: task, - } - } - } - - #[inline] - pub fn get<'a>(&'a mut self) -> &'a mut ~Scheduler { - match self.task.get().sched { - None => rtabort!("no scheduler"), - Some(ref mut sched) => sched, - } - } -} - -impl Local<BorrowedScheduler> for Scheduler { - fn put(value: ~Scheduler) { - let mut task = Local::borrow(None::<Task>); - task.get().sched = Some(value); - } - #[inline] - fn take() -> ~Scheduler { - unsafe { - // XXX: Unsafe for speed - let task: *mut Task = Local::unsafe_borrow(); - (*task).sched.take_unwrap() - } - } - fn exists(_: Option<Scheduler>) -> bool { - let mut task = Local::borrow(None::<Task>); - task.get().sched.is_some() - } - #[inline] - fn borrow(_: Option<Scheduler>) -> BorrowedScheduler { - BorrowedScheduler::new(Local::borrow(None::<Task>)) - } - unsafe fn unsafe_take() -> ~Scheduler { rtabort!("unimpl") } - unsafe fn unsafe_borrow() -> *mut Scheduler { - let task: *mut Task = Local::unsafe_borrow(); - match (*task).sched { - Some(~ref mut sched) => { - let s: *mut Scheduler = &mut *sched; - return s; - } - None => { - rtabort!("no scheduler") - } - } - } - unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { - let task_opt: Option<*mut Task> = Local::try_unsafe_borrow(); - match task_opt { - Some(task) => { - match (*task).sched { - Some(~ref mut sched) => { - let s: *mut Scheduler = &mut *sched; - Some(s) - } - None => None - } - } - None => None - } - } -} - #[cfg(test)] mod test { use option::None; use unstable::run_in_bare_thread; - use rt::test::*; use super::*; use rt::task::Task; use rt::local_ptr; @@ -135,8 +57,7 @@ mod test { fn thread_local_task_smoke_test() { do run_in_bare_thread { local_ptr::init(); - let mut sched = ~new_test_uv_sched(); - let task = ~Task::new_root(&mut sched.stack_pool, None, proc(){}); + let task = ~Task::new(); Local::put(task); let task: ~Task = Local::take(); cleanup_task(task); @@ -147,12 +68,11 @@ mod test { fn thread_local_task_two_instances() { do run_in_bare_thread { local_ptr::init(); - let mut sched = ~new_test_uv_sched(); - let task = ~Task::new_root(&mut sched.stack_pool, None, proc(){}); + let task = ~Task::new(); Local::put(task); let task: ~Task = Local::take(); cleanup_task(task); - let task = ~Task::new_root(&mut sched.stack_pool, None, proc(){}); + let task = ~Task::new(); Local::put(task); let task: ~Task = Local::take(); cleanup_task(task); @@ -164,8 +84,7 @@ mod test { fn borrow_smoke_test() { do run_in_bare_thread { local_ptr::init(); - let mut sched = ~new_test_uv_sched(); - let task = ~Task::new_root(&mut sched.stack_pool, None, proc(){}); + let task = ~Task::new(); Local::put(task); unsafe { @@ -180,8 +99,7 @@ mod test { fn borrow_with_return() { do run_in_bare_thread { local_ptr::init(); - let mut sched = ~new_test_uv_sched(); - let task = ~Task::new_root(&mut sched.stack_pool, None, proc(){}); + let task = ~Task::new(); Local::put(task); { @@ -193,5 +111,9 @@ mod test { } } + fn cleanup_task(mut t: ~Task) { + t.destroyed = true; + } + } diff --git a/src/libstd/rt/local_ptr.rs b/src/libstd/rt/local_ptr.rs index 925aa802ad5..42cce272e44 100644 --- a/src/libstd/rt/local_ptr.rs +++ b/src/libstd/rt/local_ptr.rs @@ -42,7 +42,7 @@ impl<T> Drop for Borrowed<T> { } let val: ~T = cast::transmute(self.val); put::<T>(val); - assert!(exists()); + rtassert!(exists()); } } } @@ -109,7 +109,9 @@ pub mod compiled { /// Does not validate the pointer type. #[inline] pub unsafe fn take<T>() -> ~T { - let ptr: ~T = cast::transmute(RT_TLS_PTR); + let ptr = RT_TLS_PTR; + rtassert!(!ptr.is_null()); + let ptr: ~T = cast::transmute(ptr); // can't use `as`, due to type not matching with `cfg(test)` RT_TLS_PTR = cast::transmute(0); ptr @@ -178,7 +180,7 @@ pub mod native { } pub unsafe fn cleanup() { - assert!(INITIALIZED); + rtassert!(INITIALIZED); tls::destroy(RT_TLS_KEY); LOCK.destroy(); INITIALIZED = false; diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index df1ebeb6407..0dd6c883d5b 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -57,27 +57,17 @@ Several modules in `core` are clients of `rt`: // XXX: this should not be here. #[allow(missing_doc)]; +use any::Any; use clone::Clone; use container::Container; use iter::Iterator; -use option::{Option, None, Some}; +use option::Option; use ptr::RawPtr; -use rt::local::Local; -use rt::sched::{Scheduler, Shutdown}; -use rt::sleeper_list::SleeperList; -use task::TaskResult; -use rt::task::{Task, SchedTask, GreenTask, Sched}; -use send_str::SendStrStatic; -use unstable::atomics::{AtomicInt, AtomicBool, SeqCst}; -use unstable::sync::UnsafeArc; +use result::Result; +use task::TaskOpts; use vec::{OwnedVector, MutableVector, ImmutableVector}; -use vec; -use self::thread::Thread; - -// the os module needs to reach into this helper, so allow general access -// through this reexport. -pub use self::util::set_exit_status; +use self::task::{Task, BlockedTask}; // this is somewhat useful when a program wants to spawn a "reasonable" number // of workers based on the constraints of the system that it's running on. @@ -85,8 +75,8 @@ pub use self::util::set_exit_status; // method... pub use self::util::default_sched_threads; -// Re-export of the functionality in the kill module -pub use self::kill::BlockedTask; +// Export unwinding facilities used by the failure macros +pub use self::unwind::{begin_unwind, begin_unwind_raw}; // XXX: these probably shouldn't be public... #[doc(hidden)] @@ -99,21 +89,12 @@ pub mod shouldnt_be_public { // Internal macros used by the runtime. mod macros; -/// Basic implementation of an EventLoop, provides no I/O interfaces -mod basic; - /// The global (exchange) heap. pub mod global_heap; /// Implementations of language-critical runtime features like @. pub mod task; -/// Facilities related to task failure, killing, and death. -mod kill; - -/// The coroutine task scheduler, built on the `io` event loop. -pub mod sched; - /// The EventLoop and internal synchronous I/O interface. pub mod rtio; @@ -121,27 +102,6 @@ pub mod rtio; /// or task-local storage. pub mod local; -/// A mostly lock-free multi-producer, single consumer queue. -pub mod mpsc_queue; - -/// A lock-free single-producer, single consumer queue. -pub mod spsc_queue; - -/// A lock-free multi-producer, multi-consumer bounded queue. -mod mpmc_bounded_queue; - -/// A parallel work-stealing deque -pub mod deque; - -/// A parallel data structure for tracking sleeping schedulers. -pub mod sleeper_list; - -/// Stack segments and caching. -pub mod stack; - -/// CPU context swapping. -mod context; - /// Bindings to system threading libraries. pub mod thread; @@ -157,16 +117,6 @@ pub mod logging; /// Crate map pub mod crate_map; -/// Tools for testing the runtime -pub mod test; - -/// Reference counting -pub mod rc; - -/// A simple single-threaded channel type for passing buffered data between -/// scheduler and task context -pub mod tube; - /// The runtime needs to be able to put a pointer into thread-local storage. mod local_ptr; @@ -185,41 +135,33 @@ pub mod args; // Support for dynamic borrowck pub mod borrowck; -/// Set up a default runtime configuration, given compiler-supplied arguments. -/// -/// This is invoked by the `start` _language item_ (unstable::lang) to -/// run a Rust executable. -/// -/// # Arguments -/// -/// * `argc` & `argv` - The argument vector. On Unix this information is used -/// by os::args. -/// -/// # Return value -/// -/// The return value is used as the process return code. 0 on success, 101 on error. -pub fn start(argc: int, argv: **u8, main: proc()) -> int { - - init(argc, argv); - let exit_code = run(main); - // unsafe is ok b/c we're sure that the runtime is gone - unsafe { cleanup(); } - - return exit_code; -} +/// The default error code of the rust runtime if the main task fails instead +/// of exiting cleanly. +pub static DEFAULT_ERROR_CODE: int = 101; -/// Like `start` but creates an additional scheduler on the current thread, -/// which in most cases will be the 'main' thread, and pins the main task to it. +/// The interface to the current runtime. /// -/// This is appropriate for running code that must execute on the main thread, -/// such as the platform event loop and GUI. -pub fn start_on_main_thread(argc: int, argv: **u8, main: proc()) -> int { - init(argc, argv); - let exit_code = run_on_main_thread(main); - // unsafe is ok b/c we're sure that the runtime is gone - unsafe { cleanup(); } - - return exit_code; +/// This trait is used as the abstraction between 1:1 and M:N scheduling. The +/// two independent crates, libnative and libgreen, both have objects which +/// implement this trait. The goal of this trait is to encompass all the +/// fundamental differences in functionality between the 1:1 and M:N runtime +/// modes. +pub trait Runtime { + // Necessary scheduling functions, used for channels and blocking I/O + // (sometimes). + fn yield_now(~self, cur_task: ~Task); + fn maybe_yield(~self, cur_task: ~Task); + fn deschedule(~self, times: uint, cur_task: ~Task, + f: |BlockedTask| -> Result<(), BlockedTask>); + fn reawaken(~self, to_wake: ~Task, can_resched: bool); + + // Miscellaneous calls which are very different depending on what context + // you're in. + fn spawn_sibling(~self, cur_task: ~Task, opts: TaskOpts, f: proc()); + fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>>; + + // XXX: This is a serious code smell and this should not exist at all. + fn wrap(~self) -> ~Any; } /// One-time runtime initialization. @@ -234,6 +176,7 @@ pub fn init(argc: int, argv: **u8) { args::init(argc, argv); env::init(); logging::init(); + local_ptr::init(); } } @@ -250,239 +193,3 @@ pub unsafe fn cleanup() { args::cleanup(); local_ptr::cleanup(); } - -/// Execute the main function in a scheduler. -/// -/// Configures the runtime according to the environment, by default -/// using a task scheduler with the same number of threads as cores. -/// Returns a process exit code. -pub fn run(main: proc()) -> int { - run_(main, false) -} - -pub fn run_on_main_thread(main: proc()) -> int { - run_(main, true) -} - -fn run_(main: proc(), use_main_sched: bool) -> int { - static DEFAULT_ERROR_CODE: int = 101; - - let nscheds = util::default_sched_threads(); - - let mut main = Some(main); - - // The shared list of sleeping schedulers. - let sleepers = SleeperList::new(); - - // Create a work queue for each scheduler, ntimes. Create an extra - // for the main thread if that flag is set. We won't steal from it. - let mut pool = deque::BufferPool::new(); - let arr = vec::from_fn(nscheds, |_| pool.deque()); - let (workers, stealers) = vec::unzip(arr.move_iter()); - - // The schedulers. - let mut scheds = ~[]; - // Handles to the schedulers. When the main task ends these will be - // sent the Shutdown message to terminate the schedulers. - let mut handles = ~[]; - - for worker in workers.move_iter() { - rtdebug!("inserting a regular scheduler"); - - // Every scheduler is driven by an I/O event loop. - let loop_ = new_event_loop(); - let mut sched = ~Scheduler::new(loop_, - worker, - stealers.clone(), - sleepers.clone()); - let handle = sched.make_handle(); - - scheds.push(sched); - handles.push(handle); - } - - // If we need a main-thread task then create a main thread scheduler - // that will reject any task that isn't pinned to it - let main_sched = if use_main_sched { - - // Create a friend handle. - let mut friend_sched = scheds.pop(); - let friend_handle = friend_sched.make_handle(); - scheds.push(friend_sched); - - // This scheduler needs a queue that isn't part of the stealee - // set. - let (worker, _) = pool.deque(); - - let main_loop = new_event_loop(); - let mut main_sched = ~Scheduler::new_special(main_loop, - worker, - stealers.clone(), - sleepers.clone(), - false, - Some(friend_handle)); - let mut main_handle = main_sched.make_handle(); - // Allow the scheduler to exit when the main task exits. - // Note: sending the shutdown message also prevents the scheduler - // from pushing itself to the sleeper list, which is used for - // waking up schedulers for work stealing; since this is a - // non-work-stealing scheduler it should not be adding itself - // to the list. - main_handle.send(Shutdown); - Some(main_sched) - } else { - None - }; - - // Create a shared cell for transmitting the process exit - // code from the main task to this function. - let exit_code = UnsafeArc::new(AtomicInt::new(0)); - let exit_code_clone = exit_code.clone(); - - // Used to sanity check that the runtime only exits once - let exited_already = UnsafeArc::new(AtomicBool::new(false)); - - // When the main task exits, after all the tasks in the main - // task tree, shut down the schedulers and set the exit code. - let handles = handles; - let on_exit: proc(TaskResult) = proc(exit_success) { - unsafe { - assert!(!(*exited_already.get()).swap(true, SeqCst), - "the runtime already exited"); - } - - let mut handles = handles; - for handle in handles.mut_iter() { - handle.send(Shutdown); - } - - unsafe { - let exit_code = if exit_success.is_ok() { - use rt::util; - - // If we're exiting successfully, then return the global - // exit status, which can be set programmatically. - util::get_exit_status() - } else { - DEFAULT_ERROR_CODE - }; - (*exit_code_clone.get()).store(exit_code, SeqCst); - } - }; - - let mut threads = ~[]; - let mut on_exit = Some(on_exit); - - if !use_main_sched { - - // In the case where we do not use a main_thread scheduler we - // run the main task in one of our threads. - - let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, - None, - ::util::replace(&mut main, - None).unwrap()); - main_task.name = Some(SendStrStatic("<main>")); - main_task.death.on_exit = ::util::replace(&mut on_exit, None); - - let sched = scheds.pop(); - let main_task = main_task; - let thread = do Thread::start { - sched.bootstrap(main_task); - }; - threads.push(thread); - } - - // Run each remaining scheduler in a thread. - for sched in scheds.move_rev_iter() { - rtdebug!("creating regular schedulers"); - let thread = do Thread::start { - let mut sched = sched; - let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool, None) || { - rtdebug!("boostraping a non-primary scheduler"); - }; - sched.bootstrap(bootstrap_task); - }; - threads.push(thread); - } - - // If we do have a main thread scheduler, run it now. - - if use_main_sched { - rtdebug!("about to create the main scheduler task"); - - let mut main_sched = main_sched.unwrap(); - - let home = Sched(main_sched.make_handle()); - let mut main_task = ~Task::new_root_homed(&mut main_sched.stack_pool, - None, - home, - ::util::replace(&mut main, - None). - unwrap()); - main_task.name = Some(SendStrStatic("<main>")); - main_task.death.on_exit = ::util::replace(&mut on_exit, None); - rtdebug!("bootstrapping main_task"); - - main_sched.bootstrap(main_task); - } - - rtdebug!("waiting for threads"); - - // Wait for schedulers - for thread in threads.move_iter() { - thread.join(); - } - - // Return the exit code - unsafe { - (*exit_code.get()).load(SeqCst) - } -} - -pub fn in_sched_context() -> bool { - unsafe { - let task_ptr: Option<*mut Task> = Local::try_unsafe_borrow(); - match task_ptr { - Some(task) => { - match (*task).task_type { - SchedTask => true, - _ => false - } - } - None => false - } - } -} - -pub fn in_green_task_context() -> bool { - unsafe { - let task: Option<*mut Task> = Local::try_unsafe_borrow(); - match task { - Some(task) => { - match (*task).task_type { - GreenTask(_) => true, - _ => false - } - } - None => false - } - } -} - -pub fn new_event_loop() -> ~rtio::EventLoop { - match crate_map::get_crate_map() { - None => {} - Some(map) => { - match map.event_loop_factory { - None => {} - Some(factory) => return factory() - } - } - } - - // If the crate map didn't specify a factory to create an event loop, then - // instead just use a basic event loop missing all I/O services to at least - // get the scheduler running. - return basic::event_loop(); -} diff --git a/src/libstd/rt/mpmc_bounded_queue.rs b/src/libstd/rt/mpmc_bounded_queue.rs deleted file mode 100644 index 25a3ba8ab48..00000000000 --- a/src/libstd/rt/mpmc_bounded_queue.rs +++ /dev/null @@ -1,209 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue - -use unstable::sync::UnsafeArc; -use unstable::atomics::{AtomicUint,Relaxed,Release,Acquire}; -use option::*; -use vec; -use clone::Clone; -use kinds::Send; -use num::{Exponential,Algebraic,Round}; - -struct Node<T> { - sequence: AtomicUint, - value: Option<T>, -} - -struct State<T> { - pad0: [u8, ..64], - buffer: ~[Node<T>], - mask: uint, - pad1: [u8, ..64], - enqueue_pos: AtomicUint, - pad2: [u8, ..64], - dequeue_pos: AtomicUint, - pad3: [u8, ..64], -} - -pub struct Queue<T> { - priv state: UnsafeArc<State<T>>, -} - -impl<T: Send> State<T> { - fn with_capacity(capacity: uint) -> State<T> { - let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 { - if capacity < 2 { - 2u - } else { - // use next power of 2 as capacity - 2f64.pow(&((capacity as f64).log2().ceil())) as uint - } - } else { - capacity - }; - let buffer = vec::from_fn(capacity, |i:uint| { - Node{sequence:AtomicUint::new(i),value:None} - }); - State{ - pad0: [0, ..64], - buffer: buffer, - mask: capacity-1, - pad1: [0, ..64], - enqueue_pos: AtomicUint::new(0), - pad2: [0, ..64], - dequeue_pos: AtomicUint::new(0), - pad3: [0, ..64], - } - } - - fn push(&mut self, value: T) -> bool { - let mask = self.mask; - let mut pos = self.enqueue_pos.load(Relaxed); - loop { - let node = &mut self.buffer[pos & mask]; - let seq = node.sequence.load(Acquire); - let diff: int = seq as int - pos as int; - - if diff == 0 { - let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed); - if enqueue_pos == pos { - node.value = Some(value); - node.sequence.store(pos+1, Release); - break - } else { - pos = enqueue_pos; - } - } else if (diff < 0) { - return false - } else { - pos = self.enqueue_pos.load(Relaxed); - } - } - true - } - - fn pop(&mut self) -> Option<T> { - let mask = self.mask; - let mut pos = self.dequeue_pos.load(Relaxed); - loop { - let node = &mut self.buffer[pos & mask]; - let seq = node.sequence.load(Acquire); - let diff: int = seq as int - (pos + 1) as int; - if diff == 0 { - let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed); - if dequeue_pos == pos { - let value = node.value.take(); - node.sequence.store(pos + mask + 1, Release); - return value - } else { - pos = dequeue_pos; - } - } else if diff < 0 { - return None - } else { - pos = self.dequeue_pos.load(Relaxed); - } - } - } -} - -impl<T: Send> Queue<T> { - pub fn with_capacity(capacity: uint) -> Queue<T> { - Queue{ - state: UnsafeArc::new(State::with_capacity(capacity)) - } - } - - pub fn push(&mut self, value: T) -> bool { - unsafe { (*self.state.get()).push(value) } - } - - pub fn pop(&mut self) -> Option<T> { - unsafe { (*self.state.get()).pop() } - } -} - -impl<T: Send> Clone for Queue<T> { - fn clone(&self) -> Queue<T> { - Queue { - state: self.state.clone() - } - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - use option::*; - use task; - use super::Queue; - - #[test] - fn test() { - let nthreads = 8u; - let nmsgs = 1000u; - let mut q = Queue::with_capacity(nthreads*nmsgs); - assert_eq!(None, q.pop()); - - for _ in range(0, nthreads) { - let q = q.clone(); - do task::spawn_sched(task::SingleThreaded) { - let mut q = q; - for i in range(0, nmsgs) { - assert!(q.push(i)); - } - } - } - - let mut completion_ports = ~[]; - for _ in range(0, nthreads) { - let (completion_port, completion_chan) = Chan::new(); - completion_ports.push(completion_port); - let q = q.clone(); - do task::spawn_sched(task::SingleThreaded) { - let mut q = q; - let mut i = 0u; - loop { - match q.pop() { - None => {}, - Some(_) => { - i += 1; - if i == nmsgs { break } - } - } - } - completion_chan.send(i); - } - } - - for completion_port in completion_ports.mut_iter() { - assert_eq!(nmsgs, completion_port.recv()); - } - } -} diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs deleted file mode 100644 index d575028af70..00000000000 --- a/src/libstd/rt/mpsc_queue.rs +++ /dev/null @@ -1,215 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -//! A mostly lock-free multi-producer, single consumer queue. - -// http://www.1024cores.net/home/lock-free-algorithms -// /queues/non-intrusive-mpsc-node-based-queue - -use cast; -use clone::Clone; -use kinds::Send; -use ops::Drop; -use option::{Option, None, Some}; -use unstable::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; -use unstable::sync::UnsafeArc; - -pub enum PopResult<T> { - /// Some data has been popped - Data(T), - /// The queue is empty - Empty, - /// The queue is in an inconsistent state. Popping data should succeed, but - /// some pushers have yet to make enough progress in order allow a pop to - /// succeed. It is recommended that a pop() occur "in the near future" in - /// order to see if the sender has made progress or not - Inconsistent, -} - -struct Node<T> { - next: AtomicPtr<Node<T>>, - value: Option<T>, -} - -struct State<T, P> { - head: AtomicPtr<Node<T>>, - tail: *mut Node<T>, - packet: P, -} - -pub struct Consumer<T, P> { - priv state: UnsafeArc<State<T, P>>, -} - -pub struct Producer<T, P> { - priv state: UnsafeArc<State<T, P>>, -} - -impl<T: Send, P: Send> Clone for Producer<T, P> { - fn clone(&self) -> Producer<T, P> { - Producer { state: self.state.clone() } - } -} - -pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) { - unsafe { - let (a, b) = UnsafeArc::new2(State::new(p)); - (Consumer { state: a }, Producer { state: b }) - } -} - -impl<T> Node<T> { - unsafe fn new(v: Option<T>) -> *mut Node<T> { - cast::transmute(~Node { - next: AtomicPtr::new(0 as *mut Node<T>), - value: v, - }) - } -} - -impl<T: Send, P: Send> State<T, P> { - pub unsafe fn new(p: P) -> State<T, P> { - let stub = Node::new(None); - State { - head: AtomicPtr::new(stub), - tail: stub, - packet: p, - } - } - - unsafe fn push(&mut self, t: T) { - let n = Node::new(Some(t)); - let prev = self.head.swap(n, AcqRel); - (*prev).next.store(n, Release); - } - - unsafe fn pop(&mut self) -> PopResult<T> { - let tail = self.tail; - let next = (*tail).next.load(Acquire); - - if !next.is_null() { - self.tail = next; - assert!((*tail).value.is_none()); - assert!((*next).value.is_some()); - let ret = (*next).value.take_unwrap(); - let _: ~Node<T> = cast::transmute(tail); - return Data(ret); - } - - if self.head.load(Acquire) == tail {Empty} else {Inconsistent} - } - - unsafe fn is_empty(&mut self) -> bool { - return (*self.tail).next.load(Acquire).is_null(); - } -} - -#[unsafe_destructor] -impl<T: Send, P: Send> Drop for State<T, P> { - fn drop(&mut self) { - unsafe { - let mut cur = self.tail; - while !cur.is_null() { - let next = (*cur).next.load(Relaxed); - let _: ~Node<T> = cast::transmute(cur); - cur = next; - } - } - } -} - -impl<T: Send, P: Send> Producer<T, P> { - pub fn push(&mut self, value: T) { - unsafe { (*self.state.get()).push(value) } - } - pub fn is_empty(&self) -> bool { - unsafe{ (*self.state.get()).is_empty() } - } - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -impl<T: Send, P: Send> Consumer<T, P> { - pub fn pop(&mut self) -> PopResult<T> { - unsafe { (*self.state.get()).pop() } - } - pub fn casual_pop(&mut self) -> Option<T> { - match self.pop() { - Data(t) => Some(t), - Empty | Inconsistent => None, - } - } - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - - use task; - use super::{queue, Data, Empty, Inconsistent}; - - #[test] - fn test_full() { - let (_, mut p) = queue(()); - p.push(~1); - p.push(~2); - } - - #[test] - fn test() { - let nthreads = 8u; - let nmsgs = 1000u; - let (mut c, p) = queue(()); - match c.pop() { - Empty => {} - Inconsistent | Data(..) => fail!() - } - - for _ in range(0, nthreads) { - let q = p.clone(); - do task::spawn_sched(task::SingleThreaded) { - let mut q = q; - for i in range(0, nmsgs) { - q.push(i); - } - } - } - - let mut i = 0u; - while i < nthreads * nmsgs { - match c.pop() { - Empty | Inconsistent => {}, - Data(_) => { i += 1 } - } - } - } -} - diff --git a/src/libstd/rt/rc.rs b/src/libstd/rt/rc.rs deleted file mode 100644 index 2699dab6d38..00000000000 --- a/src/libstd/rt/rc.rs +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! An owned, task-local, reference counted type -//! -//! # Safety note -//! -//! XXX There is currently no type-system mechanism for enforcing that -//! reference counted types are both allocated on the exchange heap -//! and also non-sendable -//! -//! This doesn't prevent borrowing multiple aliasable mutable pointers - -use ops::Drop; -use clone::Clone; -use libc::c_void; -use cast; - -pub struct RC<T> { - priv p: *c_void // ~(uint, T) -} - -impl<T> RC<T> { - pub fn new(val: T) -> RC<T> { - unsafe { - let v = ~(1, val); - let p: *c_void = cast::transmute(v); - RC { p: p } - } - } - - fn get_mut_state(&mut self) -> *mut (uint, T) { - unsafe { - let p: &mut ~(uint, T) = cast::transmute(&mut self.p); - let p: *mut (uint, T) = &mut **p; - return p; - } - } - - fn get_state(&self) -> *(uint, T) { - unsafe { - let p: &~(uint, T) = cast::transmute(&self.p); - let p: *(uint, T) = &**p; - return p; - } - } - - pub fn unsafe_borrow_mut(&mut self) -> *mut T { - unsafe { - match *self.get_mut_state() { - (_, ref mut p) => { - let p: *mut T = p; - return p; - } - } - } - } - - pub fn refcount(&self) -> uint { - unsafe { - match *self.get_state() { - (count, _) => count - } - } - } -} - -#[unsafe_destructor] -impl<T> Drop for RC<T> { - fn drop(&mut self) { - assert!(self.refcount() > 0); - - unsafe { - match *self.get_mut_state() { - (ref mut count, _) => { - *count = *count - 1 - } - } - - if self.refcount() == 0 { - let _: ~(uint, T) = cast::transmute(self.p); - } - } - } -} - -impl<T> Clone for RC<T> { - fn clone(&self) -> RC<T> { - unsafe { - // XXX: Mutable clone - let this: &mut RC<T> = cast::transmute_mut(self); - - match *this.get_mut_state() { - (ref mut count, _) => { - *count = *count + 1; - } - } - } - - RC { p: self.p } - } -} - -#[cfg(test)] -mod test { - use super::RC; - - #[test] - fn smoke_test() { - unsafe { - let mut v1 = RC::new(100); - assert!(*v1.unsafe_borrow_mut() == 100); - assert!(v1.refcount() == 1); - - let mut v2 = v1.clone(); - assert!(*v2.unsafe_borrow_mut() == 100); - assert!(v2.refcount() == 2); - - *v2.unsafe_borrow_mut() = 200; - assert!(*v2.unsafe_borrow_mut() == 200); - assert!(*v1.unsafe_borrow_mut() == 200); - - let v3 = v2.clone(); - assert!(v3.refcount() == 3); - { - let _v1 = v1; - let _v2 = v2; - } - assert!(v3.refcount() == 1); - } - } -} diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index b54231421e3..6b3d50a76ac 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -14,14 +14,15 @@ use comm::{SharedChan, Port}; use libc::c_int; use libc; use ops::Drop; -use option::*; +use option::{Option, Some, None}; use path::Path; -use result::*; +use result::{Result, Ok, Err}; +use rt::task::Task; +use rt::local::Local; use ai = io::net::addrinfo; +use io; use io::IoError; -use io::native::NATIVE_IO_FACTORY; -use io::native; use io::net::ip::{IpAddr, SocketAddr}; use io::process::{ProcessConfig, ProcessExit}; use io::signal::Signum; @@ -93,36 +94,52 @@ impl<'a> Drop for LocalIo<'a> { impl<'a> LocalIo<'a> { /// Returns the local I/O: either the local scheduler's I/O services or /// the native I/O services. - pub fn borrow() -> LocalIo { - use rt::sched::Scheduler; - use rt::local::Local; + pub fn borrow() -> Option<LocalIo> { + // FIXME(#11053): bad + // + // This is currently very unsafely implemented. We don't actually + // *take* the local I/O so there's a very real possibility that we + // can have two borrows at once. Currently there is not a clear way + // to actually borrow the local I/O factory safely because even if + // ownership were transferred down to the functions that the I/O + // factory implements it's just too much of a pain to know when to + // relinquish ownership back into the local task (but that would be + // the safe way of implementing this function). + // + // In order to get around this, we just transmute a copy out of the task + // in order to have what is likely a static lifetime (bad). + let mut t: ~Task = Local::take(); + let ret = t.local_io().map(|t| { + unsafe { cast::transmute_copy(&t) } + }); + Local::put(t); + return ret; + } - unsafe { - // First, attempt to use the local scheduler's I/O services - let sched: Option<*mut Scheduler> = Local::try_unsafe_borrow(); - match sched { - Some(sched) => { - match (*sched).event_loop.io() { - Some(factory) => { - return LocalIo { - factory: factory, - } - } - None => {} + pub fn maybe_raise<T>(f: |io: &mut IoFactory| -> Result<T, IoError>) + -> Option<T> + { + match LocalIo::borrow() { + None => { + io::io_error::cond.raise(io::standard_error(io::IoUnavailable)); + None + } + Some(mut io) => { + match f(io.get()) { + Ok(t) => Some(t), + Err(ioerr) => { + io::io_error::cond.raise(ioerr); + None } } - None => {} - } - // If we don't have a scheduler or the scheduler doesn't have I/O - // services, then fall back to the native I/O services. - let native_io: &'static mut native::IoFactory = - &mut NATIVE_IO_FACTORY; - LocalIo { - factory: native_io as &mut IoFactory:'static } } } + pub fn new<'a>(io: &'a mut IoFactory) -> LocalIo<'a> { + LocalIo { factory: io } + } + /// Returns the underlying I/O factory as a trait reference. #[inline] pub fn get<'a>(&'a mut self) -> &'a mut IoFactory { diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs deleted file mode 100644 index 15aa1602cd0..00000000000 --- a/src/libstd/rt/sched.rs +++ /dev/null @@ -1,1395 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use option::{Option, Some, None}; -use cast::{transmute, transmute_mut_region, transmute_mut_unsafe}; -use clone::Clone; -use unstable::raw; -use super::sleeper_list::SleeperList; -use super::stack::{StackPool}; -use super::rtio::EventLoop; -use super::context::Context; -use super::task::{Task, AnySched, Sched}; -use rt::kill::BlockedTask; -use rt::deque; -use rt::local_ptr; -use rt::local::Local; -use rt::rtio::{RemoteCallback, PausableIdleCallback, Callback}; -use borrow::{to_uint}; -use rand::{XorShiftRng, Rng, Rand}; -use iter::range; -use unstable::mutex::Mutex; -use vec::{OwnedVector}; - -use mpsc = super::mpsc_queue; - -/// A scheduler is responsible for coordinating the execution of Tasks -/// on a single thread. The scheduler runs inside a slightly modified -/// Rust Task. When not running this task is stored in the scheduler -/// struct. The scheduler struct acts like a baton, all scheduling -/// actions are transfers of the baton. -/// -/// XXX: This creates too many callbacks to run_sched_once, resulting -/// in too much allocation and too many events. -pub struct Scheduler { - /// There are N work queues, one per scheduler. - work_queue: deque::Worker<~Task>, - /// Work queues for the other schedulers. These are created by - /// cloning the core work queues. - work_queues: ~[deque::Stealer<~Task>], - /// The queue of incoming messages from other schedulers. - /// These are enqueued by SchedHandles after which a remote callback - /// is triggered to handle the message. - message_queue: mpsc::Consumer<SchedMessage, ()>, - /// Producer used to clone sched handles from - message_producer: mpsc::Producer<SchedMessage, ()>, - /// A shared list of sleeping schedulers. We'll use this to wake - /// up schedulers when pushing work onto the work queue. - sleeper_list: SleeperList, - /// Indicates that we have previously pushed a handle onto the - /// SleeperList but have not yet received the Wake message. - /// Being `true` does not necessarily mean that the scheduler is - /// not active since there are multiple event sources that may - /// wake the scheduler. It just prevents the scheduler from pushing - /// multiple handles onto the sleeper list. - sleepy: bool, - /// A flag to indicate we've received the shutdown message and should - /// no longer try to go to sleep, but exit instead. - no_sleep: bool, - stack_pool: StackPool, - /// The scheduler runs on a special task. When it is not running - /// it is stored here instead of the work queue. - sched_task: Option<~Task>, - /// An action performed after a context switch on behalf of the - /// code running before the context switch - cleanup_job: Option<CleanupJob>, - /// Should this scheduler run any task, or only pinned tasks? - run_anything: bool, - /// If the scheduler shouldn't run some tasks, a friend to send - /// them to. - friend_handle: Option<SchedHandle>, - /// A fast XorShift rng for scheduler use - rng: XorShiftRng, - /// A togglable idle callback - idle_callback: Option<~PausableIdleCallback>, - /// A countdown that starts at a random value and is decremented - /// every time a yield check is performed. When it hits 0 a task - /// will yield. - yield_check_count: uint, - /// A flag to tell the scheduler loop it needs to do some stealing - /// in order to introduce randomness as part of a yield - steal_for_yield: bool, - - // n.b. currently destructors of an object are run in top-to-bottom in order - // of field declaration. Due to its nature, the pausable idle callback - // must have some sort of handle to the event loop, so it needs to get - // destroyed before the event loop itself. For this reason, we destroy - // the event loop last to ensure that any unsafe references to it are - // destroyed before it's actually destroyed. - - /// The event loop used to drive the scheduler and perform I/O - event_loop: ~EventLoop, -} - -/// An indication of how hard to work on a given operation, the difference -/// mainly being whether memory is synchronized or not -#[deriving(Eq)] -enum EffortLevel { - DontTryTooHard, - GiveItYourBest -} - -static MAX_YIELD_CHECKS: uint = 20000; - -fn reset_yield_check(rng: &mut XorShiftRng) -> uint { - let r: uint = Rand::rand(rng); - r % MAX_YIELD_CHECKS + 1 -} - -impl Scheduler { - - // * Initialization Functions - - pub fn new(event_loop: ~EventLoop, - work_queue: deque::Worker<~Task>, - work_queues: ~[deque::Stealer<~Task>], - sleeper_list: SleeperList) - -> Scheduler { - - Scheduler::new_special(event_loop, work_queue, - work_queues, - sleeper_list, true, None) - - } - - pub fn new_special(event_loop: ~EventLoop, - work_queue: deque::Worker<~Task>, - work_queues: ~[deque::Stealer<~Task>], - sleeper_list: SleeperList, - run_anything: bool, - friend: Option<SchedHandle>) - -> Scheduler { - - let (consumer, producer) = mpsc::queue(()); - let mut sched = Scheduler { - sleeper_list: sleeper_list, - message_queue: consumer, - message_producer: producer, - sleepy: false, - no_sleep: false, - event_loop: event_loop, - work_queue: work_queue, - work_queues: work_queues, - stack_pool: StackPool::new(), - sched_task: None, - cleanup_job: None, - run_anything: run_anything, - friend_handle: friend, - rng: new_sched_rng(), - idle_callback: None, - yield_check_count: 0, - steal_for_yield: false - }; - - sched.yield_check_count = reset_yield_check(&mut sched.rng); - - return sched; - } - - // XXX: This may eventually need to be refactored so that - // the scheduler itself doesn't have to call event_loop.run. - // That will be important for embedding the runtime into external - // event loops. - - // Take a main task to run, and a scheduler to run it in. Create a - // scheduler task and bootstrap into it. - pub fn bootstrap(mut ~self, task: ~Task) { - - // Build an Idle callback. - let cb = ~SchedRunner as ~Callback; - self.idle_callback = Some(self.event_loop.pausable_idle_callback(cb)); - - // Initialize the TLS key. - local_ptr::init(); - - // Create a task for the scheduler with an empty context. - let sched_task = ~Task::new_sched_task(); - - // Now that we have an empty task struct for the scheduler - // task, put it in TLS. - Local::put(sched_task); - - // Before starting our first task, make sure the idle callback - // is active. As we do not start in the sleep state this is - // important. - self.idle_callback.get_mut_ref().resume(); - - // Now, as far as all the scheduler state is concerned, we are - // inside the "scheduler" context. So we can act like the - // scheduler and resume the provided task. - self.resume_task_immediately(task); - - // Now we are back in the scheduler context, having - // successfully run the input task. Start by running the - // scheduler. Grab it out of TLS - performing the scheduler - // action will have given it away. - let sched: ~Scheduler = Local::take(); - - rtdebug!("starting scheduler {}", sched.sched_id()); - sched.run(); - - // Close the idle callback. - let mut sched: ~Scheduler = Local::take(); - sched.idle_callback.take(); - // Make one go through the loop to run the close callback. - sched.run(); - - // Now that we are done with the scheduler, clean up the - // scheduler task. Do so by removing it from TLS and manually - // cleaning up the memory it uses. As we didn't actually call - // task.run() on the scheduler task we never get through all - // the cleanup code it runs. - let mut stask: ~Task = Local::take(); - - rtdebug!("stopping scheduler {}", stask.sched.get_ref().sched_id()); - - // Should not have any messages - let message = stask.sched.get_mut_ref().message_queue.pop(); - rtassert!(match message { mpsc::Empty => true, _ => false }); - - stask.destroyed = true; - } - - // This does not return a scheduler, as the scheduler is placed - // inside the task. - pub fn run(mut ~self) { - - // This is unsafe because we need to place the scheduler, with - // the event_loop inside, inside our task. But we still need a - // mutable reference to the event_loop to give it the "run" - // command. - unsafe { - let event_loop: *mut ~EventLoop = &mut self.event_loop; - - { - // Our scheduler must be in the task before the event loop - // is started. - let mut stask = Local::borrow(None::<Task>); - stask.get().sched = Some(self); - } - - (*event_loop).run(); - } - } - - // * Execution Functions - Core Loop Logic - - // The model for this function is that you continue through it - // until you either use the scheduler while performing a schedule - // action, in which case you give it away and return early, or - // you reach the end and sleep. In the case that a scheduler - // action is performed the loop is evented such that this function - // is called again. - fn run_sched_once() { - - // When we reach the scheduler context via the event loop we - // already have a scheduler stored in our local task, so we - // start off by taking it. This is the only path through the - // scheduler where we get the scheduler this way. - let mut sched: ~Scheduler = Local::take(); - - // Assume that we need to continue idling unless we reach the - // end of this function without performing an action. - sched.idle_callback.get_mut_ref().resume(); - - // First we check for scheduler messages, these are higher - // priority than regular tasks. - let sched = match sched.interpret_message_queue(DontTryTooHard) { - Some(sched) => sched, - None => return - }; - - // This helper will use a randomized work-stealing algorithm - // to find work. - let sched = match sched.do_work() { - Some(sched) => sched, - None => return - }; - - // Now, before sleeping we need to find out if there really - // were any messages. Give it your best! - let mut sched = match sched.interpret_message_queue(GiveItYourBest) { - Some(sched) => sched, - None => return - }; - - // If we got here then there was no work to do. - // Generate a SchedHandle and push it to the sleeper list so - // somebody can wake us up later. - if !sched.sleepy && !sched.no_sleep { - rtdebug!("scheduler has no work to do, going to sleep"); - sched.sleepy = true; - let handle = sched.make_handle(); - sched.sleeper_list.push(handle); - // Since we are sleeping, deactivate the idle callback. - sched.idle_callback.get_mut_ref().pause(); - } else { - rtdebug!("not sleeping, already doing so or no_sleep set"); - // We may not be sleeping, but we still need to deactivate - // the idle callback. - sched.idle_callback.get_mut_ref().pause(); - } - - // Finished a cycle without using the Scheduler. Place it back - // in TLS. - Local::put(sched); - } - - // This function returns None if the scheduler is "used", or it - // returns the still-available scheduler. At this point all - // message-handling will count as a turn of work, and as a result - // return None. - fn interpret_message_queue(mut ~self, effort: EffortLevel) -> Option<~Scheduler> { - - let msg = if effort == DontTryTooHard { - self.message_queue.casual_pop() - } else { - // When popping our message queue, we could see an "inconsistent" - // state which means that we *should* be able to pop data, but we - // are unable to at this time. Our options are: - // - // 1. Spin waiting for data - // 2. Ignore this and pretend we didn't find a message - // - // If we choose route 1, then if the pusher in question is currently - // pre-empted, we're going to take up our entire time slice just - // spinning on this queue. If we choose route 2, then the pusher in - // question is still guaranteed to make a send() on its async - // handle, so we will guaranteed wake up and see its message at some - // point. - // - // I have chosen to take route #2. - match self.message_queue.pop() { - mpsc::Data(t) => Some(t), - mpsc::Empty | mpsc::Inconsistent => None - } - }; - - match msg { - Some(PinnedTask(task)) => { - let mut task = task; - task.give_home(Sched(self.make_handle())); - self.resume_task_immediately(task); - return None; - } - Some(TaskFromFriend(task)) => { - rtdebug!("got a task from a friend. lovely!"); - self.process_task(task, Scheduler::resume_task_immediately_cl); - return None; - } - Some(RunOnce(task)) => { - // bypass the process_task logic to force running this task once - // on this home scheduler. This is often used for I/O (homing). - Scheduler::resume_task_immediately_cl(self, task); - return None; - } - Some(Wake) => { - self.sleepy = false; - Local::put(self); - return None; - } - Some(Shutdown) => { - rtdebug!("shutting down"); - if self.sleepy { - // There may be an outstanding handle on the - // sleeper list. Pop them all to make sure that's - // not the case. - loop { - match self.sleeper_list.pop() { - Some(handle) => { - let mut handle = handle; - handle.send(Wake); - } - None => break - } - } - } - // No more sleeping. After there are no outstanding - // event loop references we will shut down. - self.no_sleep = true; - self.sleepy = false; - Local::put(self); - return None; - } - None => { - return Some(self); - } - } - } - - fn do_work(mut ~self) -> Option<~Scheduler> { - rtdebug!("scheduler calling do work"); - match self.find_work() { - Some(task) => { - rtdebug!("found some work! processing the task"); - self.process_task(task, Scheduler::resume_task_immediately_cl); - return None; - } - None => { - rtdebug!("no work was found, returning the scheduler struct"); - return Some(self); - } - } - } - - // Workstealing: In this iteration of the runtime each scheduler - // thread has a distinct work queue. When no work is available - // locally, make a few attempts to steal work from the queues of - // other scheduler threads. If a few steals fail we end up in the - // old "no work" path which is fine. - - // First step in the process is to find a task. This function does - // that by first checking the local queue, and if there is no work - // there, trying to steal from the remote work queues. - fn find_work(&mut self) -> Option<~Task> { - rtdebug!("scheduler looking for work"); - if !self.steal_for_yield { - match self.work_queue.pop() { - Some(task) => { - rtdebug!("found a task locally"); - return Some(task) - } - None => { - rtdebug!("scheduler trying to steal"); - return self.try_steals(); - } - } - } else { - // During execution of the last task, it performed a 'yield', - // so we're doing some work stealing in order to introduce some - // scheduling randomness. Otherwise we would just end up popping - // that same task again. This is pretty lame and is to work around - // the problem that work stealing is not designed for 'non-strict' - // (non-fork-join) task parallelism. - self.steal_for_yield = false; - match self.try_steals() { - Some(task) => { - rtdebug!("stole a task after yielding"); - return Some(task); - } - None => { - rtdebug!("did not steal a task after yielding"); - // Back to business - return self.find_work(); - } - } - } - } - - // Try stealing from all queues the scheduler knows about. This - // naive implementation can steal from our own queue or from other - // special schedulers. - fn try_steals(&mut self) -> Option<~Task> { - let work_queues = &mut self.work_queues; - let len = work_queues.len(); - let start_index = self.rng.gen_range(0, len); - for index in range(0, len).map(|i| (i + start_index) % len) { - match work_queues[index].steal() { - deque::Data(task) => { - rtdebug!("found task by stealing"); - return Some(task) - } - _ => () - } - }; - rtdebug!("giving up on stealing"); - return None; - } - - // * Task Routing Functions - Make sure tasks send up in the right - // place. - - fn process_task(mut ~self, mut task: ~Task, schedule_fn: SchedulingFn) { - rtdebug!("processing a task"); - - let home = task.take_unwrap_home(); - match home { - Sched(home_handle) => { - if home_handle.sched_id != self.sched_id() { - rtdebug!("sending task home"); - task.give_home(Sched(home_handle)); - Scheduler::send_task_home(task); - Local::put(self); - } else { - rtdebug!("running task here"); - task.give_home(Sched(home_handle)); - schedule_fn(self, task); - } - } - AnySched if self.run_anything => { - rtdebug!("running anysched task here"); - task.give_home(AnySched); - schedule_fn(self, task); - } - AnySched => { - rtdebug!("sending task to friend"); - task.give_home(AnySched); - self.send_to_friend(task); - Local::put(self); - } - } - } - - fn send_task_home(task: ~Task) { - let mut task = task; - let mut home = task.take_unwrap_home(); - match home { - Sched(ref mut home_handle) => { - home_handle.send(PinnedTask(task)); - } - AnySched => { - rtabort!("error: cannot send anysched task home"); - } - } - } - - /// Take a non-homed task we aren't allowed to run here and send - /// it to the designated friend scheduler to execute. - fn send_to_friend(&mut self, task: ~Task) { - rtdebug!("sending a task to friend"); - match self.friend_handle { - Some(ref mut handle) => { - handle.send(TaskFromFriend(task)); - } - None => { - rtabort!("tried to send task to a friend but scheduler has no friends"); - } - } - } - - /// Schedule a task to be executed later. - /// - /// Pushes the task onto the work stealing queue and tells the - /// event loop to run it later. Always use this instead of pushing - /// to the work queue directly. - pub fn enqueue_task(&mut self, task: ~Task) { - - // We push the task onto our local queue clone. - self.work_queue.push(task); - self.idle_callback.get_mut_ref().resume(); - - // We've made work available. Notify a - // sleeping scheduler. - - match self.sleeper_list.casual_pop() { - Some(handle) => { - let mut handle = handle; - handle.send(Wake) - } - None => { (/* pass */) } - }; - } - - /// As enqueue_task, but with the possibility for the blocked task to - /// already have been killed. - pub fn enqueue_blocked_task(&mut self, blocked_task: BlockedTask) { - blocked_task.wake().map(|task| self.enqueue_task(task)); - } - - // * Core Context Switching Functions - - // The primary function for changing contexts. In the current - // design the scheduler is just a slightly modified GreenTask, so - // all context swaps are from Task to Task. The only difference - // between the various cases is where the inputs come from, and - // what is done with the resulting task. That is specified by the - // cleanup function f, which takes the scheduler and the - // old task as inputs. - - pub fn change_task_context(mut ~self, - next_task: ~Task, - f: |&mut Scheduler, ~Task|) { - // The current task is grabbed from TLS, not taken as an input. - // Doing an unsafe_take to avoid writing back a null pointer - - // We're going to call `put` later to do that. - let current_task: ~Task = unsafe { Local::unsafe_take() }; - - // Check that the task is not in an atomically() section (e.g., - // holding a pthread mutex, which could deadlock the scheduler). - current_task.death.assert_may_sleep(); - - // These transmutes do something fishy with a closure. - let f_fake_region = unsafe { - transmute::<|&mut Scheduler, ~Task|, - |&mut Scheduler, ~Task|>(f) - }; - let f_opaque = ClosureConverter::from_fn(f_fake_region); - - // The current task is placed inside an enum with the cleanup - // function. This enum is then placed inside the scheduler. - self.cleanup_job = Some(CleanupJob::new(current_task, f_opaque)); - - // The scheduler is then placed inside the next task. - let mut next_task = next_task; - next_task.sched = Some(self); - - // However we still need an internal mutable pointer to the - // original task. The strategy here was "arrange memory, then - // get pointers", so we crawl back up the chain using - // transmute to eliminate borrowck errors. - unsafe { - - let sched: &mut Scheduler = - transmute_mut_region(*next_task.sched.get_mut_ref()); - - let current_task: &mut Task = match sched.cleanup_job { - Some(CleanupJob { task: ref task, .. }) => { - let task_ptr: *~Task = task; - transmute_mut_region(*transmute_mut_unsafe(task_ptr)) - } - None => { - rtabort!("no cleanup job"); - } - }; - - let (current_task_context, next_task_context) = - Scheduler::get_contexts(current_task, next_task); - - // Done with everything - put the next task in TLS. This - // works because due to transmute the borrow checker - // believes that we have no internal pointers to - // next_task. - Local::put(next_task); - - // The raw context swap operation. The next action taken - // will be running the cleanup job from the context of the - // next task. - Context::swap(current_task_context, next_task_context); - } - - // When the context swaps back to this task we immediately - // run the cleanup job, as expected by the previously called - // swap_contexts function. - unsafe { - let task: *mut Task = Local::unsafe_borrow(); - (*task).sched.get_mut_ref().run_cleanup_job(); - - // See the comments in switch_running_tasks_and_then for why a lock - // is acquired here. This is the resumption points and the "bounce" - // that it is referring to. - (*task).nasty_deschedule_lock.lock(); - (*task).nasty_deschedule_lock.unlock(); - } - } - - // Returns a mutable reference to both contexts involved in this - // swap. This is unsafe - we are getting mutable internal - // references to keep even when we don't own the tasks. It looks - // kinda safe because we are doing transmutes before passing in - // the arguments. - pub fn get_contexts<'a>(current_task: &mut Task, next_task: &mut Task) -> - (&'a mut Context, &'a mut Context) { - let current_task_context = - &mut current_task.coroutine.get_mut_ref().saved_context; - let next_task_context = - &mut next_task.coroutine.get_mut_ref().saved_context; - unsafe { - (transmute_mut_region(current_task_context), - transmute_mut_region(next_task_context)) - } - } - - // * Context Swapping Helpers - Here be ugliness! - - pub fn resume_task_immediately(~self, task: ~Task) { - self.change_task_context(task, |sched, stask| { - sched.sched_task = Some(stask); - }) - } - - fn resume_task_immediately_cl(sched: ~Scheduler, - task: ~Task) { - sched.resume_task_immediately(task) - } - - - pub fn resume_blocked_task_immediately(~self, blocked_task: BlockedTask) { - match blocked_task.wake() { - Some(task) => { self.resume_task_immediately(task); } - None => Local::put(self) - }; - } - - /// Block a running task, context switch to the scheduler, then pass the - /// blocked task to a closure. - /// - /// # Safety note - /// - /// The closure here is a *stack* closure that lives in the - /// running task. It gets transmuted to the scheduler's lifetime - /// and called while the task is blocked. - /// - /// This passes a Scheduler pointer to the fn after the context switch - /// in order to prevent that fn from performing further scheduling operations. - /// Doing further scheduling could easily result in infinite recursion. - /// - /// Note that if the closure provided relinquishes ownership of the - /// BlockedTask, then it is possible for the task to resume execution before - /// the closure has finished executing. This would naturally introduce a - /// race if the closure and task shared portions of the environment. - /// - /// This situation is currently prevented, or in other words it is - /// guaranteed that this function will not return before the given closure - /// has returned. - pub fn deschedule_running_task_and_then(mut ~self, - f: |&mut Scheduler, BlockedTask|) { - // Trickier - we need to get the scheduler task out of self - // and use it as the destination. - let stask = self.sched_task.take_unwrap(); - // Otherwise this is the same as below. - self.switch_running_tasks_and_then(stask, f); - } - - pub fn switch_running_tasks_and_then(~self, next_task: ~Task, - f: |&mut Scheduler, BlockedTask|) { - // And here comes one of the sad moments in which a lock is used in a - // core portion of the rust runtime. As always, this is highly - // undesirable, so there's a good reason behind it. - // - // There is an excellent outline of the problem in issue #8132, and it's - // summarized in that `f` is executed on a sched task, but its - // environment is on the previous task. If `f` relinquishes ownership of - // the BlockedTask, then it may introduce a race where `f` is using the - // environment as well as the code after the 'deschedule' block. - // - // The solution we have chosen to adopt for now is to acquire a - // task-local lock around this block. The resumption of the task in - // context switching will bounce on the lock, thereby waiting for this - // block to finish, eliminating the race mentioned above. - // - // To actually maintain a handle to the lock, we use an unsafe pointer - // to it, but we're guaranteed that the task won't exit until we've - // unlocked the lock so there's no worry of this memory going away. - self.change_task_context(next_task, |sched, mut task| { - let lock: *mut Mutex = &mut task.nasty_deschedule_lock; - unsafe { (*lock).lock() } - f(sched, BlockedTask::block(task)); - unsafe { (*lock).unlock() } - }) - } - - fn switch_task(sched: ~Scheduler, task: ~Task) { - sched.switch_running_tasks_and_then(task, |sched, last_task| { - sched.enqueue_blocked_task(last_task); - }); - } - - // * Task Context Helpers - - /// Called by a running task to end execution, after which it will - /// be recycled by the scheduler for reuse in a new task. - pub fn terminate_current_task(mut ~self) { - // Similar to deschedule running task and then, but cannot go through - // the task-blocking path. The task is already dying. - let stask = self.sched_task.take_unwrap(); - self.change_task_context(stask, |sched, mut dead_task| { - let coroutine = dead_task.coroutine.take_unwrap(); - coroutine.recycle(&mut sched.stack_pool); - }) - } - - pub fn run_task(task: ~Task) { - let sched: ~Scheduler = Local::take(); - sched.process_task(task, Scheduler::switch_task); - } - - pub fn run_task_later(next_task: ~Task) { - let mut sched = Local::borrow(None::<Scheduler>); - sched.get().enqueue_task(next_task); - } - - /// Yield control to the scheduler, executing another task. This is guaranteed - /// to introduce some amount of randomness to the scheduler. Currently the - /// randomness is a result of performing a round of work stealing (which - /// may end up stealing from the current scheduler). - pub fn yield_now(mut ~self) { - self.yield_check_count = reset_yield_check(&mut self.rng); - // Tell the scheduler to start stealing on the next iteration - self.steal_for_yield = true; - self.deschedule_running_task_and_then(|sched, task| { - sched.enqueue_blocked_task(task); - }) - } - - pub fn maybe_yield(mut ~self) { - // The number of times to do the yield check before yielding, chosen arbitrarily. - rtassert!(self.yield_check_count > 0); - self.yield_check_count -= 1; - if self.yield_check_count == 0 { - self.yield_now(); - } else { - Local::put(self); - } - } - - - // * Utility Functions - - pub fn sched_id(&self) -> uint { to_uint(self) } - - pub fn run_cleanup_job(&mut self) { - let cleanup_job = self.cleanup_job.take_unwrap(); - cleanup_job.run(self); - } - - pub fn make_handle(&mut self) -> SchedHandle { - let remote = self.event_loop.remote_callback(~SchedRunner as ~Callback); - - return SchedHandle { - remote: remote, - queue: self.message_producer.clone(), - sched_id: self.sched_id() - }; - } -} - -// Supporting types - -type SchedulingFn = extern "Rust" fn (~Scheduler, ~Task); - -pub enum SchedMessage { - Wake, - Shutdown, - PinnedTask(~Task), - TaskFromFriend(~Task), - RunOnce(~Task), -} - -pub struct SchedHandle { - priv remote: ~RemoteCallback, - priv queue: mpsc::Producer<SchedMessage, ()>, - sched_id: uint -} - -impl SchedHandle { - pub fn send(&mut self, msg: SchedMessage) { - self.queue.push(msg); - self.remote.fire(); - } -} - -struct SchedRunner; - -impl Callback for SchedRunner { - fn call(&mut self) { - Scheduler::run_sched_once(); - } -} - -struct CleanupJob { - task: ~Task, - f: UnsafeTaskReceiver -} - -impl CleanupJob { - pub fn new(task: ~Task, f: UnsafeTaskReceiver) -> CleanupJob { - CleanupJob { - task: task, - f: f - } - } - - pub fn run(self, sched: &mut Scheduler) { - let CleanupJob { task: task, f: f } = self; - f.to_fn()(sched, task) - } -} - -// XXX: Some hacks to put a || closure in Scheduler without borrowck -// complaining -type UnsafeTaskReceiver = raw::Closure; -trait ClosureConverter { - fn from_fn(|&mut Scheduler, ~Task|) -> Self; - fn to_fn(self) -> |&mut Scheduler, ~Task|; -} -impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: |&mut Scheduler, ~Task|) -> UnsafeTaskReceiver { - unsafe { transmute(f) } - } - fn to_fn(self) -> |&mut Scheduler, ~Task| { unsafe { transmute(self) } } -} - -// On unix, we read randomness straight from /dev/urandom, but the -// default constructor of an XorShiftRng does this via io::fs, which -// relies on the scheduler existing, so we have to manually load -// randomness. Windows has its own C API for this, so we don't need to -// worry there. -#[cfg(windows)] -fn new_sched_rng() -> XorShiftRng { - XorShiftRng::new() -} -#[cfg(unix)] -fn new_sched_rng() -> XorShiftRng { - use libc; - use mem; - use c_str::ToCStr; - use vec::MutableVector; - use iter::Iterator; - use rand::SeedableRng; - - let fd = "/dev/urandom".with_c_str(|name| { - unsafe { libc::open(name, libc::O_RDONLY, 0) } - }); - if fd == -1 { - rtabort!("could not open /dev/urandom for reading.") - } - - let mut seeds = [0u32, .. 4]; - let size = mem::size_of_val(&seeds); - loop { - let nbytes = unsafe { - libc::read(fd, - seeds.as_mut_ptr() as *mut libc::c_void, - size as libc::size_t) - }; - rtassert!(nbytes as uint == size); - - if !seeds.iter().all(|x| *x == 0) { - break; - } - } - - unsafe {libc::close(fd);} - - SeedableRng::from_seed(seeds) -} - -#[cfg(test)] -mod test { - use prelude::*; - - use borrow::to_uint; - use rt::deque::BufferPool; - use rt::basic; - use rt::sched::{Scheduler}; - use rt::task::{Task, Sched}; - use rt::test::*; - use rt::thread::Thread; - use rt::util; - use task::TaskResult; - use unstable::run_in_bare_thread; - - #[test] - fn trivial_run_in_newsched_task_test() { - let mut task_ran = false; - let task_ran_ptr: *mut bool = &mut task_ran; - do run_in_newsched_task || { - unsafe { *task_ran_ptr = true }; - rtdebug!("executed from the new scheduler") - } - assert!(task_ran); - } - - #[test] - fn multiple_task_test() { - let total = 10; - let mut task_run_count = 0; - let task_run_count_ptr: *mut uint = &mut task_run_count; - do run_in_newsched_task || { - for _ in range(0u, total) { - do spawntask || { - unsafe { *task_run_count_ptr = *task_run_count_ptr + 1}; - } - } - } - assert!(task_run_count == total); - } - - #[test] - fn multiple_task_nested_test() { - let mut task_run_count = 0; - let task_run_count_ptr: *mut uint = &mut task_run_count; - do run_in_newsched_task || { - do spawntask || { - unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; - do spawntask || { - unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; - do spawntask || { - unsafe { *task_run_count_ptr = *task_run_count_ptr + 1 }; - } - } - } - } - assert!(task_run_count == 3); - } - - // Confirm that a sched_id actually is the uint form of the - // pointer to the scheduler struct. - #[test] - fn simple_sched_id_test() { - do run_in_bare_thread { - let sched = ~new_test_uv_sched(); - assert!(to_uint(sched) == sched.sched_id()); - } - } - - // Compare two scheduler ids that are different, this should never - // fail but may catch a mistake someday. - #[test] - fn compare_sched_id_test() { - do run_in_bare_thread { - let sched_one = ~new_test_uv_sched(); - let sched_two = ~new_test_uv_sched(); - assert!(sched_one.sched_id() != sched_two.sched_id()); - } - } - - - // A very simple test that confirms that a task executing on the - // home scheduler notices that it is home. - #[test] - fn test_home_sched() { - do run_in_bare_thread { - let mut task_ran = false; - let task_ran_ptr: *mut bool = &mut task_ran; - - let mut sched = ~new_test_uv_sched(); - let sched_handle = sched.make_handle(); - - let mut task = ~do Task::new_root_homed(&mut sched.stack_pool, None, - Sched(sched_handle)) { - unsafe { *task_ran_ptr = true }; - assert!(Task::on_appropriate_sched()); - }; - - let on_exit: proc(TaskResult) = proc(exit_status) { - rtassert!(exit_status.is_ok()) - }; - task.death.on_exit = Some(on_exit); - - sched.bootstrap(task); - } - } - - // An advanced test that checks all four possible states that a - // (task,sched) can be in regarding homes. - - #[test] - fn test_schedule_home_states() { - use rt::sleeper_list::SleeperList; - use rt::sched::Shutdown; - use borrow; - - do run_in_bare_thread { - - let sleepers = SleeperList::new(); - let mut pool = BufferPool::new(); - let (normal_worker, normal_stealer) = pool.deque(); - let (special_worker, special_stealer) = pool.deque(); - let queues = ~[normal_stealer, special_stealer]; - - // Our normal scheduler - let mut normal_sched = ~Scheduler::new( - basic::event_loop(), - normal_worker, - queues.clone(), - sleepers.clone()); - - let normal_handle = normal_sched.make_handle(); - - let friend_handle = normal_sched.make_handle(); - - // Our special scheduler - let mut special_sched = ~Scheduler::new_special( - basic::event_loop(), - special_worker, - queues.clone(), - sleepers.clone(), - false, - Some(friend_handle)); - - let special_handle = special_sched.make_handle(); - - let t1_handle = special_sched.make_handle(); - let t4_handle = special_sched.make_handle(); - - // Four test tasks: - // 1) task is home on special - // 2) task not homed, sched doesn't care - // 3) task not homed, sched requeues - // 4) task not home, send home - - let task1 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None, - Sched(t1_handle)) || { - rtassert!(Task::on_appropriate_sched()); - }; - rtdebug!("task1 id: **{}**", borrow::to_uint(task1)); - - let task2 = ~do Task::new_root(&mut normal_sched.stack_pool, None) { - rtassert!(Task::on_appropriate_sched()); - }; - - let task3 = ~do Task::new_root(&mut normal_sched.stack_pool, None) { - rtassert!(Task::on_appropriate_sched()); - }; - - let task4 = ~do Task::new_root_homed(&mut special_sched.stack_pool, None, - Sched(t4_handle)) { - rtassert!(Task::on_appropriate_sched()); - }; - rtdebug!("task4 id: **{}**", borrow::to_uint(task4)); - - // Signal from the special task that we are done. - let (port, chan) = Chan::<()>::new(); - - let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool, None) { - rtdebug!("*about to submit task2*"); - Scheduler::run_task(task2); - rtdebug!("*about to submit task4*"); - Scheduler::run_task(task4); - rtdebug!("*normal_task done*"); - port.recv(); - let mut nh = normal_handle; - nh.send(Shutdown); - let mut sh = special_handle; - sh.send(Shutdown); - }; - - rtdebug!("normal task: {}", borrow::to_uint(normal_task)); - - let special_task = ~do Task::new_root(&mut special_sched.stack_pool, None) { - rtdebug!("*about to submit task1*"); - Scheduler::run_task(task1); - rtdebug!("*about to submit task3*"); - Scheduler::run_task(task3); - rtdebug!("*done with special_task*"); - chan.send(()); - }; - - rtdebug!("special task: {}", borrow::to_uint(special_task)); - - let normal_sched = normal_sched; - let normal_thread = do Thread::start { - normal_sched.bootstrap(normal_task); - rtdebug!("finished with normal_thread"); - }; - - let special_sched = special_sched; - let special_thread = do Thread::start { - special_sched.bootstrap(special_task); - rtdebug!("finished with special_sched"); - }; - - normal_thread.join(); - special_thread.join(); - } - } - - #[test] - fn test_stress_schedule_task_states() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - let n = stress_factor() * 120; - for _ in range(0, n as int) { - test_schedule_home_states(); - } - } - - #[test] - fn test_io_callback() { - use io::timer; - - // This is a regression test that when there are no schedulable tasks - // in the work queue, but we are performing I/O, that once we do put - // something in the work queue again the scheduler picks it up and doesn't - // exit before emptying the work queue - do run_in_uv_task { - do spawntask { - timer::sleep(10); - } - } - } - - #[test] - fn handle() { - do run_in_bare_thread { - let (port, chan) = Chan::new(); - - let thread_one = do Thread::start { - let chan = chan; - do run_in_newsched_task_core { - chan.send(()); - } - }; - - let thread_two = do Thread::start { - let port = port; - do run_in_newsched_task_core { - port.recv(); - } - }; - - thread_two.join(); - thread_one.join(); - } - } - - // A regression test that the final message is always handled. - // Used to deadlock because Shutdown was never recvd. - #[test] - fn no_missed_messages() { - use rt::sleeper_list::SleeperList; - use rt::stack::StackPool; - use rt::sched::{Shutdown, TaskFromFriend}; - - do run_in_bare_thread { - stress_factor().times(|| { - let sleepers = SleeperList::new(); - let mut pool = BufferPool::new(); - let (worker, stealer) = pool.deque(); - - let mut sched = ~Scheduler::new( - basic::event_loop(), - worker, - ~[stealer], - sleepers.clone()); - - let mut handle = sched.make_handle(); - - let sched = sched; - let thread = do Thread::start { - let mut sched = sched; - let bootstrap_task = - ~Task::new_root(&mut sched.stack_pool, - None, - proc()()); - sched.bootstrap(bootstrap_task); - }; - - let mut stack_pool = StackPool::new(); - let task = ~Task::new_root(&mut stack_pool, None, proc()()); - handle.send(TaskFromFriend(task)); - - handle.send(Shutdown); - drop(handle); - - thread.join(); - }) - } - } - - #[test] - fn multithreading() { - use num::Times; - use vec::OwnedVector; - use container::Container; - - do run_in_mt_newsched_task { - let mut ports = ~[]; - 10.times(|| { - let (port, chan) = Chan::new(); - do spawntask_later { - chan.send(()); - } - ports.push(port); - }); - - while !ports.is_empty() { - ports.pop().recv(); - } - } - } - - #[test] - fn thread_ring() { - do run_in_mt_newsched_task { - let (end_port, end_chan) = Chan::new(); - - let n_tasks = 10; - let token = 2000; - - let (mut p, ch1) = Chan::new(); - ch1.send((token, end_chan)); - let mut i = 2; - while i <= n_tasks { - let (next_p, ch) = Chan::new(); - let imm_i = i; - let imm_p = p; - do spawntask_random { - roundtrip(imm_i, n_tasks, &imm_p, &ch); - }; - p = next_p; - i += 1; - } - let p = p; - do spawntask_random { - roundtrip(1, n_tasks, &p, &ch1); - } - - end_port.recv(); - } - - fn roundtrip(id: int, n_tasks: int, - p: &Port<(int, Chan<()>)>, - ch: &Chan<(int, Chan<()>)>) { - while (true) { - match p.recv() { - (1, end_chan) => { - debug!("{}\n", id); - end_chan.send(()); - return; - } - (token, end_chan) => { - debug!("thread: {} got token: {}", id, token); - ch.send((token - 1, end_chan)); - if token <= n_tasks { - return; - } - } - } - } - } - } - - #[test] - fn start_closure_dtor() { - use ops::Drop; - - // Regression test that the `start` task entrypoint can - // contain dtors that use task resources - do run_in_newsched_task { - struct S { field: () } - - impl Drop for S { - fn drop(&mut self) { - let _foo = @0; - } - } - - let s = S { field: () }; - - do spawntask { - let _ss = &s; - } - } - } - - // FIXME: #9407: xfail-test - #[ignore] - #[test] - fn dont_starve_1() { - stress_factor().times(|| { - do run_in_mt_newsched_task { - let (port, chan) = Chan::new(); - - // This task should not be able to starve the sender; - // The sender should get stolen to another thread. - do spawntask { - while port.try_recv().is_none() { } - } - - chan.send(()); - } - }) - } - - #[test] - fn dont_starve_2() { - stress_factor().times(|| { - do run_in_newsched_task { - let (port, chan) = Chan::new(); - let (_port2, chan2) = Chan::new(); - - // This task should not be able to starve the other task. - // The sends should eventually yield. - do spawntask { - while port.try_recv().is_none() { - chan2.send(()); - } - } - - chan.send(()); - } - }) - } - - // Regression test for a logic bug that would cause single-threaded schedulers - // to sleep forever after yielding and stealing another task. - #[test] - fn single_threaded_yield() { - use task::{spawn, spawn_sched, SingleThreaded, deschedule}; - use num::Times; - - do spawn_sched(SingleThreaded) { - 5.times(|| { deschedule(); }) - } - do spawn { } - do spawn { } - } -} diff --git a/src/libstd/rt/sleeper_list.rs b/src/libstd/rt/sleeper_list.rs deleted file mode 100644 index 39c7431837f..00000000000 --- a/src/libstd/rt/sleeper_list.rs +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Maintains a shared list of sleeping schedulers. Schedulers -//! use this to wake each other up. - -use rt::sched::SchedHandle; -use rt::mpmc_bounded_queue::Queue; -use option::*; -use clone::Clone; - -pub struct SleeperList { - priv q: Queue<SchedHandle>, -} - -impl SleeperList { - pub fn new() -> SleeperList { - SleeperList{q: Queue::with_capacity(8*1024)} - } - - pub fn push(&mut self, value: SchedHandle) { - assert!(self.q.push(value)) - } - - pub fn pop(&mut self) -> Option<SchedHandle> { - self.q.pop() - } - - pub fn casual_pop(&mut self) -> Option<SchedHandle> { - self.q.pop() - } -} - -impl Clone for SleeperList { - fn clone(&self) -> SleeperList { - SleeperList { - q: self.q.clone() - } - } -} diff --git a/src/libstd/rt/spsc_queue.rs b/src/libstd/rt/spsc_queue.rs deleted file mode 100644 index f14533d726a..00000000000 --- a/src/libstd/rt/spsc_queue.rs +++ /dev/null @@ -1,296 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue -use cast; -use kinds::Send; -use ops::Drop; -use option::{Some, None, Option}; -use unstable::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; -use unstable::sync::UnsafeArc; - -// Node within the linked list queue of messages to send -struct Node<T> { - // XXX: this could be an uninitialized T if we're careful enough, and - // that would reduce memory usage (and be a bit faster). - // is it worth it? - value: Option<T>, // nullable for re-use of nodes - next: AtomicPtr<Node<T>>, // next node in the queue -} - -// The producer/consumer halves both need access to the `tail` field, and if -// they both have access to that we may as well just give them both access -// to this whole structure. -struct State<T, P> { - // consumer fields - tail: *mut Node<T>, // where to pop from - tail_prev: AtomicPtr<Node<T>>, // where to pop from - - // producer fields - head: *mut Node<T>, // where to push to - first: *mut Node<T>, // where to get new nodes from - tail_copy: *mut Node<T>, // between first/tail - - // Cache maintenance fields. Additions and subtractions are stored - // separately in order to allow them to use nonatomic addition/subtraction. - cache_bound: uint, - cache_additions: AtomicUint, - cache_subtractions: AtomicUint, - - packet: P, -} - -pub struct Producer<T, P> { - priv state: UnsafeArc<State<T, P>>, -} - -pub struct Consumer<T, P> { - priv state: UnsafeArc<State<T, P>>, -} - -pub fn queue<T: Send, P: Send>(bound: uint, - p: P) -> (Consumer<T, P>, Producer<T, P>) -{ - let n1 = Node::new(); - let n2 = Node::new(); - unsafe { (*n1).next.store(n2, Relaxed) } - let state = State { - tail: n2, - tail_prev: AtomicPtr::new(n1), - head: n2, - first: n1, - tail_copy: n1, - cache_bound: bound, - cache_additions: AtomicUint::new(0), - cache_subtractions: AtomicUint::new(0), - packet: p, - }; - let (arc1, arc2) = UnsafeArc::new2(state); - (Consumer { state: arc1 }, Producer { state: arc2 }) -} - -impl<T: Send> Node<T> { - fn new() -> *mut Node<T> { - unsafe { - cast::transmute(~Node { - value: None, - next: AtomicPtr::new(0 as *mut Node<T>), - }) - } - } -} - -impl<T: Send, P: Send> Producer<T, P> { - pub fn push(&mut self, t: T) { - unsafe { (*self.state.get()).push(t) } - } - pub fn is_empty(&self) -> bool { - unsafe { (*self.state.get()).is_empty() } - } - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -impl<T: Send, P: Send> Consumer<T, P> { - pub fn pop(&mut self) -> Option<T> { - unsafe { (*self.state.get()).pop() } - } - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -impl<T: Send, P: Send> State<T, P> { - // remember that there is only one thread executing `push` (and only one - // thread executing `pop`) - unsafe fn push(&mut self, t: T) { - // Acquire a node (which either uses a cached one or allocates a new - // one), and then append this to the 'head' node. - let n = self.alloc(); - assert!((*n).value.is_none()); - (*n).value = Some(t); - (*n).next.store(0 as *mut Node<T>, Relaxed); - (*self.head).next.store(n, Release); - self.head = n; - } - - unsafe fn alloc(&mut self) -> *mut Node<T> { - // First try to see if we can consume the 'first' node for our uses. - // We try to avoid as many atomic instructions as possible here, so - // the addition to cache_subtractions is not atomic (plus we're the - // only one subtracting from the cache). - if self.first != self.tail_copy { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Relaxed); - self.cache_subtractions.store(b + 1, Relaxed); - } - let ret = self.first; - self.first = (*ret).next.load(Relaxed); - return ret; - } - // If the above fails, then update our copy of the tail and try - // again. - self.tail_copy = self.tail_prev.load(Acquire); - if self.first != self.tail_copy { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Relaxed); - self.cache_subtractions.store(b + 1, Relaxed); - } - let ret = self.first; - self.first = (*ret).next.load(Relaxed); - return ret; - } - // If all of that fails, then we have to allocate a new node - // (there's nothing in the node cache). - Node::new() - } - - // remember that there is only one thread executing `pop` (and only one - // thread executing `push`) - unsafe fn pop(&mut self) -> Option<T> { - // The `tail` node is not actually a used node, but rather a - // sentinel from where we should start popping from. Hence, look at - // tail's next field and see if we can use it. If we do a pop, then - // the current tail node is a candidate for going into the cache. - let tail = self.tail; - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - assert!((*next).value.is_some()); - let ret = (*next).value.take(); - - self.tail = next; - if self.cache_bound == 0 { - self.tail_prev.store(tail, Release); - } else { - // XXX: this is dubious with overflow. - let additions = self.cache_additions.load(Relaxed); - let subtractions = self.cache_subtractions.load(Relaxed); - let size = additions - subtractions; - - if size < self.cache_bound { - self.tail_prev.store(tail, Release); - self.cache_additions.store(additions + 1, Relaxed); - } else { - (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); - // We have successfully erased all references to 'tail', so - // now we can safely drop it. - let _: ~Node<T> = cast::transmute(tail); - } - } - return ret; - } - - unsafe fn is_empty(&self) -> bool { - let tail = self.tail; - let next = (*tail).next.load(Acquire); - return next.is_null(); - } -} - -#[unsafe_destructor] -impl<T: Send, P: Send> Drop for State<T, P> { - fn drop(&mut self) { - unsafe { - let mut cur = self.first; - while !cur.is_null() { - let next = (*cur).next.load(Relaxed); - let _n: ~Node<T> = cast::transmute(cur); - cur = next; - } - } - } -} - -#[cfg(test)] -mod test { - use prelude::*; - use super::queue; - use task; - - #[test] - fn smoke() { - let (mut c, mut p) = queue(0, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); - } - - #[test] - fn drop_full() { - let (_, mut p) = queue(0, ()); - p.push(~1); - p.push(~2); - } - - #[test] - fn smoke_bound() { - let (mut c, mut p) = queue(1, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); - } - - #[test] - fn stress() { - stress_bound(0); - stress_bound(1); - - fn stress_bound(bound: uint) { - let (c, mut p) = queue(bound, ()); - do task::spawn_sched(task::SingleThreaded) { - let mut c = c; - for _ in range(0, 100000) { - loop { - match c.pop() { - Some(1) => break, - Some(_) => fail!(), - None => {} - } - } - } - } - for _ in range(0, 100000) { - p.push(1); - } - } - } -} diff --git a/src/libstd/rt/stack.rs b/src/libstd/rt/stack.rs deleted file mode 100644 index 44b60e955d2..00000000000 --- a/src/libstd/rt/stack.rs +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use container::Container; -use ptr::RawPtr; -use vec; -use ops::Drop; -use libc::{c_uint, uintptr_t}; - -pub struct StackSegment { - priv buf: ~[u8], - priv valgrind_id: c_uint -} - -impl StackSegment { - pub fn new(size: uint) -> StackSegment { - unsafe { - // Crate a block of uninitialized values - let mut stack = vec::with_capacity(size); - stack.set_len(size); - - let mut stk = StackSegment { - buf: stack, - valgrind_id: 0 - }; - - // XXX: Using the FFI to call a C macro. Slow - stk.valgrind_id = rust_valgrind_stack_register(stk.start(), stk.end()); - return stk; - } - } - - /// Point to the low end of the allocated stack - pub fn start(&self) -> *uint { - self.buf.as_ptr() as *uint - } - - /// Point one word beyond the high end of the allocated stack - pub fn end(&self) -> *uint { - unsafe { - self.buf.as_ptr().offset(self.buf.len() as int) as *uint - } - } -} - -impl Drop for StackSegment { - fn drop(&mut self) { - unsafe { - // XXX: Using the FFI to call a C macro. Slow - rust_valgrind_stack_deregister(self.valgrind_id); - } - } -} - -pub struct StackPool(()); - -impl StackPool { - pub fn new() -> StackPool { StackPool(()) } - - fn take_segment(&self, min_size: uint) -> StackSegment { - StackSegment::new(min_size) - } - - fn give_segment(&self, _stack: StackSegment) { - } -} - -extern { - fn rust_valgrind_stack_register(start: *uintptr_t, end: *uintptr_t) -> c_uint; - fn rust_valgrind_stack_deregister(id: c_uint); -} diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 30e05e9091f..e6ab159a769 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -13,29 +13,41 @@ //! local storage, and logging. Even a 'freestanding' Rust would likely want //! to implement this. -use super::local_heap::LocalHeap; - -use prelude::*; - +use any::AnyOwnExt; use borrow; +use cast; use cleanup; use io::Writer; -use libc::{c_char, size_t}; +use iter::{Iterator, Take}; use local_data; +use ops::Drop; use option::{Option, Some, None}; +use prelude::drop; +use result::{Result, Ok, Err}; +use rt::Runtime; use rt::borrowck::BorrowRecord; use rt::borrowck; -use rt::context::Context; -use rt::env; -use rt::kill::Death; use rt::local::Local; +use rt::local_heap::LocalHeap; use rt::logging::StdErrLogger; -use rt::sched::{Scheduler, SchedHandle}; -use rt::stack::{StackSegment, StackPool}; +use rt::rtio::LocalIo; use rt::unwind::Unwinder; use send_str::SendStr; +use sync::arc::UnsafeArc; +use sync::atomics::{AtomicUint, SeqCst, INIT_ATOMIC_UINT}; +use task::{TaskResult, TaskOpts}; use unstable::finally::Finally; -use unstable::mutex::Mutex; +use unstable::mutex::{Mutex, MUTEX_INIT}; + +#[cfg(stage0)] +pub use rt::unwind::begin_unwind; + +// These two statics are used as bookeeping to keep track of the rust runtime's +// count of threads. In 1:1 contexts, this is used to know when to return from +// the main function, and in M:N contexts this is used to know when to shut down +// the pool of schedulers. +static mut TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT; +static mut TASK_LOCK: Mutex = MUTEX_INIT; // The Task struct represents all state associated with a rust // task. There are at this point two primary "subtypes" of task, @@ -45,201 +57,90 @@ use unstable::mutex::Mutex; pub struct Task { heap: LocalHeap, - priv gc: GarbageCollector, + gc: GarbageCollector, storage: LocalStorage, - logger: Option<StdErrLogger>, unwinder: Unwinder, death: Death, destroyed: bool, name: Option<SendStr>, - coroutine: Option<Coroutine>, - sched: Option<~Scheduler>, - task_type: TaskType, // Dynamic borrowck debugging info borrow_list: Option<~[BorrowRecord]>, + + logger: Option<StdErrLogger>, stdout_handle: Option<~Writer>, - // See the comments in the scheduler about why this is necessary - nasty_deschedule_lock: Mutex, + priv imp: Option<~Runtime>, } -pub enum TaskType { - GreenTask(Option<SchedHome>), - SchedTask -} +pub struct GarbageCollector; +pub struct LocalStorage(Option<local_data::Map>); -/// A coroutine is nothing more than a (register context, stack) pair. -pub struct Coroutine { - /// The segment of stack on which the task is currently running or - /// if the task is blocked, on which the task will resume - /// execution. - /// - /// Servo needs this to be public in order to tell SpiderMonkey - /// about the stack bounds. - current_stack_segment: StackSegment, - /// Always valid if the task is alive and not running. - saved_context: Context +/// A handle to a blocked task. Usually this means having the ~Task pointer by +/// ownership, but if the task is killable, a killer can steal it at any time. +pub enum BlockedTask { + Owned(~Task), + Shared(UnsafeArc<AtomicUint>), } -/// Some tasks have a dedicated home scheduler that they must run on. -pub enum SchedHome { - AnySched, - Sched(SchedHandle) +/// Per-task state related to task death, killing, failure, etc. +pub struct Death { + // Action to be done with the exit code. If set, also makes the task wait + // until all its watched children exit before collecting the status. + on_exit: Option<proc(TaskResult)>, } -pub struct GarbageCollector; -pub struct LocalStorage(Option<local_data::Map>); +pub struct BlockedTaskIterator { + priv inner: UnsafeArc<AtomicUint>, +} impl Task { - - // A helper to build a new task using the dynamically found - // scheduler and task. Only works in GreenTask context. - pub fn build_homed_child(stack_size: Option<uint>, - f: proc(), - home: SchedHome) - -> ~Task { - let mut running_task = Local::borrow(None::<Task>); - let mut sched = running_task.get().sched.take_unwrap(); - let new_task = ~running_task.get() - .new_child_homed(&mut sched.stack_pool, - stack_size, - home, - f); - running_task.get().sched = Some(sched); - new_task - } - - pub fn build_child(stack_size: Option<uint>, f: proc()) -> ~Task { - Task::build_homed_child(stack_size, f, AnySched) - } - - pub fn build_homed_root(stack_size: Option<uint>, - f: proc(), - home: SchedHome) - -> ~Task { - let mut running_task = Local::borrow(None::<Task>); - let mut sched = running_task.get().sched.take_unwrap(); - let new_task = ~Task::new_root_homed(&mut sched.stack_pool, - stack_size, - home, - f); - running_task.get().sched = Some(sched); - new_task - } - - pub fn build_root(stack_size: Option<uint>, f: proc()) -> ~Task { - Task::build_homed_root(stack_size, f, AnySched) - } - - pub fn new_sched_task() -> Task { + pub fn new() -> Task { Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(None), - logger: None, - unwinder: Unwinder { unwinding: false, cause: None }, + unwinder: Unwinder::new(), death: Death::new(), destroyed: false, - coroutine: Some(Coroutine::empty()), name: None, - sched: None, - task_type: SchedTask, borrow_list: None, - stdout_handle: None, - nasty_deschedule_lock: unsafe { Mutex::new() }, - } - } - - pub fn new_root(stack_pool: &mut StackPool, - stack_size: Option<uint>, - start: proc()) -> Task { - Task::new_root_homed(stack_pool, stack_size, AnySched, start) - } - - pub fn new_child(&mut self, - stack_pool: &mut StackPool, - stack_size: Option<uint>, - start: proc()) -> Task { - self.new_child_homed(stack_pool, stack_size, AnySched, start) - } - - pub fn new_root_homed(stack_pool: &mut StackPool, - stack_size: Option<uint>, - home: SchedHome, - start: proc()) -> Task { - Task { - heap: LocalHeap::new(), - gc: GarbageCollector, - storage: LocalStorage(None), logger: None, - unwinder: Unwinder { unwinding: false, cause: None }, - death: Death::new(), - destroyed: false, - name: None, - coroutine: Some(Coroutine::new(stack_pool, stack_size, start)), - sched: None, - task_type: GreenTask(Some(home)), - borrow_list: None, stdout_handle: None, - nasty_deschedule_lock: unsafe { Mutex::new() }, + imp: None, } } - pub fn new_child_homed(&mut self, - stack_pool: &mut StackPool, - stack_size: Option<uint>, - home: SchedHome, - start: proc()) -> Task { - Task { - heap: LocalHeap::new(), - gc: GarbageCollector, - storage: LocalStorage(None), - logger: None, - unwinder: Unwinder { unwinding: false, cause: None }, - death: Death::new(), - destroyed: false, - name: None, - coroutine: Some(Coroutine::new(stack_pool, stack_size, start)), - sched: None, - task_type: GreenTask(Some(home)), - borrow_list: None, - stdout_handle: None, - nasty_deschedule_lock: unsafe { Mutex::new() }, - } - } - - pub fn give_home(&mut self, new_home: SchedHome) { - match self.task_type { - GreenTask(ref mut home) => { - *home = Some(new_home); - } - SchedTask => { - rtabort!("type error: used SchedTask as GreenTask"); - } - } - } - - pub fn take_unwrap_home(&mut self) -> SchedHome { - match self.task_type { - GreenTask(ref mut home) => { - let out = home.take_unwrap(); - return out; - } - SchedTask => { - rtabort!("type error: used SchedTask as GreenTask"); - } - } - } - - pub fn run(&mut self, f: ||) { - rtdebug!("run called on task: {}", borrow::to_uint(self)); + /// Executes the given closure as if it's running inside this task. The task + /// is consumed upon entry, and the destroyed task is returned from this + /// function in order for the caller to free. This function is guaranteed to + /// not unwind because the closure specified is run inside of a `rust_try` + /// block. (this is the only try/catch block in the world). + /// + /// This function is *not* meant to be abused as a "try/catch" block. This + /// is meant to be used at the absolute boundaries of a task's lifetime, and + /// only for that purpose. + pub fn run(~self, f: ||) -> ~Task { + // Need to put ourselves into TLS, but also need access to the unwinder. + // Unsafely get a handle to the task so we can continue to use it after + // putting it in tls (so we can invoke the unwinder). + let handle: *mut Task = unsafe { + *cast::transmute::<&~Task, &*mut Task>(&self) + }; + Local::put(self); + unsafe { TASK_COUNT.fetch_add(1, SeqCst); } // The only try/catch block in the world. Attempt to run the task's // client-specified code and catch any failures. - self.unwinder.try(|| { + let try_block = || { // Run the task main function, then do some cleanup. f.finally(|| { + fn flush(w: Option<~Writer>) { + match w { + Some(mut w) => { w.flush(); } + None => {} + } + } // First, destroy task-local storage. This may run user dtors. // @@ -260,7 +161,10 @@ impl Task { // TLS, or possibly some destructors for those objects being // annihilated invoke TLS. Sadly these two operations seemed to // be intertwined, and miraculously work for now... - self.storage.take(); + let mut task = Local::borrow(None::<Task>); + let storage = task.get().storage.take(); + drop(task); + drop(storage); // Destroy remaining boxes. Also may run user dtors. unsafe { cleanup::annihilate(); } @@ -268,77 +172,141 @@ impl Task { // Finally flush and destroy any output handles which the task // owns. There are no boxes here, and no user destructors should // run after this any more. - match self.stdout_handle.take() { - Some(handle) => { - let mut handle = handle; - handle.flush(); - } - None => {} - } - self.logger.take(); + let mut task = Local::borrow(None::<Task>); + let stdout = task.get().stdout_handle.take(); + let logger = task.get().logger.take(); + drop(task); + + flush(stdout); + drop(logger); }) - }); + }; + + unsafe { (*handle).unwinder.try(try_block); } // Cleanup the dynamic borrowck debugging info borrowck::clear_task_borrow_list(); - self.death.collect_failure(self.unwinder.result()); - self.destroyed = true; + // Here we must unsafely borrow the task in order to not remove it from + // TLS. When collecting failure, we may attempt to send on a channel (or + // just run aribitrary code), so we must be sure to still have a local + // task in TLS. + unsafe { + let me: *mut Task = Local::unsafe_borrow(); + (*me).death.collect_failure((*me).unwinder.result()); + + // see comments on these statics for why they're used + if TASK_COUNT.fetch_sub(1, SeqCst) == 1 { + TASK_LOCK.lock(); + TASK_LOCK.signal(); + TASK_LOCK.unlock(); + } + } + let mut me: ~Task = Local::take(); + me.destroyed = true; + return me; } - // New utility functions for homes. + /// Inserts a runtime object into this task, transferring ownership to the + /// task. It is illegal to replace a previous runtime object in this task + /// with this argument. + pub fn put_runtime(&mut self, ops: ~Runtime) { + assert!(self.imp.is_none()); + self.imp = Some(ops); + } - pub fn is_home_no_tls(&self, sched: &~Scheduler) -> bool { - match self.task_type { - GreenTask(Some(AnySched)) => { false } - GreenTask(Some(Sched(SchedHandle { sched_id: ref id, .. }))) => { - *id == sched.sched_id() - } - GreenTask(None) => { - rtabort!("task without home"); - } - SchedTask => { - // Awe yea - rtabort!("type error: expected: GreenTask, found: SchedTask"); + /// Attempts to extract the runtime as a specific type. If the runtime does + /// not have the provided type, then the runtime is not removed. If the + /// runtime does have the specified type, then it is removed and returned + /// (transfer of ownership). + /// + /// It is recommended to only use this method when *absolutely necessary*. + /// This function may not be available in the future. + pub fn maybe_take_runtime<T: 'static>(&mut self) -> Option<~T> { + // This is a terrible, terrible function. The general idea here is to + // take the runtime, cast it to ~Any, check if it has the right type, + // and then re-cast it back if necessary. The method of doing this is + // pretty sketchy and involves shuffling vtables of trait objects + // around, but it gets the job done. + // + // XXX: This function is a serious code smell and should be avoided at + // all costs. I have yet to think of a method to avoid this + // function, and I would be saddened if more usage of the function + // crops up. + unsafe { + let imp = self.imp.take_unwrap(); + let &(vtable, _): &(uint, uint) = cast::transmute(&imp); + match imp.wrap().move::<T>() { + Ok(t) => Some(t), + Err(t) => { + let (_, obj): (uint, uint) = cast::transmute(t); + let obj: ~Runtime = cast::transmute((vtable, obj)); + self.put_runtime(obj); + None + } } } } - pub fn homed(&self) -> bool { - match self.task_type { - GreenTask(Some(AnySched)) => { false } - GreenTask(Some(Sched(SchedHandle { .. }))) => { true } - GreenTask(None) => { - rtabort!("task without home"); - } - SchedTask => { - rtabort!("type error: expected: GreenTask, found: SchedTask"); - } - } + /// Spawns a sibling to this task. The newly spawned task is configured with + /// the `opts` structure and will run `f` as the body of its code. + pub fn spawn_sibling(mut ~self, opts: TaskOpts, f: proc()) { + let ops = self.imp.take_unwrap(); + ops.spawn_sibling(self, opts, f) } - // Grab both the scheduler and the task from TLS and check if the - // task is executing on an appropriate scheduler. - pub fn on_appropriate_sched() -> bool { - let mut task = Local::borrow(None::<Task>); - let sched_id = task.get().sched.get_ref().sched_id(); - let sched_run_anything = task.get().sched.get_ref().run_anything; - match task.get().task_type { - GreenTask(Some(AnySched)) => { - rtdebug!("anysched task in sched check ****"); - sched_run_anything - } - GreenTask(Some(Sched(SchedHandle { sched_id: ref id, ..}))) => { - rtdebug!("homed task in sched check ****"); - *id == sched_id - } - GreenTask(None) => { - rtabort!("task without home"); - } - SchedTask => { - rtabort!("type error: expected: GreenTask, found: SchedTask"); - } + /// Deschedules the current task, invoking `f` `amt` times. It is not + /// recommended to use this function directly, but rather communication + /// primitives in `std::comm` should be used. + pub fn deschedule(mut ~self, amt: uint, + f: |BlockedTask| -> Result<(), BlockedTask>) { + let ops = self.imp.take_unwrap(); + ops.deschedule(amt, self, f) + } + + /// Wakes up a previously blocked task, optionally specifiying whether the + /// current task can accept a change in scheduling. This function can only + /// be called on tasks that were previously blocked in `deschedule`. + pub fn reawaken(mut ~self, can_resched: bool) { + let ops = self.imp.take_unwrap(); + ops.reawaken(self, can_resched); + } + + /// Yields control of this task to another task. This function will + /// eventually return, but possibly not immediately. This is used as an + /// opportunity to allow other tasks a chance to run. + pub fn yield_now(mut ~self) { + let ops = self.imp.take_unwrap(); + ops.yield_now(self); + } + + /// Similar to `yield_now`, except that this function may immediately return + /// without yielding (depending on what the runtime decides to do). + pub fn maybe_yield(mut ~self) { + let ops = self.imp.take_unwrap(); + ops.maybe_yield(self); + } + + /// Acquires a handle to the I/O factory that this task contains, normally + /// stored in the task's runtime. This factory may not always be available, + /// which is why the return type is `Option` + pub fn local_io<'a>(&'a mut self) -> Option<LocalIo<'a>> { + self.imp.get_mut_ref().local_io() + } + + /// The main function of all rust executables will by default use this + /// function. This function will *block* the OS thread (hence the `unsafe`) + /// waiting for all known tasks to complete. Once this function has + /// returned, it is guaranteed that no more user-defined code is still + /// running. + pub unsafe fn wait_for_other_tasks(&mut self) { + TASK_COUNT.fetch_sub(1, SeqCst); // don't count ourselves + TASK_LOCK.lock(); + while TASK_COUNT.load(SeqCst) > 0 { + TASK_LOCK.wait(); } + TASK_LOCK.unlock(); + TASK_COUNT.fetch_add(1, SeqCst); // add ourselves back in } } @@ -346,348 +314,192 @@ impl Drop for Task { fn drop(&mut self) { rtdebug!("called drop for a task: {}", borrow::to_uint(self)); rtassert!(self.destroyed); - - unsafe { self.nasty_deschedule_lock.destroy(); } } } -// Coroutines represent nothing more than a context and a stack -// segment. - -impl Coroutine { - - pub fn new(stack_pool: &mut StackPool, - stack_size: Option<uint>, - start: proc()) - -> Coroutine { - let stack_size = match stack_size { - Some(size) => size, - None => env::min_stack() - }; - let start = Coroutine::build_start_wrapper(start); - let mut stack = stack_pool.take_segment(stack_size); - let initial_context = Context::new(start, &mut stack); - Coroutine { - current_stack_segment: stack, - saved_context: initial_context - } +impl Iterator<BlockedTask> for BlockedTaskIterator { + fn next(&mut self) -> Option<BlockedTask> { + Some(Shared(self.inner.clone())) } +} - pub fn empty() -> Coroutine { - Coroutine { - current_stack_segment: StackSegment::new(0), - saved_context: Context::empty() +impl BlockedTask { + /// Returns Some if the task was successfully woken; None if already killed. + pub fn wake(self) -> Option<~Task> { + match self { + Owned(task) => Some(task), + Shared(arc) => unsafe { + match (*arc.get()).swap(0, SeqCst) { + 0 => None, + n => Some(cast::transmute(n)), + } + } } } - fn build_start_wrapper(start: proc()) -> proc() { - let wrapper: proc() = proc() { - // First code after swap to this new context. Run our - // cleanup job. - unsafe { + // This assertion has two flavours because the wake involves an atomic op. + // In the faster version, destructors will fail dramatically instead. + #[cfg(not(test))] pub fn trash(self) { } + #[cfg(test)] pub fn trash(self) { assert!(self.wake().is_none()); } - // Again - might work while safe, or it might not. - { - let mut sched = Local::borrow(None::<Scheduler>); - sched.get().run_cleanup_job(); - } + /// Create a blocked task, unless the task was already killed. + pub fn block(task: ~Task) -> BlockedTask { + Owned(task) + } - // To call the run method on a task we need a direct - // reference to it. The task is in TLS, so we can - // simply unsafe_borrow it to get this reference. We - // need to still have the task in TLS though, so we - // need to unsafe_borrow. - let task: *mut Task = Local::unsafe_borrow(); - - let mut start_cell = Some(start); - (*task).run(|| { - // N.B. Removing `start` from the start wrapper - // closure by emptying a cell is critical for - // correctness. The ~Task pointer, and in turn the - // closure used to initialize the first call - // frame, is destroyed in the scheduler context, - // not task context. So any captured closures must - // not contain user-definable dtors that expect to - // be in task context. By moving `start` out of - // the closure, all the user code goes our of - // scope while the task is still running. - let start = start_cell.take_unwrap(); - start(); - }); + /// Converts one blocked task handle to a list of many handles to the same. + pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTaskIterator> + { + let arc = match self { + Owned(task) => { + let flag = unsafe { AtomicUint::new(cast::transmute(task)) }; + UnsafeArc::new(flag) } - - // We remove the sched from the Task in TLS right now. - let sched: ~Scheduler = Local::take(); - // ... allowing us to give it away when performing a - // scheduling operation. - sched.terminate_current_task() + Shared(arc) => arc.clone(), }; - return wrapper; + BlockedTaskIterator{ inner: arc }.take(num_handles) } - /// Destroy coroutine and try to reuse stack segment. - pub fn recycle(self, stack_pool: &mut StackPool) { + /// Convert to an unsafe uint value. Useful for storing in a pipe's state + /// flag. + #[inline] + pub unsafe fn cast_to_uint(self) -> uint { match self { - Coroutine { current_stack_segment, .. } => { - stack_pool.give_segment(current_stack_segment); + Owned(task) => { + let blocked_task_ptr: uint = cast::transmute(task); + rtassert!(blocked_task_ptr & 0x1 == 0); + blocked_task_ptr + } + Shared(arc) => { + let blocked_task_ptr: uint = cast::transmute(~arc); + rtassert!(blocked_task_ptr & 0x1 == 0); + blocked_task_ptr | 0x1 } } } -} - -/// This function is invoked from rust's current __morestack function. Segmented -/// stacks are currently not enabled as segmented stacks, but rather one giant -/// stack segment. This means that whenever we run out of stack, we want to -/// truly consider it to be stack overflow rather than allocating a new stack. -#[no_mangle] // - this is called from C code -#[no_split_stack] // - it would be sad for this function to trigger __morestack -#[doc(hidden)] // - Function must be `pub` to get exported, but it's - // irrelevant for documentation purposes. -#[cfg(not(test))] // in testing, use the original libstd's version -pub extern "C" fn rust_stack_exhausted() { - use rt::context; - use rt::in_green_task_context; - use rt::task::Task; - use rt::local::Local; - use unstable::intrinsics; - - unsafe { - // We're calling this function because the stack just ran out. We need - // to call some other rust functions, but if we invoke the functions - // right now it'll just trigger this handler being called again. In - // order to alleviate this, we move the stack limit to be inside of the - // red zone that was allocated for exactly this reason. - let limit = context::get_sp_limit(); - context::record_sp_limit(limit - context::RED_ZONE / 2); - - // This probably isn't the best course of action. Ideally one would want - // to unwind the stack here instead of just aborting the entire process. - // This is a tricky problem, however. There's a few things which need to - // be considered: - // - // 1. We're here because of a stack overflow, yet unwinding will run - // destructors and hence arbitrary code. What if that code overflows - // the stack? One possibility is to use the above allocation of an - // extra 10k to hope that we don't hit the limit, and if we do then - // abort the whole program. Not the best, but kind of hard to deal - // with unless we want to switch stacks. - // - // 2. LLVM will optimize functions based on whether they can unwind or - // not. It will flag functions with 'nounwind' if it believes that - // the function cannot trigger unwinding, but if we do unwind on - // stack overflow then it means that we could unwind in any function - // anywhere. We would have to make sure that LLVM only places the - // nounwind flag on functions which don't call any other functions. - // - // 3. The function that overflowed may have owned arguments. These - // arguments need to have their destructors run, but we haven't even - // begun executing the function yet, so unwinding will not run the - // any landing pads for these functions. If this is ignored, then - // the arguments will just be leaked. - // - // Exactly what to do here is a very delicate topic, and is possibly - // still up in the air for what exactly to do. Some relevant issues: - // - // #3555 - out-of-stack failure leaks arguments - // #3695 - should there be a stack limit? - // #9855 - possible strategies which could be taken - // #9854 - unwinding on windows through __morestack has never worked - // #2361 - possible implementation of not using landing pads - - if in_green_task_context() { - let mut task = Local::borrow(None::<Task>); - let n = task.get() - .name - .as_ref() - .map(|n| n.as_slice()) - .unwrap_or("<unnamed>"); - - // See the message below for why this is not emitted to the - // task's logger. This has the additional conundrum of the - // logger may not be initialized just yet, meaning that an FFI - // call would happen to initialized it (calling out to libuv), - // and the FFI call needs 2MB of stack when we just ran out. - rterrln!("task '{}' has overflowed its stack", n); + /// Convert from an unsafe uint value. Useful for retrieving a pipe's state + /// flag. + #[inline] + pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask { + if blocked_task_ptr & 0x1 == 0 { + Owned(cast::transmute(blocked_task_ptr)) } else { - rterrln!("stack overflow in non-task context"); + let ptr: ~UnsafeArc<AtomicUint> = + cast::transmute(blocked_task_ptr & !1); + Shared(*ptr) } - - intrinsics::abort(); } } -/// This is the entry point of unwinding for things like lang items and such. -/// The arguments are normally generated by the compiler, and need to -/// have static lifetimes. -pub fn begin_unwind_raw(msg: *c_char, file: *c_char, line: size_t) -> ! { - use c_str::CString; - use cast::transmute; +impl Death { + pub fn new() -> Death { + Death { on_exit: None, } + } - #[inline] - fn static_char_ptr(p: *c_char) -> &'static str { - let s = unsafe { CString::new(p, false) }; - match s.as_str() { - Some(s) => unsafe { transmute::<&str, &'static str>(s) }, - None => rtabort!("message wasn't utf8?") + /// Collect failure exit codes from children and propagate them to a parent. + pub fn collect_failure(&mut self, result: TaskResult) { + match self.on_exit.take() { + Some(f) => f(result), + None => {} } } - - let msg = static_char_ptr(msg); - let file = static_char_ptr(file); - - begin_unwind(msg, file, line as uint) } -/// This is the entry point of unwinding for fail!() and assert!(). -pub fn begin_unwind<M: Any + Send>(msg: M, file: &'static str, line: uint) -> ! { - use any::AnyRefExt; - use rt::in_green_task_context; - use rt::local::Local; - use rt::task::Task; - use str::Str; - use unstable::intrinsics; - - unsafe { - let task: *mut Task; - // Note that this should be the only allocation performed in this block. - // Currently this means that fail!() on OOM will invoke this code path, - // but then again we're not really ready for failing on OOM anyway. If - // we do start doing this, then we should propagate this allocation to - // be performed in the parent of this task instead of the task that's - // failing. - let msg = ~msg as ~Any; - - { - //let msg: &Any = msg; - let msg_s = match msg.as_ref::<&'static str>() { - Some(s) => *s, - None => match msg.as_ref::<~str>() { - Some(s) => s.as_slice(), - None => "~Any", - } - }; - - if !in_green_task_context() { - rterrln!("failed in non-task context at '{}', {}:{}", - msg_s, file, line); - intrinsics::abort(); - } - - task = Local::unsafe_borrow(); - let n = (*task).name.as_ref().map(|n| n.as_slice()).unwrap_or("<unnamed>"); - - // XXX: this should no get forcibly printed to the console, this should - // either be sent to the parent task (ideally), or get printed to - // the task's logger. Right now the logger is actually a uvio - // instance, which uses unkillable blocks internally for various - // reasons. This will cause serious trouble if the task is failing - // due to mismanagment of its own kill flag, so calling our own - // logger in its current state is a bit of a problem. - - rterrln!("task '{}' failed at '{}', {}:{}", n, msg_s, file, line); - - if (*task).unwinder.unwinding { - rtabort!("unwinding again"); - } - } - - (*task).unwinder.begin_unwind(msg); +impl Drop for Death { + fn drop(&mut self) { + // make this type noncopyable } } #[cfg(test)] mod test { use super::*; - use rt::test::*; use prelude::*; + use task; #[test] fn local_heap() { - do run_in_newsched_task() { - let a = @5; - let b = a; - assert!(*a == 5); - assert!(*b == 5); - } + let a = @5; + let b = a; + assert!(*a == 5); + assert!(*b == 5); } #[test] fn tls() { use local_data; - do run_in_newsched_task() { - local_data_key!(key: @~str) - local_data::set(key, @~"data"); - assert!(*local_data::get(key, |k| k.map(|k| *k)).unwrap() == ~"data"); - local_data_key!(key2: @~str) - local_data::set(key2, @~"data"); - assert!(*local_data::get(key2, |k| k.map(|k| *k)).unwrap() == ~"data"); - } + local_data_key!(key: @~str) + local_data::set(key, @~"data"); + assert!(*local_data::get(key, |k| k.map(|k| *k)).unwrap() == ~"data"); + local_data_key!(key2: @~str) + local_data::set(key2, @~"data"); + assert!(*local_data::get(key2, |k| k.map(|k| *k)).unwrap() == ~"data"); } #[test] fn unwind() { - do run_in_newsched_task() { - let result = spawntask_try(proc()()); - rtdebug!("trying first assert"); - assert!(result.is_ok()); - let result = spawntask_try(proc() fail!()); - rtdebug!("trying second assert"); - assert!(result.is_err()); - } + let result = task::try(proc()()); + rtdebug!("trying first assert"); + assert!(result.is_ok()); + let result = task::try::<()>(proc() fail!()); + rtdebug!("trying second assert"); + assert!(result.is_err()); } #[test] fn rng() { - do run_in_uv_task() { - use rand::{rng, Rng}; - let mut r = rng(); - let _ = r.next_u32(); - } + use rand::{rng, Rng}; + let mut r = rng(); + let _ = r.next_u32(); } #[test] fn logging() { - do run_in_uv_task() { - info!("here i am. logging in a newsched task"); - } + info!("here i am. logging in a newsched task"); } #[test] fn comm_stream() { - do run_in_newsched_task() { - let (port, chan) = Chan::new(); - chan.send(10); - assert!(port.recv() == 10); - } + let (port, chan) = Chan::new(); + chan.send(10); + assert!(port.recv() == 10); } #[test] fn comm_shared_chan() { - do run_in_newsched_task() { - let (port, chan) = SharedChan::new(); - chan.send(10); - assert!(port.recv() == 10); - } + let (port, chan) = SharedChan::new(); + chan.send(10); + assert!(port.recv() == 10); } #[test] fn heap_cycles() { use option::{Option, Some, None}; - do run_in_newsched_task { - struct List { - next: Option<@mut List>, - } + struct List { + next: Option<@mut List>, + } - let a = @mut List { next: None }; - let b = @mut List { next: Some(a) }; + let a = @mut List { next: None }; + let b = @mut List { next: Some(a) }; - a.next = Some(b); - } + a.next = Some(b); } #[test] #[should_fail] - fn test_begin_unwind() { begin_unwind("cause", file!(), line!()) } + fn test_begin_unwind() { + use rt::unwind::begin_unwind; + begin_unwind("cause", file!(), line!()) + } + + // Task blocking tests + + #[test] + fn block_and_wake() { + let task = ~Task::new(); + let mut task = BlockedTask::block(task).wake().unwrap(); + task.destroyed = true; + } } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs deleted file mode 100644 index 2b48b396c99..00000000000 --- a/src/libstd/rt/test.rs +++ /dev/null @@ -1,440 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr}; - -use clone::Clone; -use container::Container; -use iter::{Iterator, range}; -use option::{Some, None}; -use os; -use path::GenericPath; -use path::Path; -use rand::Rng; -use rand; -use result::{Result, Ok, Err}; -use rt::basic; -use rt::deque::BufferPool; -use comm::Chan; -use rt::new_event_loop; -use rt::sched::Scheduler; -use rt::sleeper_list::SleeperList; -use rt::task::Task; -use rt::thread::Thread; -use task::TaskResult; -use unstable::{run_in_bare_thread}; -use vec; -use vec::{OwnedVector, MutableVector, ImmutableVector}; - -pub fn new_test_uv_sched() -> Scheduler { - - let mut pool = BufferPool::new(); - let (worker, stealer) = pool.deque(); - - let mut sched = Scheduler::new(new_event_loop(), - worker, - ~[stealer], - SleeperList::new()); - - // Don't wait for the Shutdown message - sched.no_sleep = true; - return sched; - -} - -pub fn new_test_sched() -> Scheduler { - let mut pool = BufferPool::new(); - let (worker, stealer) = pool.deque(); - - let mut sched = Scheduler::new(basic::event_loop(), - worker, - ~[stealer], - SleeperList::new()); - - // Don't wait for the Shutdown message - sched.no_sleep = true; - return sched; -} - -pub fn run_in_uv_task(f: proc()) { - do run_in_bare_thread { - run_in_uv_task_core(f); - } -} - -pub fn run_in_newsched_task(f: proc()) { - do run_in_bare_thread { - run_in_newsched_task_core(f); - } -} - -pub fn run_in_uv_task_core(f: proc()) { - - use rt::sched::Shutdown; - - let mut sched = ~new_test_uv_sched(); - let exit_handle = sched.make_handle(); - - let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) { - let mut exit_handle = exit_handle; - exit_handle.send(Shutdown); - rtassert!(exit_status.is_ok()); - }; - let mut task = ~Task::new_root(&mut sched.stack_pool, None, f); - task.death.on_exit = Some(on_exit); - - sched.bootstrap(task); -} - -pub fn run_in_newsched_task_core(f: proc()) { - use rt::sched::Shutdown; - - let mut sched = ~new_test_sched(); - let exit_handle = sched.make_handle(); - - let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) { - let mut exit_handle = exit_handle; - exit_handle.send(Shutdown); - rtassert!(exit_status.is_ok()); - }; - let mut task = ~Task::new_root(&mut sched.stack_pool, None, f); - task.death.on_exit = Some(on_exit); - - sched.bootstrap(task); -} - -#[cfg(target_os="macos")] -#[allow(non_camel_case_types)] -mod darwin_fd_limit { - /*! - * darwin_fd_limit exists to work around an issue where launchctl on Mac OS X defaults the - * rlimit maxfiles to 256/unlimited. The default soft limit of 256 ends up being far too low - * for our multithreaded scheduler testing, depending on the number of cores available. - * - * This fixes issue #7772. - */ - - use libc; - type rlim_t = libc::uint64_t; - struct rlimit { - rlim_cur: rlim_t, - rlim_max: rlim_t - } - #[nolink] - extern { - // name probably doesn't need to be mut, but the C function doesn't specify const - fn sysctl(name: *mut libc::c_int, namelen: libc::c_uint, - oldp: *mut libc::c_void, oldlenp: *mut libc::size_t, - newp: *mut libc::c_void, newlen: libc::size_t) -> libc::c_int; - fn getrlimit(resource: libc::c_int, rlp: *mut rlimit) -> libc::c_int; - fn setrlimit(resource: libc::c_int, rlp: *rlimit) -> libc::c_int; - } - static CTL_KERN: libc::c_int = 1; - static KERN_MAXFILESPERPROC: libc::c_int = 29; - static RLIMIT_NOFILE: libc::c_int = 8; - - pub unsafe fn raise_fd_limit() { - // The strategy here is to fetch the current resource limits, read the kern.maxfilesperproc - // sysctl value, and bump the soft resource limit for maxfiles up to the sysctl value. - use ptr::{to_unsafe_ptr, to_mut_unsafe_ptr, mut_null}; - use mem::size_of_val; - use os::last_os_error; - - // Fetch the kern.maxfilesperproc value - let mut mib: [libc::c_int, ..2] = [CTL_KERN, KERN_MAXFILESPERPROC]; - let mut maxfiles: libc::c_int = 0; - let mut size: libc::size_t = size_of_val(&maxfiles) as libc::size_t; - if sysctl(to_mut_unsafe_ptr(&mut mib[0]), 2, - to_mut_unsafe_ptr(&mut maxfiles) as *mut libc::c_void, - to_mut_unsafe_ptr(&mut size), - mut_null(), 0) != 0 { - let err = last_os_error(); - error!("raise_fd_limit: error calling sysctl: {}", err); - return; - } - - // Fetch the current resource limits - let mut rlim = rlimit{rlim_cur: 0, rlim_max: 0}; - if getrlimit(RLIMIT_NOFILE, to_mut_unsafe_ptr(&mut rlim)) != 0 { - let err = last_os_error(); - error!("raise_fd_limit: error calling getrlimit: {}", err); - return; - } - - // Bump the soft limit to the smaller of kern.maxfilesperproc and the hard limit - rlim.rlim_cur = ::cmp::min(maxfiles as rlim_t, rlim.rlim_max); - - // Set our newly-increased resource limit - if setrlimit(RLIMIT_NOFILE, to_unsafe_ptr(&rlim)) != 0 { - let err = last_os_error(); - error!("raise_fd_limit: error calling setrlimit: {}", err); - return; - } - } -} - -#[cfg(not(target_os="macos"))] -mod darwin_fd_limit { - pub unsafe fn raise_fd_limit() {} -} - -#[doc(hidden)] -pub fn prepare_for_lots_of_tests() { - // Bump the fd limit on OS X. See darwin_fd_limit for an explanation. - unsafe { darwin_fd_limit::raise_fd_limit() } -} - -/// Create more than one scheduler and run a function in a task -/// in one of the schedulers. The schedulers will stay alive -/// until the function `f` returns. -pub fn run_in_mt_newsched_task(f: proc()) { - use os; - use from_str::FromStr; - use rt::sched::Shutdown; - use rt::util; - - // see comment in other function (raising fd limits) - prepare_for_lots_of_tests(); - - do run_in_bare_thread { - let nthreads = match os::getenv("RUST_RT_TEST_THREADS") { - Some(nstr) => FromStr::from_str(nstr).unwrap(), - None => { - if util::limit_thread_creation_due_to_osx_and_valgrind() { - 1 - } else { - // Using more threads than cores in test code - // to force the OS to preempt them frequently. - // Assuming that this help stress test concurrent types. - util::num_cpus() * 2 - } - } - }; - - let sleepers = SleeperList::new(); - - let mut handles = ~[]; - let mut scheds = ~[]; - - let mut pool = BufferPool::<~Task>::new(); - let workers = range(0, nthreads).map(|_| pool.deque()); - let (workers, stealers) = vec::unzip(workers); - - for worker in workers.move_iter() { - let loop_ = new_event_loop(); - let mut sched = ~Scheduler::new(loop_, - worker, - stealers.clone(), - sleepers.clone()); - let handle = sched.make_handle(); - - handles.push(handle); - scheds.push(sched); - } - - let handles = handles; // Work around not being able to capture mut - let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) { - // Tell schedulers to exit - let mut handles = handles; - for handle in handles.mut_iter() { - handle.send(Shutdown); - } - - rtassert!(exit_status.is_ok()); - }; - let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, - None, - f); - main_task.death.on_exit = Some(on_exit); - - let mut threads = ~[]; - - let main_thread = { - let sched = scheds.pop(); - let main_task = main_task; - do Thread::start { - sched.bootstrap(main_task); - } - }; - threads.push(main_thread); - - while !scheds.is_empty() { - let mut sched = scheds.pop(); - let bootstrap_task = ~do Task::new_root(&mut sched.stack_pool, None) || { - rtdebug!("bootstrapping non-primary scheduler"); - }; - let sched = sched; - let thread = do Thread::start { - sched.bootstrap(bootstrap_task); - }; - - threads.push(thread); - } - - // Wait for schedulers - for thread in threads.move_iter() { - thread.join(); - } - } - -} - -/// Test tasks will abort on failure instead of unwinding -pub fn spawntask(f: proc()) { - Scheduler::run_task(Task::build_child(None, f)); -} - -/// Create a new task and run it right now. Aborts on failure -pub fn spawntask_later(f: proc()) { - Scheduler::run_task_later(Task::build_child(None, f)); -} - -pub fn spawntask_random(f: proc()) { - use rand::{Rand, rng}; - - let mut rng = rng(); - let run_now: bool = Rand::rand(&mut rng); - - if run_now { - spawntask(f) - } else { - spawntask_later(f) - } -} - -pub fn spawntask_try(f: proc()) -> Result<(),()> { - - let (port, chan) = Chan::new(); - let on_exit: proc(TaskResult) = proc(exit_status) { - chan.send(exit_status) - }; - - let mut new_task = Task::build_root(None, f); - new_task.death.on_exit = Some(on_exit); - - Scheduler::run_task(new_task); - - let exit_status = port.recv(); - if exit_status.is_ok() { Ok(()) } else { Err(()) } - -} - -/// Spawn a new task in a new scheduler and return a thread handle. -pub fn spawntask_thread(f: proc()) -> Thread<()> { - let thread = do Thread::start { - run_in_newsched_task_core(f); - }; - - return thread; -} - -/// Get a ~Task for testing purposes other than actually scheduling it. -pub fn with_test_task(blk: proc(~Task) -> ~Task) { - do run_in_bare_thread { - let mut sched = ~new_test_sched(); - let task = blk(~Task::new_root(&mut sched.stack_pool, - None, - proc() {})); - cleanup_task(task); - } -} - -/// Use to cleanup tasks created for testing but not "run". -pub fn cleanup_task(mut task: ~Task) { - task.destroyed = true; -} - -/// Get a port number, starting at 9600, for use in tests -pub fn next_test_port() -> u16 { - use unstable::mutex::{Mutex, MUTEX_INIT}; - static mut lock: Mutex = MUTEX_INIT; - static mut next_offset: u16 = 0; - unsafe { - let base = base_port(); - lock.lock(); - let ret = base + next_offset; - next_offset += 1; - lock.unlock(); - return ret; - } -} - -/// Get a temporary path which could be the location of a unix socket -pub fn next_test_unix() -> Path { - if cfg!(unix) { - os::tmpdir().join(rand::task_rng().gen_ascii_str(20)) - } else { - Path::new(r"\\.\pipe\" + rand::task_rng().gen_ascii_str(20)) - } -} - -/// Get a unique IPv4 localhost:port pair starting at 9600 -pub fn next_test_ip4() -> SocketAddr { - SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: next_test_port() } -} - -/// Get a unique IPv6 localhost:port pair starting at 9600 -pub fn next_test_ip6() -> SocketAddr { - SocketAddr { ip: Ipv6Addr(0, 0, 0, 0, 0, 0, 0, 1), port: next_test_port() } -} - -/* -XXX: Welcome to MegaHack City. - -The bots run multiple builds at the same time, and these builds -all want to use ports. This function figures out which workspace -it is running in and assigns a port range based on it. -*/ -fn base_port() -> u16 { - use os; - use str::StrSlice; - use vec::ImmutableVector; - - let base = 9600u16; - let range = 1000u16; - - let bases = [ - ("32-opt", base + range * 1), - ("32-noopt", base + range * 2), - ("64-opt", base + range * 3), - ("64-noopt", base + range * 4), - ("64-opt-vg", base + range * 5), - ("all-opt", base + range * 6), - ("snap3", base + range * 7), - ("dist", base + range * 8) - ]; - - // FIXME (#9639): This needs to handle non-utf8 paths - let path = os::getcwd(); - let path_s = path.as_str().unwrap(); - - let mut final_base = base; - - for &(dir, base) in bases.iter() { - if path_s.contains(dir) { - final_base = base; - break; - } - } - - return final_base; -} - -/// Get a constant that represents the number of times to repeat -/// stress tests. Default 1. -pub fn stress_factor() -> uint { - use os::getenv; - use from_str::from_str; - - match getenv("RUST_RT_STRESS") { - Some(val) => from_str::<uint>(val).unwrap(), - None => 1 - } -} diff --git a/src/libstd/rt/thread.rs b/src/libstd/rt/thread.rs index 6128f310a2e..f4f4aaa2765 100644 --- a/src/libstd/rt/thread.rs +++ b/src/libstd/rt/thread.rs @@ -33,7 +33,7 @@ pub struct Thread<T> { priv packet: ~Option<T>, } -static DEFAULT_STACK_SIZE: libc::size_t = 1024 * 1024; +static DEFAULT_STACK_SIZE: uint = 1024 * 1024; // This is the starting point of rust os threads. The first thing we do // is make sure that we don't trigger __morestack (also why this has a @@ -41,9 +41,9 @@ static DEFAULT_STACK_SIZE: libc::size_t = 1024 * 1024; // and invoke it. #[no_split_stack] extern fn thread_start(main: *libc::c_void) -> imp::rust_thread_return { - use rt::context; + use unstable::stack; unsafe { - context::record_stack_bounds(0, uint::max_value); + stack::record_stack_bounds(0, uint::max_value); let f: ~proc() = cast::transmute(main); (*f)(); cast::transmute(0 as imp::rust_thread_return) @@ -69,6 +69,12 @@ impl Thread<()> { /// called, when the `Thread` falls out of scope its destructor will block /// waiting for the OS thread. pub fn start<T: Send>(main: proc() -> T) -> Thread<T> { + Thread::start_stack(DEFAULT_STACK_SIZE, main) + } + + /// Performs the same functionality as `start`, but specifies an explicit + /// stack size for the new thread. + pub fn start_stack<T: Send>(stack: uint, main: proc() -> T) -> Thread<T> { // We need the address of the packet to fill in to be stable so when // `main` fills it in it's still valid, so allocate an extra ~ box to do @@ -78,7 +84,7 @@ impl Thread<()> { *cast::transmute::<&~Option<T>, **mut Option<T>>(&packet) }; let main: proc() = proc() unsafe { *packet2 = Some(main()); }; - let native = unsafe { imp::create(~main) }; + let native = unsafe { imp::create(stack, ~main) }; Thread { native: native, @@ -94,8 +100,14 @@ impl Thread<()> { /// systems. Note that platforms may not keep the main program alive even if /// there are detached thread still running around. pub fn spawn(main: proc()) { + Thread::spawn_stack(DEFAULT_STACK_SIZE, main) + } + + /// Performs the same functionality as `spawn`, but explicitly specifies a + /// stack size for the new thread. + pub fn spawn_stack(stack: uint, main: proc()) { unsafe { - let handle = imp::create(~main); + let handle = imp::create(stack, ~main); imp::detach(handle); } } @@ -132,8 +144,6 @@ impl<T: Send> Drop for Thread<T> { #[cfg(windows)] mod imp { - use super::DEFAULT_STACK_SIZE; - use cast; use libc; use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, BOOL, @@ -143,9 +153,9 @@ mod imp { pub type rust_thread = HANDLE; pub type rust_thread_return = DWORD; - pub unsafe fn create(p: ~proc()) -> rust_thread { + pub unsafe fn create(stack: uint, p: ~proc()) -> rust_thread { let arg: *mut libc::c_void = cast::transmute(p); - CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, super::thread_start, + CreateThread(ptr::mut_null(), stack as libc::size_t, super::thread_start, arg, 0, ptr::mut_null()) } @@ -183,17 +193,17 @@ mod imp { use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE; use libc; use ptr; - use super::DEFAULT_STACK_SIZE; use unstable::intrinsics; pub type rust_thread = libc::pthread_t; pub type rust_thread_return = *libc::c_void; - pub unsafe fn create(p: ~proc()) -> rust_thread { + pub unsafe fn create(stack: uint, p: ~proc()) -> rust_thread { let mut native: libc::pthread_t = intrinsics::uninit(); let mut attr: libc::pthread_attr_t = intrinsics::uninit(); assert_eq!(pthread_attr_init(&mut attr), 0); - assert_eq!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE), 0); + assert_eq!(pthread_attr_setstacksize(&mut attr, + stack as libc::size_t), 0); assert_eq!(pthread_attr_setdetachstate(&mut attr, PTHREAD_CREATE_JOINABLE), 0); diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs deleted file mode 100644 index 5e867bcdfba..00000000000 --- a/src/libstd/rt/tube.rs +++ /dev/null @@ -1,170 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! A very simple unsynchronized channel type for sending buffered data from -//! scheduler context to task context. -//! -//! XXX: This would be safer to use if split into two types like Port/Chan - -use option::*; -use clone::Clone; -use super::rc::RC; -use rt::sched::Scheduler; -use rt::kill::BlockedTask; -use rt::local::Local; -use vec::OwnedVector; -use container::Container; - -struct TubeState<T> { - blocked_task: Option<BlockedTask>, - buf: ~[T] -} - -pub struct Tube<T> { - priv p: RC<TubeState<T>> -} - -impl<T> Tube<T> { - pub fn new() -> Tube<T> { - Tube { - p: RC::new(TubeState { - blocked_task: None, - buf: ~[] - }) - } - } - - pub fn send(&mut self, val: T) { - rtdebug!("tube send"); - unsafe { - let state = self.p.unsafe_borrow_mut(); - (*state).buf.push(val); - - if (*state).blocked_task.is_some() { - // There's a waiting task. Wake it up - rtdebug!("waking blocked tube"); - let task = (*state).blocked_task.take_unwrap(); - let sched: ~Scheduler = Local::take(); - sched.resume_blocked_task_immediately(task); - } - } - } - - pub fn recv(&mut self) -> T { - unsafe { - let state = self.p.unsafe_borrow_mut(); - if !(*state).buf.is_empty() { - return (*state).buf.shift(); - } else { - // Block and wait for the next message - rtdebug!("blocking on tube recv"); - assert!(self.p.refcount() > 1); // There better be somebody to wake us up - assert!((*state).blocked_task.is_none()); - let sched: ~Scheduler = Local::take(); - sched.deschedule_running_task_and_then(|_, task| { - (*state).blocked_task = Some(task); - }); - rtdebug!("waking after tube recv"); - let buf = &mut (*state).buf; - assert!(!buf.is_empty()); - return buf.shift(); - } - } - } -} - -impl<T> Clone for Tube<T> { - fn clone(&self) -> Tube<T> { - Tube { p: self.p.clone() } - } -} - -#[cfg(test)] -mod test { - use rt::test::*; - use rt::rtio::EventLoop; - use rt::sched::Scheduler; - use rt::local::Local; - use super::*; - use prelude::*; - - #[test] - fn simple_test() { - do run_in_newsched_task { - let mut tube: Tube<int> = Tube::new(); - let mut tube_clone = Some(tube.clone()); - let sched: ~Scheduler = Local::take(); - sched.deschedule_running_task_and_then(|sched, task| { - let mut tube_clone = tube_clone.take_unwrap(); - tube_clone.send(1); - sched.enqueue_blocked_task(task); - }); - - assert!(tube.recv() == 1); - } - } - - #[test] - fn blocking_test() { - do run_in_newsched_task { - let mut tube: Tube<int> = Tube::new(); - let mut tube_clone = Some(tube.clone()); - let sched: ~Scheduler = Local::take(); - sched.deschedule_running_task_and_then(|sched, task| { - let tube_clone = tube_clone.take_unwrap(); - do sched.event_loop.callback { - let mut tube_clone = tube_clone; - // The task should be blocked on this now and - // sending will wake it up. - tube_clone.send(1); - } - sched.enqueue_blocked_task(task); - }); - - assert!(tube.recv() == 1); - } - } - - #[test] - fn many_blocking_test() { - static MAX: int = 100; - - do run_in_newsched_task { - let mut tube: Tube<int> = Tube::new(); - let mut tube_clone = Some(tube.clone()); - let sched: ~Scheduler = Local::take(); - sched.deschedule_running_task_and_then(|sched, task| { - callback_send(tube_clone.take_unwrap(), 0); - - fn callback_send(tube: Tube<int>, i: int) { - if i == 100 { - return - } - - let mut sched = Local::borrow(None::<Scheduler>); - do sched.get().event_loop.callback { - let mut tube = tube; - // The task should be blocked on this now and - // sending will wake it up. - tube.send(i); - callback_send(tube, i + 1); - } - } - - sched.enqueue_blocked_task(task); - }); - - for i in range(0, MAX) { - let j = tube.recv(); - assert!(j == i); - } - } - } -} diff --git a/src/libstd/rt/unwind.rs b/src/libstd/rt/unwind.rs index 3f6f54a9c0e..9706dbae4c6 100644 --- a/src/libstd/rt/unwind.rs +++ b/src/libstd/rt/unwind.rs @@ -8,11 +8,11 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. - // Implementation of Rust stack unwinding // -// For background on exception handling and stack unwinding please see "Exception Handling in LLVM" -// (llvm.org/docs/ExceptionHandling.html) and documents linked from it. +// For background on exception handling and stack unwinding please see +// "Exception Handling in LLVM" (llvm.org/docs/ExceptionHandling.html) and +// documents linked from it. // These are also good reads: // http://theofilos.cs.columbia.edu/blog/2013/09/22/base_abi/ // http://monoinfinito.wordpress.com/series/exception-handling-in-c/ @@ -21,41 +21,55 @@ // ~~~ A brief summary ~~~ // Exception handling happens in two phases: a search phase and a cleanup phase. // -// In both phases the unwinder walks stack frames from top to bottom using information from -// the stack frame unwind sections of the current process's modules ("module" here refers to -// an OS module, i.e. an executable or a dynamic library). +// In both phases the unwinder walks stack frames from top to bottom using +// information from the stack frame unwind sections of the current process's +// modules ("module" here refers to an OS module, i.e. an executable or a +// dynamic library). // -// For each stack frame, it invokes the associated "personality routine", whose address is also -// stored in the unwind info section. +// For each stack frame, it invokes the associated "personality routine", whose +// address is also stored in the unwind info section. // -// In the search phase, the job of a personality routine is to examine exception object being -// thrown, and to decide whether it should be caught at that stack frame. Once the handler frame -// has been identified, cleanup phase begins. +// In the search phase, the job of a personality routine is to examine exception +// object being thrown, and to decide whether it should be caught at that stack +// frame. Once the handler frame has been identified, cleanup phase begins. // -// In the cleanup phase, personality routines invoke cleanup code associated with their -// stack frames (i.e. destructors). Once stack has been unwound down to the handler frame level, -// unwinding stops and the last personality routine transfers control to its' catch block. +// In the cleanup phase, personality routines invoke cleanup code associated +// with their stack frames (i.e. destructors). Once stack has been unwound down +// to the handler frame level, unwinding stops and the last personality routine +// transfers control to its' catch block. // // ~~~ Frame unwind info registration ~~~ -// Each module has its' own frame unwind info section (usually ".eh_frame"), and unwinder needs -// to know about all of them in order for unwinding to be able to cross module boundaries. +// Each module has its' own frame unwind info section (usually ".eh_frame"), and +// unwinder needs to know about all of them in order for unwinding to be able to +// cross module boundaries. // -// On some platforms, like Linux, this is achieved by dynamically enumerating currently loaded -// modules via the dl_iterate_phdr() API and finding all .eh_frame sections. +// On some platforms, like Linux, this is achieved by dynamically enumerating +// currently loaded modules via the dl_iterate_phdr() API and finding all +// .eh_frame sections. // -// Others, like Windows, require modules to actively register their unwind info sections by calling -// __register_frame_info() API at startup. -// In the latter case it is essential that there is only one copy of the unwinder runtime -// in the process. This is usually achieved by linking to the dynamic version of the unwind -// runtime. +// Others, like Windows, require modules to actively register their unwind info +// sections by calling __register_frame_info() API at startup. In the latter +// case it is essential that there is only one copy of the unwinder runtime in +// the process. This is usually achieved by linking to the dynamic version of +// the unwind runtime. // // Currently Rust uses unwind runtime provided by libgcc. -use prelude::*; -use cast::transmute; -use task::TaskResult; +use any::{Any, AnyRefExt}; +use c_str::CString; +use cast; +use kinds::Send; +use libc::{c_char, size_t}; use libc::{c_void, c_int}; -use self::libunwind::*; +use option::{Some, None, Option}; +use result::{Err, Ok}; +use rt::local::Local; +use rt::task::Task; +use str::Str; +use task::TaskResult; +use unstable::intrinsics; + +use uw = self::libunwind; mod libunwind { //! Unwind library interface @@ -110,34 +124,41 @@ mod libunwind { } pub struct Unwinder { - unwinding: bool, - cause: Option<~Any> + priv unwinding: bool, + priv cause: Option<~Any> } impl Unwinder { + pub fn new() -> Unwinder { + Unwinder { + unwinding: false, + cause: None, + } + } + + pub fn unwinding(&self) -> bool { + self.unwinding + } pub fn try(&mut self, f: ||) { use unstable::raw::Closure; unsafe { - let closure: Closure = transmute(f); - let code = transmute(closure.code); - let env = transmute(closure.env); - - let ep = rust_try(try_fn, code, env); + let closure: Closure = cast::transmute(f); + let ep = rust_try(try_fn, closure.code as *c_void, + closure.env as *c_void); if !ep.is_null() { rtdebug!("Caught {}", (*ep).exception_class); - _Unwind_DeleteException(ep); + uw::_Unwind_DeleteException(ep); } } extern fn try_fn(code: *c_void, env: *c_void) { unsafe { - let closure: Closure = Closure { - code: transmute(code), - env: transmute(env), - }; - let closure: || = transmute(closure); + let closure: || = cast::transmute(Closure { + code: code as *(), + env: env as *(), + }); closure(); } } @@ -145,10 +166,11 @@ impl Unwinder { extern { // Rust's try-catch // When f(...) returns normally, the return value is null. - // When f(...) throws, the return value is a pointer to the caught exception object. + // When f(...) throws, the return value is a pointer to the caught + // exception object. fn rust_try(f: extern "C" fn(*c_void, *c_void), code: *c_void, - data: *c_void) -> *_Unwind_Exception; + data: *c_void) -> *uw::_Unwind_Exception; } } @@ -159,21 +181,21 @@ impl Unwinder { self.cause = Some(cause); unsafe { - let exception = ~_Unwind_Exception { + let exception = ~uw::_Unwind_Exception { exception_class: rust_exception_class(), exception_cleanup: exception_cleanup, private_1: 0, private_2: 0 }; - let error = _Unwind_RaiseException(transmute(exception)); + let error = uw::_Unwind_RaiseException(cast::transmute(exception)); rtabort!("Could not unwind stack, error = {}", error as int) } - extern "C" fn exception_cleanup(_unwind_code: _Unwind_Reason_Code, - exception: *_Unwind_Exception) { + extern "C" fn exception_cleanup(_unwind_code: uw::_Unwind_Reason_Code, + exception: *uw::_Unwind_Exception) { rtdebug!("exception_cleanup()"); unsafe { - let _: ~_Unwind_Exception = transmute(exception); + let _: ~uw::_Unwind_Exception = cast::transmute(exception); } } } @@ -189,68 +211,146 @@ impl Unwinder { // Rust's exception class identifier. This is used by personality routines to // determine whether the exception was thrown by their own runtime. -fn rust_exception_class() -> _Unwind_Exception_Class { - let bytes = bytes!("MOZ\0RUST"); // vendor, language - unsafe { - let ptr: *_Unwind_Exception_Class = transmute(bytes.as_ptr()); - *ptr - } +fn rust_exception_class() -> uw::_Unwind_Exception_Class { + // M O Z \0 R U S T -- vendor, language + 0x4d4f5a_00_52555354 } - -// We could implement our personality routine in pure Rust, however exception info decoding -// is tedious. More importantly, personality routines have to handle various platform -// quirks, which are not fun to maintain. For this reason, we attempt to reuse personality -// routine of the C language: __gcc_personality_v0. +// We could implement our personality routine in pure Rust, however exception +// info decoding is tedious. More importantly, personality routines have to +// handle various platform quirks, which are not fun to maintain. For this +// reason, we attempt to reuse personality routine of the C language: +// __gcc_personality_v0. // -// Since C does not support exception catching, __gcc_personality_v0 simply always -// returns _URC_CONTINUE_UNWIND in search phase, and always returns _URC_INSTALL_CONTEXT -// (i.e. "invoke cleanup code") in cleanup phase. +// Since C does not support exception catching, __gcc_personality_v0 simply +// always returns _URC_CONTINUE_UNWIND in search phase, and always returns +// _URC_INSTALL_CONTEXT (i.e. "invoke cleanup code") in cleanup phase. // -// This is pretty close to Rust's exception handling approach, except that Rust does have -// a single "catch-all" handler at the bottom of each task's stack. +// This is pretty close to Rust's exception handling approach, except that Rust +// does have a single "catch-all" handler at the bottom of each task's stack. // So we have two versions: -// - rust_eh_personality, used by all cleanup landing pads, which never catches, so -// the behavior of __gcc_personality_v0 is perfectly adequate there, and -// - rust_eh_personality_catch, used only by rust_try(), which always catches. This is -// achieved by overriding the return value in search phase to always say "catch!". +// - rust_eh_personality, used by all cleanup landing pads, which never catches, +// so the behavior of __gcc_personality_v0 is perfectly adequate there, and +// - rust_eh_personality_catch, used only by rust_try(), which always catches. +// This is achieved by overriding the return value in search phase to always +// say "catch!". extern "C" { fn __gcc_personality_v0(version: c_int, - actions: _Unwind_Action, - exception_class: _Unwind_Exception_Class, - ue_header: *_Unwind_Exception, - context: *_Unwind_Context) -> _Unwind_Reason_Code; + actions: uw::_Unwind_Action, + exception_class: uw::_Unwind_Exception_Class, + ue_header: *uw::_Unwind_Exception, + context: *uw::_Unwind_Context) + -> uw::_Unwind_Reason_Code; } #[lang="eh_personality"] #[no_mangle] // so we can reference it by name from middle/trans/base.rs #[doc(hidden)] #[cfg(not(test))] -pub extern "C" fn rust_eh_personality(version: c_int, - actions: _Unwind_Action, - exception_class: _Unwind_Exception_Class, - ue_header: *_Unwind_Exception, - context: *_Unwind_Context) -> _Unwind_Reason_Code { +pub extern "C" fn rust_eh_personality( + version: c_int, + actions: uw::_Unwind_Action, + exception_class: uw::_Unwind_Exception_Class, + ue_header: *uw::_Unwind_Exception, + context: *uw::_Unwind_Context +) -> uw::_Unwind_Reason_Code +{ unsafe { - __gcc_personality_v0(version, actions, exception_class, ue_header, context) + __gcc_personality_v0(version, actions, exception_class, ue_header, + context) } } #[no_mangle] // referenced from rust_try.ll #[doc(hidden)] #[cfg(not(test))] -pub extern "C" fn rust_eh_personality_catch(version: c_int, - actions: _Unwind_Action, - exception_class: _Unwind_Exception_Class, - ue_header: *_Unwind_Exception, - context: *_Unwind_Context) -> _Unwind_Reason_Code { - if (actions as c_int & _UA_SEARCH_PHASE as c_int) != 0 { // search phase - _URC_HANDLER_FOUND // catch! +pub extern "C" fn rust_eh_personality_catch( + version: c_int, + actions: uw::_Unwind_Action, + exception_class: uw::_Unwind_Exception_Class, + ue_header: *uw::_Unwind_Exception, + context: *uw::_Unwind_Context +) -> uw::_Unwind_Reason_Code +{ + if (actions as c_int & uw::_UA_SEARCH_PHASE as c_int) != 0 { // search phase + uw::_URC_HANDLER_FOUND // catch! } else { // cleanup phase unsafe { - __gcc_personality_v0(version, actions, exception_class, ue_header, context) + __gcc_personality_v0(version, actions, exception_class, ue_header, + context) } } } + +/// This is the entry point of unwinding for things like lang items and such. +/// The arguments are normally generated by the compiler, and need to +/// have static lifetimes. +pub fn begin_unwind_raw(msg: *c_char, file: *c_char, line: size_t) -> ! { + #[inline] + fn static_char_ptr(p: *c_char) -> &'static str { + let s = unsafe { CString::new(p, false) }; + match s.as_str() { + Some(s) => unsafe { cast::transmute::<&str, &'static str>(s) }, + None => rtabort!("message wasn't utf8?") + } + } + + let msg = static_char_ptr(msg); + let file = static_char_ptr(file); + + begin_unwind(msg, file, line as uint) +} + +/// This is the entry point of unwinding for fail!() and assert!(). +pub fn begin_unwind<M: Any + Send>(msg: M, file: &'static str, line: uint) -> ! { + unsafe { + let task: *mut Task; + // Note that this should be the only allocation performed in this block. + // Currently this means that fail!() on OOM will invoke this code path, + // but then again we're not really ready for failing on OOM anyway. If + // we do start doing this, then we should propagate this allocation to + // be performed in the parent of this task instead of the task that's + // failing. + let msg = ~msg as ~Any; + + { + let msg_s = match msg.as_ref::<&'static str>() { + Some(s) => *s, + None => match msg.as_ref::<~str>() { + Some(s) => s.as_slice(), + None => "~Any", + } + }; + + // It is assumed that all reasonable rust code will have a local + // task at all times. This means that this `try_unsafe_borrow` will + // succeed almost all of the time. There are border cases, however, + // when the runtime has *almost* set up the local task, but hasn't + // quite gotten there yet. In order to get some better diagnostics, + // we print on failure and immediately abort the whole process if + // there is no local task available. + match Local::try_unsafe_borrow() { + Some(t) => { + task = t; + let n = (*task).name.as_ref() + .map(|n| n.as_slice()).unwrap_or("<unnamed>"); + + rterrln!("task '{}' failed at '{}', {}:{}", n, msg_s, + file, line); + } + None => { + rterrln!("failed at '{}', {}:{}", msg_s, file, line); + intrinsics::abort(); + } + } + + if (*task).unwinder.unwinding { + rtabort!("unwinding again"); + } + } + + (*task).unwinder.begin_unwind(msg); + } +} diff --git a/src/libstd/rt/util.rs b/src/libstd/rt/util.rs index 93721986f3c..730a38ce886 100644 --- a/src/libstd/rt/util.rs +++ b/src/libstd/rt/util.rs @@ -15,7 +15,6 @@ use libc; use option::{Some, None, Option}; use os; use str::StrSlice; -use unstable::atomics::{AtomicInt, INIT_ATOMIC_INT, SeqCst}; use unstable::running_on_valgrind; // Indicates whether we should perform expensive sanity checks, including rtassert! @@ -68,11 +67,21 @@ pub fn default_sched_threads() -> uint { } pub fn dumb_println(args: &fmt::Arguments) { - use io::native::file::FileDesc; use io; use libc; - let mut out = FileDesc::new(libc::STDERR_FILENO, false); - fmt::writeln(&mut out as &mut io::Writer, args); + + struct Stderr; + impl io::Writer for Stderr { + fn write(&mut self, data: &[u8]) { + unsafe { + libc::write(libc::STDERR_FILENO, + data.as_ptr() as *libc::c_void, + data.len() as libc::size_t); + } + } + } + let mut w = Stderr; + fmt::writeln(&mut w as &mut io::Writer, args); } pub fn abort(msg: &str) -> ! { @@ -133,13 +142,3 @@ memory and partly incapable of presentation to others.", unsafe { libc::abort() } } } - -static mut EXIT_STATUS: AtomicInt = INIT_ATOMIC_INT; - -pub fn set_exit_status(code: int) { - unsafe { EXIT_STATUS.store(code, SeqCst) } -} - -pub fn get_exit_status() -> int { - unsafe { EXIT_STATUS.load(SeqCst) } -} |
