diff options
| -rw-r--r-- | src/libstd/macros.rs | 19 | ||||
| -rw-r--r-- | src/libstd/rt/comm.rs | 341 | ||||
| -rw-r--r-- | src/libstd/rt/io/extensions.rs | 5 | ||||
| -rw-r--r-- | src/libstd/rt/join_latch.rs | 645 | ||||
| -rw-r--r-- | src/libstd/rt/local.rs | 45 | ||||
| -rw-r--r-- | src/libstd/rt/message_queue.rs | 3 | ||||
| -rw-r--r-- | src/libstd/rt/metrics.rs | 98 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 39 | ||||
| -rw-r--r-- | src/libstd/rt/rtio.rs | 11 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 1058 | ||||
| -rw-r--r-- | src/libstd/rt/sleeper_list.rs | 59 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 60 | ||||
| -rw-r--r-- | src/libstd/rt/test.rs | 359 | ||||
| -rw-r--r-- | src/libstd/rt/tube.rs | 36 | ||||
| -rw-r--r-- | src/libstd/rt/uv/async.rs | 105 | ||||
| -rw-r--r-- | src/libstd/rt/uv/idle.rs | 62 | ||||
| -rw-r--r-- | src/libstd/rt/uv/mod.rs | 63 | ||||
| -rw-r--r-- | src/libstd/rt/uv/uvio.rs | 120 | ||||
| -rw-r--r-- | src/libstd/rt/uvll.rs | 443 | ||||
| -rw-r--r-- | src/libstd/sys.rs | 6 | ||||
| -rw-r--r-- | src/libstd/task/mod.rs | 15 | ||||
| -rw-r--r-- | src/libstd/task/spawn.rs | 10 | ||||
| -rw-r--r-- | src/libstd/unstable/lang.rs | 4 | ||||
| -rw-r--r-- | src/libstd/unstable/sync.rs | 69 | ||||
| -rw-r--r-- | src/rt/rust_builtin.cpp | 7 | ||||
| -rw-r--r-- | src/rt/rust_env.cpp | 6 | ||||
| -rw-r--r-- | src/rt/rustrt.def.in | 1 |
27 files changed, 2892 insertions, 797 deletions
diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs index fda48b6ffb7..8d221fe6a1b 100644 --- a/src/libstd/macros.rs +++ b/src/libstd/macros.rs @@ -38,16 +38,19 @@ macro_rules! rtassert ( } ) ) + +// The do_abort function was originally inside the abort macro, but +// this was ICEing the compiler so it has been moved outside. Now this +// seems to work? +#[allow(missing_doc)] +pub fn do_abort() -> ! { + unsafe { ::libc::abort(); } +} + macro_rules! abort( ($( $msg:expr),+) => ( { rtdebug!($($msg),+); - - do_abort(); - - // NB: This is in a fn to avoid putting the `unsafe` block in a macro, - // which causes spurious 'unnecessary unsafe block' warnings. - fn do_abort() -> ! { - unsafe { ::libc::abort(); } - } + ::macros::do_abort(); } ) ) + diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 5d85e292861..82e6d44fe62 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -22,10 +22,12 @@ use ops::Drop; use kinds::Owned; use rt::sched::{Scheduler, Coroutine}; use rt::local::Local; -use unstable::intrinsics::{atomic_xchg, atomic_load}; +use unstable::atomics::{AtomicUint, AtomicOption, SeqCst}; +use unstable::sync::UnsafeAtomicRcBox; use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; +use clone::Clone; /// A combined refcount / ~Task pointer. /// @@ -34,14 +36,14 @@ use cell::Cell; /// * 2 - both endpoints are alive /// * 1 - either the sender or the receiver is dead, determined by context /// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task -type State = int; +type State = uint; static STATE_BOTH: State = 2; static STATE_ONE: State = 1; /// The heap-allocated structure shared between two endpoints. struct Packet<T> { - state: State, + state: AtomicUint, payload: Option<T>, } @@ -70,7 +72,7 @@ pub struct PortOneHack<T> { pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) { let packet: ~Packet<T> = ~Packet { - state: STATE_BOTH, + state: AtomicUint::new(STATE_BOTH), payload: None }; @@ -114,12 +116,20 @@ impl<T> ChanOne<T> { // reordering of the payload write. This also issues an // acquire barrier that keeps the subsequent access of the // ~Task pointer from being reordered. - let oldstate = atomic_xchg(&mut (*packet).state, STATE_ONE); + let oldstate = (*packet).state.swap(STATE_ONE, SeqCst); match oldstate { STATE_BOTH => { // Port is not waiting yet. Nothing to do + do Local::borrow::<Scheduler, ()> |sched| { + rtdebug!("non-rendezvous send"); + sched.metrics.non_rendezvous_sends += 1; + } } STATE_ONE => { + do Local::borrow::<Scheduler, ()> |sched| { + rtdebug!("rendezvous send"); + sched.metrics.rendezvous_sends += 1; + } // Port has closed. Need to clean up. let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet); recvr_active = false; @@ -127,7 +137,9 @@ impl<T> ChanOne<T> { task_as_state => { // Port is blocked. Wake it up. let recvr: ~Coroutine = cast::transmute(task_as_state); - let sched = Local::take::<Scheduler>(); + let mut sched = Local::take::<Scheduler>(); + rtdebug!("rendezvous send"); + sched.metrics.rendezvous_sends += 1; sched.schedule_task(recvr); } } @@ -158,23 +170,30 @@ impl<T> PortOne<T> { // Switch to the scheduler to put the ~Task into the Packet state. let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { unsafe { // Atomically swap the task pointer into the Packet state, issuing // an acquire barrier to prevent reordering of the subsequent read // of the payload. Also issues a release barrier to prevent reordering // of any previous writes to the task structure. let task_as_state: State = cast::transmute(task); - let oldstate = atomic_xchg(&mut (*packet).state, task_as_state); + let oldstate = (*packet).state.swap(task_as_state, SeqCst); match oldstate { STATE_BOTH => { // Data has not been sent. Now we're blocked. + rtdebug!("non-rendezvous recv"); + sched.metrics.non_rendezvous_recvs += 1; } STATE_ONE => { + rtdebug!("rendezvous recv"); + sched.metrics.rendezvous_recvs += 1; + // Channel is closed. Switch back and check the data. + // NB: We have to drop back into the scheduler event loop here + // instead of switching immediately back or we could end up + // triggering infinite recursion on the scheduler's stack. let task: ~Coroutine = cast::transmute(task_as_state); - let sched = Local::take::<Scheduler>(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } _ => util::unreachable() } @@ -210,7 +229,7 @@ impl<T> Peekable<T> for PortOne<T> { fn peek(&self) -> bool { unsafe { let packet: *mut Packet<T> = self.inner.packet(); - let oldstate = atomic_load(&mut (*packet).state); + let oldstate = (*packet).state.load(SeqCst); match oldstate { STATE_BOTH => false, STATE_ONE => (*packet).payload.is_some(), @@ -227,7 +246,7 @@ impl<T> Drop for ChanOneHack<T> { unsafe { let this = cast::transmute_mut(self); - let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE); + let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst); match oldstate { STATE_BOTH => { // Port still active. It will destroy the Packet. @@ -254,7 +273,7 @@ impl<T> Drop for PortOneHack<T> { unsafe { let this = cast::transmute_mut(self); - let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE); + let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst); match oldstate { STATE_BOTH => { // Chan still active. It will destroy the packet. @@ -295,16 +314,19 @@ struct StreamPayload<T> { next: PortOne<StreamPayload<T>> } +type StreamChanOne<T> = ChanOne<StreamPayload<T>>; +type StreamPortOne<T> = PortOne<StreamPayload<T>>; + /// A channel with unbounded size. pub struct Chan<T> { // FIXME #5372. Using Cell because we don't take &mut self - next: Cell<ChanOne<StreamPayload<T>>> + next: Cell<StreamChanOne<T>> } /// An port with unbounded size. pub struct Port<T> { // FIXME #5372. Using Cell because we don't take &mut self - next: Cell<PortOne<StreamPayload<T>>> + next: Cell<StreamPortOne<T>> } pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) { @@ -357,6 +379,148 @@ impl<T> Peekable<T> for Port<T> { } } +pub struct SharedChan<T> { + // Just like Chan, but a shared AtomicOption instead of Cell + priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>> +} + +impl<T> SharedChan<T> { + pub fn new(chan: Chan<T>) -> SharedChan<T> { + let next = chan.next.take(); + let next = AtomicOption::new(~next); + SharedChan { next: UnsafeAtomicRcBox::new(next) } + } +} + +impl<T: Owned> GenericChan<T> for SharedChan<T> { + fn send(&self, val: T) { + self.try_send(val); + } +} + +impl<T: Owned> GenericSmartChan<T> for SharedChan<T> { + #[cfg(stage0)] // odd type checking errors + fn try_send(&self, _val: T) -> bool { + fail!() + } + + #[cfg(not(stage0))] + fn try_send(&self, val: T) -> bool { + unsafe { + let (next_pone, next_cone) = oneshot(); + let cone = (*self.next.get()).swap(~next_cone, SeqCst); + cone.unwrap().try_send(StreamPayload { val: val, next: next_pone }) + } + } +} + +impl<T> Clone for SharedChan<T> { + fn clone(&self) -> SharedChan<T> { + SharedChan { + next: self.next.clone() + } + } +} + +pub struct SharedPort<T> { + // The next port on which we will receive the next port on which we will receive T + priv next_link: UnsafeAtomicRcBox<AtomicOption<PortOne<StreamPortOne<T>>>> +} + +impl<T> SharedPort<T> { + pub fn new(port: Port<T>) -> SharedPort<T> { + // Put the data port into a new link pipe + let next_data_port = port.next.take(); + let (next_link_port, next_link_chan) = oneshot(); + next_link_chan.send(next_data_port); + let next_link = AtomicOption::new(~next_link_port); + SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) } + } +} + +impl<T: Owned> GenericPort<T> for SharedPort<T> { + fn recv(&self) -> T { + match self.try_recv() { + Some(val) => val, + None => { + fail!("receiving on a closed channel"); + } + } + } + + #[cfg(stage0)] // odd type checking errors + fn try_recv(&self) -> Option<T> { + fail!() + } + + #[cfg(not(stage0))] + fn try_recv(&self) -> Option<T> { + unsafe { + let (next_link_port, next_link_chan) = oneshot(); + let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst); + let link_port = link_port.unwrap(); + let data_port = link_port.recv(); + let (next_data_port, res) = match data_port.try_recv() { + Some(StreamPayload { val, next }) => { + (next, Some(val)) + } + None => { + let (next_data_port, _) = oneshot(); + (next_data_port, None) + } + }; + next_link_chan.send(next_data_port); + return res; + } + } +} + +impl<T> Clone for SharedPort<T> { + fn clone(&self) -> SharedPort<T> { + SharedPort { + next_link: self.next_link.clone() + } + } +} + +// XXX: Need better name +type MegaPipe<T> = (SharedPort<T>, SharedChan<T>); + +pub fn megapipe<T: Owned>() -> MegaPipe<T> { + let (port, chan) = stream(); + (SharedPort::new(port), SharedChan::new(chan)) +} + +impl<T: Owned> GenericChan<T> for MegaPipe<T> { + fn send(&self, val: T) { + match *self { + (_, ref c) => c.send(val) + } + } +} + +impl<T: Owned> GenericSmartChan<T> for MegaPipe<T> { + fn try_send(&self, val: T) -> bool { + match *self { + (_, ref c) => c.try_send(val) + } + } +} + +impl<T: Owned> GenericPort<T> for MegaPipe<T> { + fn recv(&self) -> T { + match *self { + (ref p, _) => p.recv() + } + } + + fn try_recv(&self) -> Option<T> { + match *self { + (ref p, _) => p.try_recv() + } + } +} + #[cfg(test)] mod test { use super::*; @@ -584,7 +748,7 @@ mod test { #[test] fn stream_send_recv_stress() { for stress_factor().times { - do run_in_newsched_task { + do run_in_mt_newsched_task { let (port, chan) = stream::<~int>(); send(chan, 0); @@ -594,18 +758,18 @@ mod test { if i == 10 { return } let chan_cell = Cell::new(chan); - let _thread = do spawntask_thread { + do spawntask_random { let chan = chan_cell.take(); chan.send(~i); send(chan, i + 1); - }; + } } fn recv(port: Port<~int>, i: int) { if i == 10 { return } let port_cell = Cell::new(port); - let _thread = do spawntask_thread { + do spawntask_random { let port = port_cell.take(); assert!(port.recv() == ~i); recv(port, i + 1); @@ -614,4 +778,143 @@ mod test { } } } + + #[test] + fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + do run_in_newsched_task { + let (port, chan) = stream(); + for 10000.times { chan.send(()) } + for 10000.times { port.recv() } + } + } + + #[test] + fn shared_chan_stress() { + do run_in_mt_newsched_task { + let (port, chan) = stream(); + let chan = SharedChan::new(chan); + let total = stress_factor() + 100; + for total.times { + let chan_clone = chan.clone(); + do spawntask_random { + chan_clone.send(()); + } + } + + for total.times { + port.recv(); + } + } + } + + #[test] + fn shared_port_stress() { + do run_in_mt_newsched_task { + // XXX: Removing these type annotations causes an ICE + let (end_port, end_chan) = stream::<()>(); + let (port, chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let port = SharedPort::new(port); + let total = stress_factor() + 100; + for total.times { + let end_chan_clone = end_chan.clone(); + let port_clone = port.clone(); + do spawntask_random { + port_clone.recv(); + end_chan_clone.send(()); + } + } + + for total.times { + chan.send(()); + } + + for total.times { + end_port.recv(); + } + } + } + + #[test] + fn shared_port_close_simple() { + do run_in_mt_newsched_task { + let (port, chan) = stream::<()>(); + let port = SharedPort::new(port); + { let _chan = chan; } + assert!(port.try_recv().is_none()); + } + } + + #[test] + fn shared_port_close() { + do run_in_mt_newsched_task { + let (end_port, end_chan) = stream::<bool>(); + let (port, chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let port = SharedPort::new(port); + let chan = SharedChan::new(chan); + let send_total = 10; + let recv_total = 20; + do spawntask_random { + for send_total.times { + let chan_clone = chan.clone(); + do spawntask_random { + chan_clone.send(()); + } + } + } + let end_chan_clone = end_chan.clone(); + do spawntask_random { + for recv_total.times { + let port_clone = port.clone(); + let end_chan_clone = end_chan_clone.clone(); + do spawntask_random { + let recvd = port_clone.try_recv().is_some(); + end_chan_clone.send(recvd); + } + } + } + + let mut recvd = 0; + for recv_total.times { + recvd += if end_port.recv() { 1 } else { 0 }; + } + + assert!(recvd == send_total); + } + } + + #[test] + fn megapipe_stress() { + use rand; + use rand::RngUtil; + + do run_in_mt_newsched_task { + let (end_port, end_chan) = stream::<()>(); + let end_chan = SharedChan::new(end_chan); + let pipe = megapipe(); + let total = stress_factor() + 10; + let mut rng = rand::rng(); + for total.times { + let msgs = rng.gen_uint_range(0, 10); + let pipe_clone = pipe.clone(); + let end_chan_clone = end_chan.clone(); + do spawntask_random { + for msgs.times { + pipe_clone.send(()); + } + for msgs.times { + pipe_clone.recv(); + } + } + + end_chan_clone.send(()); + } + + for total.times { + end_port.recv(); + } + } + } } diff --git a/src/libstd/rt/io/extensions.rs b/src/libstd/rt/io/extensions.rs index 727ab13a4f6..c7c3eadbe21 100644 --- a/src/libstd/rt/io/extensions.rs +++ b/src/libstd/rt/io/extensions.rs @@ -749,8 +749,6 @@ mod test { #[should_fail] #[ignore(cfg(windows))] fn push_bytes_fail_reset_len() { - use unstable::finally::Finally; - // push_bytes unsafely sets the vector length. This is testing that // upon failure the length is reset correctly. let mut reader = MockReader::new(); @@ -772,7 +770,8 @@ mod test { reader.push_bytes(&mut *buf, 4); }).finally { // NB: Using rtassert here to trigger abort on failure since this is a should_fail test - rtassert!(*buf == ~[8, 9, 10]); + // FIXME: #7049 This fails because buf is still borrowed + //rtassert!(*buf == ~[8, 9, 10]); } } diff --git a/src/libstd/rt/join_latch.rs b/src/libstd/rt/join_latch.rs new file mode 100644 index 00000000000..ad5cf2eb378 --- /dev/null +++ b/src/libstd/rt/join_latch.rs @@ -0,0 +1,645 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! The JoinLatch is a concurrent type that establishes the task +//! tree and propagates failure. +//! +//! Each task gets a JoinLatch that is derived from the JoinLatch +//! of its parent task. Every latch must be released by either calling +//! the non-blocking `release` method or the task-blocking `wait` method. +//! Releasing a latch does not complete until all of its child latches +//! complete. +//! +//! Latches carry a `success` flag that is set to `false` during task +//! failure and is propagated both from children to parents and parents +//! to children. The status af this flag may be queried for the purposes +//! of linked failure. +//! +//! In addition to failure propagation the task tree serves to keep the +//! default task schedulers alive. The runtime only sends the shutdown +//! message to schedulers once the root task exits. +//! +//! Under this scheme tasks that terminate before their children become +//! 'zombies' since they may not exit until their children do. Zombie +//! tasks are 'tombstoned' as `Tombstone(~JoinLatch)` and the tasks +//! themselves allowed to terminate. +//! +//! XXX: Propagate flag from parents to children. +//! XXX: Tombstoning actually doesn't work. +//! XXX: This could probably be done in a way that doesn't leak tombstones +//! longer than the life of the child tasks. + +use comm::{GenericPort, Peekable, GenericSmartChan}; +use clone::Clone; +use container::Container; +use option::{Option, Some, None}; +use ops::Drop; +use rt::comm::{SharedChan, Port, stream}; +use rt::local::Local; +use rt::sched::Scheduler; +use unstable::atomics::{AtomicUint, SeqCst}; +use util; +use vec::OwnedVector; + +// FIXME #7026: Would prefer this to be an enum +pub struct JoinLatch { + priv parent: Option<ParentLink>, + priv child: Option<ChildLink>, + closed: bool, +} + +// Shared between parents and all their children. +struct SharedState { + /// Reference count, held by a parent and all children. + count: AtomicUint, + success: bool +} + +struct ParentLink { + shared: *mut SharedState, + // For communicating with the parent. + chan: SharedChan<Message> +} + +struct ChildLink { + shared: ~SharedState, + // For receiving from children. + port: Port<Message>, + chan: SharedChan<Message>, + // Prevents dropping the child SharedState reference counts multiple times. + dropped_child: bool +} + +// Messages from child latches to parent. +enum Message { + Tombstone(~JoinLatch), + ChildrenTerminated +} + +impl JoinLatch { + pub fn new_root() -> ~JoinLatch { + let this = ~JoinLatch { + parent: None, + child: None, + closed: false + }; + rtdebug!("new root latch %x", this.id()); + return this; + } + + fn id(&self) -> uint { + unsafe { ::cast::transmute(&*self) } + } + + pub fn new_child(&mut self) -> ~JoinLatch { + rtassert!(!self.closed); + + if self.child.is_none() { + // This is the first time spawning a child + let shared = ~SharedState { + count: AtomicUint::new(1), + success: true + }; + let (port, chan) = stream(); + let chan = SharedChan::new(chan); + let child = ChildLink { + shared: shared, + port: port, + chan: chan, + dropped_child: false + }; + self.child = Some(child); + } + + let child_link: &mut ChildLink = self.child.get_mut_ref(); + let shared_state: *mut SharedState = &mut *child_link.shared; + + child_link.shared.count.fetch_add(1, SeqCst); + + let child = ~JoinLatch { + parent: Some(ParentLink { + shared: shared_state, + chan: child_link.chan.clone() + }), + child: None, + closed: false + }; + rtdebug!("NEW child latch %x", child.id()); + return child; + } + + pub fn release(~self, local_success: bool) { + // XXX: This should not block, but there's a bug in the below + // code that I can't figure out. + self.wait(local_success); + } + + // XXX: Should not require ~self + fn release_broken(~self, local_success: bool) { + rtassert!(!self.closed); + + rtdebug!("releasing %x", self.id()); + + let id = self.id(); + let _ = id; // XXX: `id` is only used in debug statements so appears unused + let mut this = self; + let mut child_success = true; + let mut children_done = false; + + if this.child.is_some() { + rtdebug!("releasing children"); + let child_link: &mut ChildLink = this.child.get_mut_ref(); + let shared: &mut SharedState = &mut *child_link.shared; + + if !child_link.dropped_child { + let last_count = shared.count.fetch_sub(1, SeqCst); + rtdebug!("child count before sub %u %x", last_count, id); + if last_count == 1 { + assert!(child_link.chan.try_send(ChildrenTerminated)); + } + child_link.dropped_child = true; + } + + // Wait for messages from children + let mut tombstones = ~[]; + loop { + if child_link.port.peek() { + match child_link.port.recv() { + Tombstone(t) => { + tombstones.push(t); + }, + ChildrenTerminated => { + children_done = true; + break; + } + } + } else { + break + } + } + + rtdebug!("releasing %u tombstones %x", tombstones.len(), id); + + // Try to release the tombstones. Those that still have + // outstanding will be re-enqueued. When this task's + // parents release their latch we'll end up back here + // trying them again. + while !tombstones.is_empty() { + tombstones.pop().release(true); + } + + if children_done { + let count = shared.count.load(SeqCst); + assert!(count == 0); + // self_count is the acquire-read barrier + child_success = shared.success; + } + } else { + children_done = true; + } + + let total_success = local_success && child_success; + + rtassert!(this.parent.is_some()); + + unsafe { + { + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + let shared: *mut SharedState = parent_link.shared; + + if !total_success { + // parent_count is the write-wait barrier + (*shared).success = false; + } + } + + if children_done { + rtdebug!("children done"); + do Local::borrow::<Scheduler, ()> |sched| { + sched.metrics.release_tombstone += 1; + } + { + rtdebug!("RELEASING parent %x", id); + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + let shared: *mut SharedState = parent_link.shared; + let last_count = (*shared).count.fetch_sub(1, SeqCst); + rtdebug!("count before parent sub %u %x", last_count, id); + if last_count == 1 { + assert!(parent_link.chan.try_send(ChildrenTerminated)); + } + } + this.closed = true; + util::ignore(this); + } else { + rtdebug!("children not done"); + rtdebug!("TOMBSTONING %x", id); + do Local::borrow::<Scheduler, ()> |sched| { + sched.metrics.release_no_tombstone += 1; + } + let chan = { + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + parent_link.chan.clone() + }; + assert!(chan.try_send(Tombstone(this))); + } + } + } + + // XXX: Should not require ~self + pub fn wait(~self, local_success: bool) -> bool { + rtassert!(!self.closed); + + rtdebug!("WAITING %x", self.id()); + + let mut this = self; + let mut child_success = true; + + if this.child.is_some() { + rtdebug!("waiting for children"); + let child_link: &mut ChildLink = this.child.get_mut_ref(); + let shared: &mut SharedState = &mut *child_link.shared; + + if !child_link.dropped_child { + let last_count = shared.count.fetch_sub(1, SeqCst); + rtdebug!("child count before sub %u", last_count); + if last_count == 1 { + assert!(child_link.chan.try_send(ChildrenTerminated)); + } + child_link.dropped_child = true; + } + + // Wait for messages from children + loop { + match child_link.port.recv() { + Tombstone(t) => { + t.wait(true); + } + ChildrenTerminated => break + } + } + + let count = shared.count.load(SeqCst); + if count != 0 { ::io::println(fmt!("%u", count)); } + assert!(count == 0); + // self_count is the acquire-read barrier + child_success = shared.success; + } + + let total_success = local_success && child_success; + + if this.parent.is_some() { + rtdebug!("releasing parent"); + unsafe { + let parent_link: &mut ParentLink = this.parent.get_mut_ref(); + let shared: *mut SharedState = parent_link.shared; + + if !total_success { + // parent_count is the write-wait barrier + (*shared).success = false; + } + + let last_count = (*shared).count.fetch_sub(1, SeqCst); + rtdebug!("count before parent sub %u", last_count); + if last_count == 1 { + assert!(parent_link.chan.try_send(ChildrenTerminated)); + } + } + } + + this.closed = true; + util::ignore(this); + + return total_success; + } +} + +impl Drop for JoinLatch { + fn finalize(&self) { + rtdebug!("DESTROYING %x", self.id()); + rtassert!(self.closed); + } +} + +#[cfg(test)] +mod test { + use super::*; + use cell::Cell; + use container::Container; + use iter::Times; + use old_iter::BaseIter; + use rt::test::*; + use rand; + use rand::RngUtil; + use vec::{CopyableVector, ImmutableVector}; + + #[test] + fn success_immediately() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + + let child_latch = latch.new_child(); + let child_latch = Cell::new(child_latch); + do spawntask_immediately { + let child_latch = child_latch.take(); + assert!(child_latch.wait(true)); + } + + assert!(latch.wait(true)); + } + } + + #[test] + fn success_later() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + + let child_latch = latch.new_child(); + let child_latch = Cell::new(child_latch); + do spawntask_later { + let child_latch = child_latch.take(); + assert!(child_latch.wait(true)); + } + + assert!(latch.wait(true)); + } + } + + #[test] + fn mt_success() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + for 10.times { + let child_latch = latch.new_child(); + let child_latch = Cell::new(child_latch); + do spawntask_random { + let child_latch = child_latch.take(); + assert!(child_latch.wait(true)); + } + } + + assert!(latch.wait(true)); + } + } + + #[test] + fn mt_failure() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + let spawn = |status| { + let child_latch = latch.new_child(); + let child_latch = Cell::new(child_latch); + do spawntask_random { + let child_latch = child_latch.take(); + child_latch.wait(status); + } + }; + + for 10.times { spawn(true) } + spawn(false); + for 10.times { spawn(true) } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn mt_multi_level_success() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + fn child(latch: &mut JoinLatch, i: int) { + let child_latch = latch.new_child(); + let child_latch = Cell::new(child_latch); + do spawntask_random { + let mut child_latch = child_latch.take(); + if i != 0 { + child(&mut *child_latch, i - 1); + child_latch.wait(true); + } else { + child_latch.wait(true); + } + } + } + + child(&mut *latch, 10); + + assert!(latch.wait(true)); + } + } + + #[test] + fn mt_multi_level_failure() { + do run_in_mt_newsched_task { + let mut latch = JoinLatch::new_root(); + + fn child(latch: &mut JoinLatch, i: int) { + let child_latch = latch.new_child(); + let child_latch = Cell::new(child_latch); + do spawntask_random { + let mut child_latch = child_latch.take(); + if i != 0 { + child(&mut *child_latch, i - 1); + child_latch.wait(false); + } else { + child_latch.wait(true); + } + } + } + + child(&mut *latch, 10); + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let child_latch = latch.new_child(); + let child_latch = Cell::new(child_latch); + + do spawntask_immediately { + let latch = child_latch.take(); + latch.release(false); + } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child_tombstone() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let child_latch = latch.new_child(); + let child_latch = Cell::new(child_latch); + + do spawntask_immediately { + let mut latch = child_latch.take(); + let child_latch = latch.new_child(); + let child_latch = Cell::new(child_latch); + do spawntask_later { + let latch = child_latch.take(); + latch.release(false); + } + latch.release(true); + } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child_no_tombstone() { + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let child_latch = latch.new_child(); + let child_latch = Cell::new(child_latch); + + do spawntask_later { + let mut latch = child_latch.take(); + let child_latch = latch.new_child(); + let child_latch = Cell::new(child_latch); + do spawntask_immediately { + let latch = child_latch.take(); + latch.release(false); + } + latch.release(true); + } + + assert!(!latch.wait(true)); + } + } + + #[test] + fn release_child_tombstone_stress() { + fn rand_orders() -> ~[bool] { + let mut v = ~[false,.. 5]; + v[0] = true; + let mut rng = rand::rng(); + return rng.shuffle(v); + } + + fn split_orders(orders: &[bool]) -> (~[bool], ~[bool]) { + if orders.is_empty() { + return (~[], ~[]); + } else if orders.len() <= 2 { + return (orders.to_owned(), ~[]); + } + let mut rng = rand::rng(); + let n = rng.gen_uint_range(1, orders.len()); + let first = orders.slice(0, n).to_owned(); + let last = orders.slice(n, orders.len()).to_owned(); + assert!(first.len() + last.len() == orders.len()); + return (first, last); + } + + for stress_factor().times { + do run_in_newsched_task { + fn doit(latch: &mut JoinLatch, orders: ~[bool], depth: uint) { + let (my_orders, remaining_orders) = split_orders(orders); + rtdebug!("(my_orders, remaining): %?", (&my_orders, &remaining_orders)); + rtdebug!("depth: %u", depth); + let mut remaining_orders = remaining_orders; + let mut num = 0; + for my_orders.each |&order| { + let child_latch = latch.new_child(); + let child_latch = Cell::new(child_latch); + let (child_orders, remaining) = split_orders(remaining_orders); + rtdebug!("(child_orders, remaining): %?", (&child_orders, &remaining)); + remaining_orders = remaining; + let child_orders = Cell::new(child_orders); + let child_num = num; + let _ = child_num; // XXX unused except in rtdebug! + do spawntask_random { + rtdebug!("depth %u num %u", depth, child_num); + let mut child_latch = child_latch.take(); + let child_orders = child_orders.take(); + doit(&mut *child_latch, child_orders, depth + 1); + child_latch.release(order); + } + + num += 1; + } + } + + let mut latch = JoinLatch::new_root(); + let orders = rand_orders(); + rtdebug!("orders: %?", orders); + + doit(&mut *latch, orders, 0); + + assert!(!latch.wait(true)); + } + } + } + + #[test] + fn whateverman() { + struct Order { + immediate: bool, + succeed: bool, + orders: ~[Order] + } + fn next(latch: &mut JoinLatch, orders: ~[Order]) { + for orders.each |order| { + let suborders = copy order.orders; + let child_latch = Cell::new(latch.new_child()); + let succeed = order.succeed; + if order.immediate { + do spawntask_immediately { + let mut child_latch = child_latch.take(); + next(&mut *child_latch, copy suborders); + rtdebug!("immediate releasing"); + child_latch.release(succeed); + } + } else { + do spawntask_later { + let mut child_latch = child_latch.take(); + next(&mut *child_latch, copy suborders); + rtdebug!("later releasing"); + child_latch.release(succeed); + } + } + } + } + + do run_in_newsched_task { + let mut latch = JoinLatch::new_root(); + let orders = ~[ Order { // 0 0 + immediate: true, + succeed: true, + orders: ~[ Order { // 1 0 + immediate: true, + succeed: false, + orders: ~[ Order { // 2 0 + immediate: false, + succeed: false, + orders: ~[ Order { // 3 0 + immediate: true, + succeed: false, + orders: ~[] + }, Order { // 3 1 + immediate: false, + succeed: false, + orders: ~[] + }] + }] + }] + }]; + + next(&mut *latch, orders); + assert!(!latch.wait(true)); + } + } +} diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs index 313123c38b5..6e0fbda5ec9 100644 --- a/src/libstd/rt/local.rs +++ b/src/libstd/rt/local.rs @@ -18,7 +18,7 @@ pub trait Local { fn put(value: ~Self); fn take() -> ~Self; fn exists() -> bool; - fn borrow(f: &fn(&mut Self)); + fn borrow<T>(f: &fn(&mut Self) -> T) -> T; unsafe fn unsafe_borrow() -> *mut Self; unsafe fn try_unsafe_borrow() -> Option<*mut Self>; } @@ -27,7 +27,20 @@ impl Local for Scheduler { fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }} fn take() -> ~Scheduler { unsafe { local_ptr::take() } } fn exists() -> bool { local_ptr::exists() } - fn borrow(f: &fn(&mut Scheduler)) { unsafe { local_ptr::borrow(f) } } + fn borrow<T>(f: &fn(&mut Scheduler) -> T) -> T { + let mut res: Option<T> = None; + let res_ptr: *mut Option<T> = &mut res; + unsafe { + do local_ptr::borrow |sched| { + let result = f(sched); + *res_ptr = Some(result); + } + } + match res { + Some(r) => { r } + None => abort!("function failed!") + } + } unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() } unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { abort!("unimpl") } } @@ -36,8 +49,8 @@ impl Local for Task { fn put(_value: ~Task) { abort!("unimpl") } fn take() -> ~Task { abort!("unimpl") } fn exists() -> bool { abort!("unimpl") } - fn borrow(f: &fn(&mut Task)) { - do Local::borrow::<Scheduler> |sched| { + fn borrow<T>(f: &fn(&mut Task) -> T) -> T { + do Local::borrow::<Scheduler, T> |sched| { match sched.current_task { Some(~ref mut task) => { f(&mut *task.task) @@ -74,7 +87,7 @@ impl Local for IoFactoryObject { fn put(_value: ~IoFactoryObject) { abort!("unimpl") } fn take() -> ~IoFactoryObject { abort!("unimpl") } fn exists() -> bool { abort!("unimpl") } - fn borrow(_f: &fn(&mut IoFactoryObject)) { abort!("unimpl") } + fn borrow<T>(_f: &fn(&mut IoFactoryObject) -> T) -> T { abort!("unimpl") } unsafe fn unsafe_borrow() -> *mut IoFactoryObject { let sched = Local::unsafe_borrow::<Scheduler>(); let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap(); @@ -85,34 +98,46 @@ impl Local for IoFactoryObject { #[cfg(test)] mod test { + use rt::test::*; use rt::sched::Scheduler; - use rt::uv::uvio::UvEventLoop; use super::*; #[test] fn thread_local_scheduler_smoke_test() { - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); let _scheduler: ~Scheduler = Local::take(); } #[test] fn thread_local_scheduler_two_instances() { - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); let _scheduler: ~Scheduler = Local::take(); - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); let _scheduler: ~Scheduler = Local::take(); } #[test] fn borrow_smoke_test() { - let scheduler = ~UvEventLoop::new_scheduler(); + let scheduler = ~new_test_uv_sched(); Local::put(scheduler); unsafe { let _scheduler: *mut Scheduler = Local::unsafe_borrow(); } let _scheduler: ~Scheduler = Local::take(); } + + #[test] + fn borrow_with_return() { + let scheduler = ~new_test_uv_sched(); + Local::put(scheduler); + let res = do Local::borrow::<Scheduler,bool> |_sched| { + true + }; + assert!(res) + let _scheduler: ~Scheduler = Local::take(); + } + } diff --git a/src/libstd/rt/message_queue.rs b/src/libstd/rt/message_queue.rs index 5b60543344d..734be808797 100644 --- a/src/libstd/rt/message_queue.rs +++ b/src/libstd/rt/message_queue.rs @@ -8,6 +8,9 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +//! A concurrent queue that supports multiple producers and a +//! single consumer. + use container::Container; use kinds::Owned; use vec::OwnedVector; diff --git a/src/libstd/rt/metrics.rs b/src/libstd/rt/metrics.rs new file mode 100644 index 00000000000..b0c0fa5d708 --- /dev/null +++ b/src/libstd/rt/metrics.rs @@ -0,0 +1,98 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use to_str::ToStr; + +pub struct SchedMetrics { + // The number of times executing `run_sched_once`. + turns: uint, + // The number of turns that received a message. + messages_received: uint, + // The number of turns that ran a task from the queue. + tasks_resumed_from_queue: uint, + // The number of turns that found no work to perform. + wasted_turns: uint, + // The number of times the scheduler went to sleep. + sleepy_times: uint, + // Context switches from the scheduler into a task. + context_switches_sched_to_task: uint, + // Context switches from a task into the scheduler. + context_switches_task_to_sched: uint, + // Context switches from a task to a task. + context_switches_task_to_task: uint, + // Message sends that unblock the receiver + rendezvous_sends: uint, + // Message sends that do not unblock the receiver + non_rendezvous_sends: uint, + // Message receives that do not block the receiver + rendezvous_recvs: uint, + // Message receives that block the receiver + non_rendezvous_recvs: uint, + // JoinLatch releases that create tombstones + release_tombstone: uint, + // JoinLatch releases that do not create tombstones + release_no_tombstone: uint, +} + +impl SchedMetrics { + pub fn new() -> SchedMetrics { + SchedMetrics { + turns: 0, + messages_received: 0, + tasks_resumed_from_queue: 0, + wasted_turns: 0, + sleepy_times: 0, + context_switches_sched_to_task: 0, + context_switches_task_to_sched: 0, + context_switches_task_to_task: 0, + rendezvous_sends: 0, + non_rendezvous_sends: 0, + rendezvous_recvs: 0, + non_rendezvous_recvs: 0, + release_tombstone: 0, + release_no_tombstone: 0 + } + } +} + +impl ToStr for SchedMetrics { + fn to_str(&self) -> ~str { + fmt!("turns: %u\n\ + messages_received: %u\n\ + tasks_resumed_from_queue: %u\n\ + wasted_turns: %u\n\ + sleepy_times: %u\n\ + context_switches_sched_to_task: %u\n\ + context_switches_task_to_sched: %u\n\ + context_switches_task_to_task: %u\n\ + rendezvous_sends: %u\n\ + non_rendezvous_sends: %u\n\ + rendezvous_recvs: %u\n\ + non_rendezvous_recvs: %u\n\ + release_tombstone: %u\n\ + release_no_tombstone: %u\n\ + ", + self.turns, + self.messages_received, + self.tasks_resumed_from_queue, + self.wasted_turns, + self.sleepy_times, + self.context_switches_sched_to_task, + self.context_switches_task_to_sched, + self.context_switches_task_to_task, + self.rendezvous_sends, + self.non_rendezvous_sends, + self.rendezvous_recvs, + self.non_rendezvous_recvs, + self.release_tombstone, + self.release_no_tombstone + ) + } +} \ No newline at end of file diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index f9f433b9416..1724361cabc 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -55,7 +55,11 @@ Several modules in `core` are clients of `rt`: */ #[doc(hidden)]; +#[deny(unused_imports)]; +#[deny(unused_mut)]; +#[deny(unused_variable)]; +use cell::Cell; use ptr::RawPtr; /// The global (exchange) heap. @@ -88,6 +92,9 @@ mod work_queue; /// A parallel queue. mod message_queue; +/// A parallel data structure for tracking sleeping schedulers. +mod sleeper_list; + /// Stack segments and caching. mod stack; @@ -127,6 +134,11 @@ pub mod local_ptr; /// Bindings to pthread/windows thread-local storage. pub mod thread_local_storage; +/// A concurrent data structure with which parent tasks wait on child tasks. +pub mod join_latch; + +pub mod metrics; + /// Set up a default runtime configuration, given compiler-supplied arguments. /// @@ -145,13 +157,18 @@ pub mod thread_local_storage; pub fn start(_argc: int, _argv: **u8, crate_map: *u8, main: ~fn()) -> int { use self::sched::{Scheduler, Coroutine}; + use self::work_queue::WorkQueue; use self::uv::uvio::UvEventLoop; + use self::sleeper_list::SleeperList; init(crate_map); let loop_ = ~UvEventLoop::new(); - let mut sched = ~Scheduler::new(loop_); - let main_task = ~Coroutine::new(&mut sched.stack_pool, main); + let work_queue = WorkQueue::new(); + let sleepers = SleeperList::new(); + let mut sched = ~Scheduler::new(loop_, work_queue, sleepers); + sched.no_sleep = true; + let main_task = ~Coroutine::new_root(&mut sched.stack_pool, main); sched.enqueue_task(main_task); sched.run(); @@ -194,8 +211,8 @@ pub fn context() -> RuntimeContext { return OldTaskContext; } else { if Local::exists::<Scheduler>() { - let context = ::cell::Cell::new_empty(); - do Local::borrow::<Scheduler> |sched| { + let context = Cell::new_empty(); + do Local::borrow::<Scheduler, ()> |sched| { if sched.in_task_context() { context.put_back(TaskContext); } else { @@ -218,23 +235,19 @@ pub fn context() -> RuntimeContext { fn test_context() { use unstable::run_in_bare_thread; use self::sched::{Scheduler, Coroutine}; - use rt::uv::uvio::UvEventLoop; - use cell::Cell; use rt::local::Local; + use rt::test::new_test_uv_sched; assert_eq!(context(), OldTaskContext); do run_in_bare_thread { assert_eq!(context(), GlobalContext); - let mut sched = ~UvEventLoop::new_scheduler(); - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let mut sched = ~new_test_uv_sched(); + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { assert_eq!(context(), TaskContext); let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then() |task| { + do sched.deschedule_running_task_and_then() |sched, task| { assert_eq!(context(), SchedulerContext); - let task = Cell::new(task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(task.take()); - } + sched.enqueue_task(task); } }; sched.enqueue_task(task); diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 4b5eda22ff5..fa657555f3a 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -18,6 +18,7 @@ use rt::uv::uvio; // XXX: ~object doesn't work currently so these are some placeholder // types to use instead pub type EventLoopObject = uvio::UvEventLoop; +pub type RemoteCallbackObject = uvio::UvRemoteCallback; pub type IoFactoryObject = uvio::UvIoFactory; pub type RtioTcpStreamObject = uvio::UvTcpStream; pub type RtioTcpListenerObject = uvio::UvTcpListener; @@ -26,10 +27,20 @@ pub trait EventLoop { fn run(&mut self); fn callback(&mut self, ~fn()); fn callback_ms(&mut self, ms: u64, ~fn()); + fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject; /// The asynchronous I/O services. Not all event loops may provide one fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject>; } +pub trait RemoteCallback { + /// Trigger the remote callback. Note that the number of times the callback + /// is run is not guaranteed. All that is guaranteed is that, after calling 'fire', + /// the callback will be called at least once, but multiple callbacks may be coalesced + /// and callbacks may be called more often requested. Destruction also triggers the + /// callback. + fn fire(&mut self); +} + pub trait IoFactory { fn tcp_connect(&mut self, addr: IpAddr) -> Result<~RtioTcpStreamObject, IoError>; fn tcp_bind(&mut self, addr: IpAddr) -> Result<~RtioTcpListenerObject, IoError>; diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 929b44f79b5..be57247d514 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -12,21 +12,53 @@ use option::*; use sys; use cast::transmute; use cell::Cell; +use clone::Clone; +use super::sleeper_list::SleeperList; use super::work_queue::WorkQueue; use super::stack::{StackPool, StackSegment}; -use super::rtio::{EventLoop, EventLoopObject}; +use super::rtio::{EventLoop, EventLoopObject, RemoteCallbackObject}; use super::context::Context; use super::task::Task; +use super::message_queue::MessageQueue; use rt::local_ptr; use rt::local::Local; +use rt::rtio::RemoteCallback; +use rt::metrics::SchedMetrics; + +//use to_str::ToStr; + +/// To allow for using pointers as scheduler ids +use borrow::{to_uint}; /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by /// thread local storage and the running task is owned by the /// scheduler. +/// +/// XXX: This creates too many callbacks to run_sched_once, resulting +/// in too much allocation and too many events. pub struct Scheduler { + /// A queue of available work. Under a work-stealing policy there + /// is one per Scheduler. priv work_queue: WorkQueue<~Coroutine>, + /// The queue of incoming messages from other schedulers. + /// These are enqueued by SchedHandles after which a remote callback + /// is triggered to handle the message. + priv message_queue: MessageQueue<SchedMessage>, + /// A shared list of sleeping schedulers. We'll use this to wake + /// up schedulers when pushing work onto the work queue. + priv sleeper_list: SleeperList, + /// Indicates that we have previously pushed a handle onto the + /// SleeperList but have not yet received the Wake message. + /// Being `true` does not necessarily mean that the scheduler is + /// not active since there are multiple event sources that may + /// wake the scheduler. It just prevents the scheduler from pushing + /// multiple handles onto the sleeper list. + priv sleepy: bool, + /// A flag to indicate we've received the shutdown message and should + /// no longer try to go to sleep, but exit instead. + no_sleep: bool, stack_pool: StackPool, /// The event loop used to drive the scheduler and perform I/O event_loop: ~EventLoopObject, @@ -37,19 +69,40 @@ pub struct Scheduler { current_task: Option<~Coroutine>, /// An action performed after a context switch on behalf of the /// code running before the context switch - priv cleanup_job: Option<CleanupJob> + priv cleanup_job: Option<CleanupJob>, + metrics: SchedMetrics, + /// Should this scheduler run any task, or only pinned tasks? + run_anything: bool } -// XXX: Some hacks to put a &fn in Scheduler without borrowck -// complaining -type UnsafeTaskReceiver = sys::Closure; -trait ClosureConverter { - fn from_fn(&fn(~Coroutine)) -> Self; - fn to_fn(self) -> &fn(~Coroutine); +pub struct SchedHandle { + priv remote: ~RemoteCallbackObject, + priv queue: MessageQueue<SchedMessage>, + sched_id: uint } -impl ClosureConverter for UnsafeTaskReceiver { - fn from_fn(f: &fn(~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(~Coroutine) { unsafe { transmute(self) } } + +pub struct Coroutine { + /// The segment of stack on which the task is currently running or, + /// if the task is blocked, on which the task will resume execution + priv current_stack_segment: StackSegment, + /// These are always valid when the task is not running, unless + /// the task is dead + priv saved_context: Context, + /// The heap, GC, unwinding, local storage, logging + task: ~Task, +} + +// A scheduler home is either a handle to the home scheduler, or an +// explicit "AnySched". +pub enum SchedHome { + AnySched, + Sched(SchedHandle) +} + +pub enum SchedMessage { + Wake, + Shutdown, + PinnedTask(~Coroutine) } enum CleanupJob { @@ -60,18 +113,39 @@ enum CleanupJob { impl Scheduler { pub fn in_task_context(&self) -> bool { self.current_task.is_some() } - pub fn new(event_loop: ~EventLoopObject) -> Scheduler { + pub fn sched_id(&self) -> uint { to_uint(self) } + + pub fn new(event_loop: ~EventLoopObject, + work_queue: WorkQueue<~Coroutine>, + sleeper_list: SleeperList) + -> Scheduler { + + Scheduler::new_special(event_loop, work_queue, sleeper_list, true) + + } + + pub fn new_special(event_loop: ~EventLoopObject, + work_queue: WorkQueue<~Coroutine>, + sleeper_list: SleeperList, + run_anything: bool) + -> Scheduler { // Lazily initialize the runtime TLS key local_ptr::init_tls_key(); Scheduler { + sleeper_list: sleeper_list, + message_queue: MessageQueue::new(), + sleepy: false, + no_sleep: false, event_loop: event_loop, - work_queue: WorkQueue::new(), + work_queue: work_queue, stack_pool: StackPool::new(), saved_context: Context::empty(), current_task: None, - cleanup_job: None + cleanup_job: None, + metrics: SchedMetrics::new(), + run_anything: run_anything } } @@ -84,6 +158,11 @@ impl Scheduler { let mut self_sched = self; + // Always run through the scheduler loop at least once so that + // we enter the sleep state and can then be woken up by other + // schedulers. + self_sched.event_loop.callback(Scheduler::run_sched_once); + unsafe { let event_loop: *mut ~EventLoopObject = { let event_loop: *mut ~EventLoopObject = &mut self_sched.event_loop; @@ -96,43 +175,266 @@ impl Scheduler { (*event_loop).run(); } + rtdebug!("run taking sched"); let sched = Local::take::<Scheduler>(); - assert!(sched.work_queue.is_empty()); + // XXX: Reenable this once we're using a per-task queue. With a shared + // queue this is not true + //assert!(sched.work_queue.is_empty()); + rtdebug!("scheduler metrics: %s\n", { + use to_str::ToStr; + sched.metrics.to_str() + }); return sched; } + fn run_sched_once() { + + let mut sched = Local::take::<Scheduler>(); + sched.metrics.turns += 1; + + // First, check the message queue for instructions. + // XXX: perf. Check for messages without atomics. + // It's ok if we miss messages occasionally, as long as + // we sync and check again before sleeping. + if sched.interpret_message_queue() { + // We performed a scheduling action. There may be other work + // to do yet, so let's try again later. + rtdebug!("run_sched_once, interpret_message_queue taking sched"); + let mut sched = Local::take::<Scheduler>(); + sched.metrics.messages_received += 1; + sched.event_loop.callback(Scheduler::run_sched_once); + Local::put(sched); + return; + } + + // Now, look in the work queue for tasks to run + rtdebug!("run_sched_once taking"); + let sched = Local::take::<Scheduler>(); + if sched.resume_task_from_queue() { + // We performed a scheduling action. There may be other work + // to do yet, so let's try again later. + let mut sched = Local::take::<Scheduler>(); + sched.metrics.tasks_resumed_from_queue += 1; + sched.event_loop.callback(Scheduler::run_sched_once); + Local::put(sched); + return; + } + + // If we got here then there was no work to do. + // Generate a SchedHandle and push it to the sleeper list so + // somebody can wake us up later. + rtdebug!("no work to do"); + let mut sched = Local::take::<Scheduler>(); + sched.metrics.wasted_turns += 1; + if !sched.sleepy && !sched.no_sleep { + rtdebug!("sleeping"); + sched.metrics.sleepy_times += 1; + sched.sleepy = true; + let handle = sched.make_handle(); + sched.sleeper_list.push(handle); + } else { + rtdebug!("not sleeping"); + } + Local::put(sched); + } + + pub fn make_handle(&mut self) -> SchedHandle { + let remote = self.event_loop.remote_callback(Scheduler::run_sched_once); + + return SchedHandle { + remote: remote, + queue: self.message_queue.clone(), + sched_id: self.sched_id() + }; + } + /// Schedule a task to be executed later. /// - /// Pushes the task onto the work stealing queue and tells the event loop - /// to run it later. Always use this instead of pushing to the work queue - /// directly. + /// Pushes the task onto the work stealing queue and tells the + /// event loop to run it later. Always use this instead of pushing + /// to the work queue directly. pub fn enqueue_task(&mut self, task: ~Coroutine) { - self.work_queue.push(task); - self.event_loop.callback(resume_task_from_queue); - fn resume_task_from_queue() { - let scheduler = Local::take::<Scheduler>(); - scheduler.resume_task_from_queue(); - } + // We don't want to queue tasks that belong on other threads, + // so we send them home at enqueue time. + + // The borrow checker doesn't like our disassembly of the + // Coroutine struct and partial use and mutation of the + // fields. So completely disassemble here and stop using? + + // XXX perf: I think we might be able to shuffle this code to + // only destruct when we need to. + + rtdebug!("a task was queued on: %u", self.sched_id()); + + let this = self; + + // We push the task onto our local queue clone. + this.work_queue.push(task); + this.event_loop.callback(Scheduler::run_sched_once); + + // We've made work available. Notify a + // sleeping scheduler. + + // XXX: perf. Check for a sleeper without + // synchronizing memory. It's not critical + // that we always find it. + + // XXX: perf. If there's a sleeper then we + // might as well just send it the task + // directly instead of pushing it to the + // queue. That is essentially the intent here + // and it is less work. + match this.sleeper_list.pop() { + Some(handle) => { + let mut handle = handle; + handle.send(Wake) + } + None => { (/* pass */) } + }; } // * Scheduler-context operations - pub fn resume_task_from_queue(~self) { + fn interpret_message_queue(~self) -> bool { assert!(!self.in_task_context()); - rtdebug!("looking in work queue for task to schedule"); + rtdebug!("looking for scheduler messages"); let mut this = self; - match this.work_queue.pop() { - Some(task) => { - rtdebug!("resuming task from work queue"); + match this.message_queue.pop() { + Some(PinnedTask(task)) => { + rtdebug!("recv BiasedTask message in sched: %u", + this.sched_id()); + let mut task = task; + task.task.home = Some(Sched(this.make_handle())); this.resume_task_immediately(task); + return true; + } + + Some(Wake) => { + rtdebug!("recv Wake message"); + this.sleepy = false; + Local::put(this); + return true; + } + Some(Shutdown) => { + rtdebug!("recv Shutdown message"); + if this.sleepy { + // There may be an outstanding handle on the + // sleeper list. Pop them all to make sure that's + // not the case. + loop { + match this.sleeper_list.pop() { + Some(handle) => { + let mut handle = handle; + handle.send(Wake); + } + None => break + } + } + } + // No more sleeping. After there are no outstanding + // event loop references we will shut down. + this.no_sleep = true; + this.sleepy = false; + Local::put(this); + return true; } None => { - rtdebug!("no tasks in queue"); Local::put(this); + return false; + } + } + } + + /// Given an input Coroutine sends it back to its home scheduler. + fn send_task_home(task: ~Coroutine) { + let mut task = task; + let mut home = task.task.home.swap_unwrap(); + match home { + Sched(ref mut home_handle) => { + home_handle.send(PinnedTask(task)); + } + AnySched => { + abort!("error: cannot send anysched task home"); + } + } + } + + // Resume a task from the queue - but also take into account that + // it might not belong here. + fn resume_task_from_queue(~self) -> bool { + assert!(!self.in_task_context()); + + rtdebug!("looking in work queue for task to schedule"); + let mut this = self; + + // The borrow checker imposes the possibly absurd requirement + // that we split this into two match expressions. This is due + // to the inspection of the internal bits of task, as that + // can't be in scope when we act on task. + match this.work_queue.pop() { + Some(task) => { + let action_id = { + let home = &task.task.home; + match home { + &Some(Sched(ref home_handle)) + if home_handle.sched_id != this.sched_id() => { + 0 + } + &Some(AnySched) if this.run_anything => { + 1 + } + &Some(AnySched) => { + 2 + } + &Some(Sched(_)) => { + 3 + } + &None => { + 4 + } + } + }; + + match action_id { + 0 => { + rtdebug!("sending task home"); + Scheduler::send_task_home(task); + Local::put(this); + return false; + } + 1 => { + rtdebug!("resuming now"); + this.resume_task_immediately(task); + return true; + } + 2 => { + rtdebug!("re-queueing") + this.enqueue_task(task); + Local::put(this); + return false; + } + 3 => { + rtdebug!("resuming now"); + this.resume_task_immediately(task); + return true; + } + 4 => { + abort!("task home was None!"); + } + _ => { + abort!("literally, you should not be here"); + } + } } + + None => { + rtdebug!("no tasks in queue"); + Local::put(this); + return false; + } } } @@ -145,35 +447,40 @@ impl Scheduler { rtdebug!("ending running task"); - do self.deschedule_running_task_and_then |dead_task| { + do self.deschedule_running_task_and_then |sched, dead_task| { let dead_task = Cell::new(dead_task); - do Local::borrow::<Scheduler> |sched| { - dead_task.take().recycle(&mut sched.stack_pool); - } + dead_task.take().recycle(&mut sched.stack_pool); } abort!("control reached end of task"); } - pub fn schedule_new_task(~self, task: ~Coroutine) { + pub fn schedule_task(~self, task: ~Coroutine) { assert!(self.in_task_context()); - do self.switch_running_tasks_and_then(task) |last_task| { - let last_task = Cell::new(last_task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(last_task.take()); - } - } - } + // is the task home? + let is_home = task.is_home_no_tls(&self); - pub fn schedule_task(~self, task: ~Coroutine) { - assert!(self.in_task_context()); + // does the task have a home? + let homed = task.homed(); - do self.switch_running_tasks_and_then(task) |last_task| { - let last_task = Cell::new(last_task); - do Local::borrow::<Scheduler> |sched| { + let mut this = self; + + if is_home || (!homed && this.run_anything) { + // here we know we are home, execute now OR we know we + // aren't homed, and that this sched doesn't care + do this.switch_running_tasks_and_then(task) |sched, last_task| { + let last_task = Cell::new(last_task); sched.enqueue_task(last_task.take()); } + } else if !homed && !this.run_anything { + // the task isn't homed, but it can't be run here + this.enqueue_task(task); + Local::put(this); + } else { + // task isn't home, so don't run it here, send it home + Scheduler::send_task_home(task); + Local::put(this); } } @@ -184,6 +491,7 @@ impl Scheduler { assert!(!this.in_task_context()); rtdebug!("scheduling a task"); + this.metrics.context_switches_sched_to_task += 1; // Store the task in the scheduler so it can be grabbed later this.current_task = Some(task); @@ -217,15 +525,21 @@ impl Scheduler { /// The closure here is a *stack* closure that lives in the /// running task. It gets transmuted to the scheduler's lifetime /// and called while the task is blocked. - pub fn deschedule_running_task_and_then(~self, f: &fn(~Coroutine)) { + /// + /// This passes a Scheduler pointer to the fn after the context switch + /// in order to prevent that fn from performing further scheduling operations. + /// Doing further scheduling could easily result in infinite recursion. + pub fn deschedule_running_task_and_then(~self, f: &fn(&mut Scheduler, ~Coroutine)) { let mut this = self; assert!(this.in_task_context()); rtdebug!("blocking task"); + this.metrics.context_switches_task_to_sched += 1; unsafe { let blocked_task = this.current_task.swap_unwrap(); - let f_fake_region = transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f); + let f_fake_region = transmute::<&fn(&mut Scheduler, ~Coroutine), + &fn(&mut Scheduler, ~Coroutine)>(f); let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); } @@ -247,16 +561,19 @@ impl Scheduler { /// Switch directly to another task, without going through the scheduler. /// You would want to think hard about doing this, e.g. if there are /// pending I/O events it would be a bad idea. - pub fn switch_running_tasks_and_then(~self, - next_task: ~Coroutine, - f: &fn(~Coroutine)) { + pub fn switch_running_tasks_and_then(~self, next_task: ~Coroutine, + f: &fn(&mut Scheduler, ~Coroutine)) { let mut this = self; assert!(this.in_task_context()); rtdebug!("switching tasks"); + this.metrics.context_switches_task_to_task += 1; let old_running_task = this.current_task.swap_unwrap(); - let f_fake_region = unsafe { transmute::<&fn(~Coroutine), &fn(~Coroutine)>(f) }; + let f_fake_region = unsafe { + transmute::<&fn(&mut Scheduler, ~Coroutine), + &fn(&mut Scheduler, ~Coroutine)>(f) + }; let f_opaque = ClosureConverter::from_fn(f_fake_region); this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); this.current_task = Some(next_task); @@ -293,7 +610,7 @@ impl Scheduler { let cleanup_job = self.cleanup_job.swap_unwrap(); match cleanup_job { DoNothing => { } - GiveTask(task, f) => (f.to_fn())(task) + GiveTask(task, f) => (f.to_fn())(self, task) } } @@ -337,40 +654,119 @@ impl Scheduler { } } -static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack - -pub struct Coroutine { - /// The segment of stack on which the task is currently running or, - /// if the task is blocked, on which the task will resume execution - priv current_stack_segment: StackSegment, - /// These are always valid when the task is not running, unless - /// the task is dead - priv saved_context: Context, - /// The heap, GC, unwinding, local storage, logging - task: ~Task +impl SchedHandle { + pub fn send(&mut self, msg: SchedMessage) { + self.queue.push(msg); + self.remote.fire(); + } } impl Coroutine { - pub fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { - Coroutine::with_task(stack_pool, ~Task::new(), start) + + /// This function checks that a coroutine is running "home". + pub fn is_home(&self) -> bool { + rtdebug!("checking if coroutine is home"); + do Local::borrow::<Scheduler,bool> |sched| { + match self.task.home { + Some(AnySched) => { false } + Some(Sched(SchedHandle { sched_id: ref id, _ })) => { + *id == sched.sched_id() + } + None => { abort!("error: homeless task!"); } + } + } } - pub fn with_task(stack_pool: &mut StackPool, - task: ~Task, - start: ~fn()) -> Coroutine { + /// Without access to self, but with access to the "expected home + /// id", see if we are home. + fn is_home_using_id(id: uint) -> bool { + rtdebug!("checking if coroutine is home using id"); + do Local::borrow::<Scheduler,bool> |sched| { + if sched.sched_id() == id { + true + } else { + false + } + } + } + + /// Check if this coroutine has a home + fn homed(&self) -> bool { + rtdebug!("checking if this coroutine has a home"); + match self.task.home { + Some(AnySched) => { false } + Some(Sched(_)) => { true } + None => { abort!("error: homeless task!"); + } + } + } + + /// A version of is_home that does not need to use TLS, it instead + /// takes local scheduler as a parameter. + fn is_home_no_tls(&self, sched: &~Scheduler) -> bool { + rtdebug!("checking if coroutine is home without tls"); + match self.task.home { + Some(AnySched) => { true } + Some(Sched(SchedHandle { sched_id: ref id, _})) => { + *id == sched.sched_id() + } + None => { abort!("error: homeless task!"); } + } + } + + /// Check TLS for the scheduler to see if we are on a special + /// scheduler. + pub fn on_special() -> bool { + rtdebug!("checking if coroutine is executing on special sched"); + do Local::borrow::<Scheduler,bool>() |sched| { + !sched.run_anything + } + } + + // Created new variants of "new" that takes a home scheduler + // parameter. The original with_task now calls with_task_homed + // using the AnySched paramter. + + pub fn new_homed(stack_pool: &mut StackPool, home: SchedHome, start: ~fn()) -> Coroutine { + Coroutine::with_task_homed(stack_pool, ~Task::new_root(), start, home) + } + + pub fn new_root(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { + Coroutine::with_task(stack_pool, ~Task::new_root(), start) + } + + pub fn with_task_homed(stack_pool: &mut StackPool, + task: ~Task, + start: ~fn(), + home: SchedHome) -> Coroutine { + + static MIN_STACK_SIZE: uint = 1000000; // XXX: Too much stack + let start = Coroutine::build_start_wrapper(start); let mut stack = stack_pool.take_segment(MIN_STACK_SIZE); // NB: Context holds a pointer to that ~fn let initial_context = Context::new(start, &mut stack); - return Coroutine { + let mut crt = Coroutine { current_stack_segment: stack, saved_context: initial_context, - task: task + task: task, }; + crt.task.home = Some(home); + return crt; + } + + pub fn with_task(stack_pool: &mut StackPool, + task: ~Task, + start: ~fn()) -> Coroutine { + Coroutine::with_task_homed(stack_pool, + task, + start, + AnySched) } fn build_start_wrapper(start: ~fn()) -> ~fn() { // XXX: The old code didn't have this extra allocation + let start_cell = Cell::new(start); let wrapper: ~fn() = || { // This is the first code to execute after the initial // context switch to the task. The previous context may @@ -381,8 +777,23 @@ impl Coroutine { let sched = Local::unsafe_borrow::<Scheduler>(); let task = (*sched).current_task.get_mut_ref(); - // FIXME #6141: shouldn't neet to put `start()` in another closure - task.task.run(||start()); + // FIXME #6141: shouldn't neet to put `start()` in + // another closure + let start_cell = Cell::new(start_cell.take()); + do task.task.run { + // N.B. Removing `start` from the start wrapper + // closure by emptying a cell is critical for + // correctness. The ~Task pointer, and in turn the + // closure used to initialize the first call + // frame, is destroyed in scheduler context, not + // task context. So any captured closures must + // not contain user-definable dtors that expect to + // be in task context. By moving `start` out of + // the closure, all the user code goes out of + // scope while the task is still running. + let start = start_cell.take(); + start(); + }; } let sched = Local::take::<Scheduler>(); @@ -401,16 +812,328 @@ impl Coroutine { } } +// XXX: Some hacks to put a &fn in Scheduler without borrowck +// complaining +type UnsafeTaskReceiver = sys::Closure; +trait ClosureConverter { + fn from_fn(&fn(&mut Scheduler, ~Coroutine)) -> Self; + fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine); +} +impl ClosureConverter for UnsafeTaskReceiver { + fn from_fn(f: &fn(&mut Scheduler, ~Coroutine)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } + fn to_fn(self) -> &fn(&mut Scheduler, ~Coroutine) { unsafe { transmute(self) } } +} + #[cfg(test)] mod test { use int; use cell::Cell; - use rt::uv::uvio::UvEventLoop; + use iterator::IteratorUtil; use unstable::run_in_bare_thread; use task::spawn; use rt::local::Local; use rt::test::*; use super::*; + use rt::thread::Thread; + use ptr::to_uint; + use vec::MutableVector; + + // Confirm that a sched_id actually is the uint form of the + // pointer to the scheduler struct. + + #[test] + fn simple_sched_id_test() { + do run_in_bare_thread { + let sched = ~new_test_uv_sched(); + assert!(to_uint(sched) == sched.sched_id()); + } + } + + // Compare two scheduler ids that are different, this should never + // fail but may catch a mistake someday. + + #[test] + fn compare_sched_id_test() { + do run_in_bare_thread { + let sched_one = ~new_test_uv_sched(); + let sched_two = ~new_test_uv_sched(); + assert!(sched_one.sched_id() != sched_two.sched_id()); + } + } + + // A simple test to check if a homed task run on a single + // scheduler ends up executing while home. + + #[test] + fn test_home_sched() { + do run_in_bare_thread { + let mut task_ran = false; + let task_ran_ptr: *mut bool = &mut task_ran; + let mut sched = ~new_test_uv_sched(); + + let sched_handle = sched.make_handle(); + let sched_id = sched.sched_id(); + + let task = ~do Coroutine::new_homed(&mut sched.stack_pool, + Sched(sched_handle)) { + unsafe { *task_ran_ptr = true }; + let sched = Local::take::<Scheduler>(); + assert!(sched.sched_id() == sched_id); + Local::put::<Scheduler>(sched); + }; + sched.enqueue_task(task); + sched.run(); + assert!(task_ran); + } + } + + // A test for each state of schedule_task + + #[test] + fn test_schedule_home_states() { + + use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; + use rt::sleeper_list::SleeperList; + use rt::work_queue::WorkQueue; + + do run_in_bare_thread { +// let nthreads = 2; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + // our normal scheduler + let mut normal_sched = ~Scheduler::new( + ~UvEventLoop::new(), + work_queue.clone(), + sleepers.clone()); + + let normal_handle = Cell::new(normal_sched.make_handle()); + + // our special scheduler + let mut special_sched = ~Scheduler::new_special( + ~UvEventLoop::new(), + work_queue.clone(), + sleepers.clone(), + true); + + let special_handle = Cell::new(special_sched.make_handle()); + let special_handle2 = Cell::new(special_sched.make_handle()); + let special_id = special_sched.sched_id(); + let t1_handle = special_sched.make_handle(); + let t4_handle = special_sched.make_handle(); + + let t1f = ~do Coroutine::new_homed(&mut special_sched.stack_pool, + Sched(t1_handle)) { + let is_home = Coroutine::is_home_using_id(special_id); + rtdebug!("t1 should be home: %b", is_home); + assert!(is_home); + }; + let t1f = Cell::new(t1f); + + let t2f = ~do Coroutine::new_root(&mut normal_sched.stack_pool) { + let on_special = Coroutine::on_special(); + rtdebug!("t2 should not be on special: %b", on_special); + assert!(!on_special); + }; + let t2f = Cell::new(t2f); + + let t3f = ~do Coroutine::new_root(&mut normal_sched.stack_pool) { + // not on special + let on_special = Coroutine::on_special(); + rtdebug!("t3 should not be on special: %b", on_special); + assert!(!on_special); + }; + let t3f = Cell::new(t3f); + + let t4f = ~do Coroutine::new_homed(&mut special_sched.stack_pool, + Sched(t4_handle)) { + // is home + let home = Coroutine::is_home_using_id(special_id); + rtdebug!("t4 should be home: %b", home); + assert!(home); + }; + let t4f = Cell::new(t4f); + + // we have four tests, make them as closures + let t1: ~fn() = || { + // task is home on special + let task = t1f.take(); + let sched = Local::take::<Scheduler>(); + sched.schedule_task(task); + }; + let t2: ~fn() = || { + // not homed, task doesn't care + let task = t2f.take(); + let sched = Local::take::<Scheduler>(); + sched.schedule_task(task); + }; + let t3: ~fn() = || { + // task not homed, must leave + let task = t3f.take(); + let sched = Local::take::<Scheduler>(); + sched.schedule_task(task); + }; + let t4: ~fn() = || { + // task not home, send home + let task = t4f.take(); + let sched = Local::take::<Scheduler>(); + sched.schedule_task(task); + }; + + let t1 = Cell::new(t1); + let t2 = Cell::new(t2); + let t3 = Cell::new(t3); + let t4 = Cell::new(t4); + + // build a main task that runs our four tests + let main_task = ~do Coroutine::new_root(&mut normal_sched.stack_pool) { + // the two tasks that require a normal start location + t2.take()(); + t4.take()(); + normal_handle.take().send(Shutdown); + special_handle.take().send(Shutdown); + }; + + // task to run the two "special start" tests + let special_task = ~do Coroutine::new_homed( + &mut special_sched.stack_pool, + Sched(special_handle2.take())) { + t1.take()(); + t3.take()(); + }; + + // enqueue the main tasks + normal_sched.enqueue_task(special_task); + normal_sched.enqueue_task(main_task); + + let nsched_cell = Cell::new(normal_sched); + let normal_thread = do Thread::start { + let sched = nsched_cell.take(); + sched.run(); + }; + + let ssched_cell = Cell::new(special_sched); + let special_thread = do Thread::start { + let sched = ssched_cell.take(); + sched.run(); + }; + + // wait for the end + let _thread1 = normal_thread; + let _thread2 = special_thread; + + } + } + + // The following test is a bit of a mess, but it trys to do + // something tricky so I'm not sure how to get around this in the + // short term. + + // A number of schedulers are created, and then a task is created + // and assigned a home scheduler. It is then "started" on a + // different scheduler. The scheduler it is started on should + // observe that the task is not home, and send it home. + + // This test is light in that it does very little. + + #[test] + fn test_transfer_task_home() { + + use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; + use rt::sleeper_list::SleeperList; + use rt::work_queue::WorkQueue; + use uint; + use container::Container; + use vec::OwnedVector; + + do run_in_bare_thread { + + static N: uint = 8; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + let mut handles = ~[]; + let mut scheds = ~[]; + + for uint::range(0, N) |_| { + let loop_ = ~UvEventLoop::new(); + let mut sched = ~Scheduler::new(loop_, + work_queue.clone(), + sleepers.clone()); + let handle = sched.make_handle(); + rtdebug!("sched id: %u", handle.sched_id); + handles.push(handle); + scheds.push(sched); + }; + + let handles = Cell::new(handles); + + let home_handle = scheds[6].make_handle(); + let home_id = home_handle.sched_id; + let home = Sched(home_handle); + + let main_task = ~do Coroutine::new_homed(&mut scheds[1].stack_pool, home) { + + // Here we check if the task is running on its home. + let sched = Local::take::<Scheduler>(); + rtdebug!("run location scheduler id: %u, home: %u", + sched.sched_id(), + home_id); + assert!(sched.sched_id() == home_id); + Local::put::<Scheduler>(sched); + + let mut handles = handles.take(); + for handles.mut_iter().advance |handle| { + handle.send(Shutdown); + } + }; + + scheds[0].enqueue_task(main_task); + + let mut threads = ~[]; + + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + let thread = do Thread::start { + let sched = sched_cell.take(); + sched.run(); + }; + threads.push(thread); + } + + let _threads = threads; + } + } + + // Do it a lot + + #[test] + fn test_stress_schedule_task_states() { + let n = stress_factor() * 120; + for int::range(0,n as int) |_| { + test_schedule_home_states(); + } + } + + // The goal is that this is the high-stress test for making sure + // homing is working. It allocates RUST_RT_STRESS tasks that + // do nothing but assert that they are home at execution + // time. These tasks are queued to random schedulers, so sometimes + // they are home and sometimes not. It also runs RUST_RT_STRESS + // times. + + #[test] + fn test_stress_homed_tasks() { + let n = stress_factor(); + for int::range(0,n as int) |_| { + run_in_mt_newsched_task_random_homed(); + } + } #[test] fn test_simple_scheduling() { @@ -418,8 +1141,8 @@ mod test { let mut task_ran = false; let task_ran_ptr: *mut bool = &mut task_ran; - let mut sched = ~UvEventLoop::new_scheduler(); - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let mut sched = ~new_test_uv_sched(); + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *task_ran_ptr = true; } }; sched.enqueue_task(task); @@ -435,9 +1158,9 @@ mod test { let mut task_count = 0; let task_count_ptr: *mut int = &mut task_count; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); for int::range(0, total) |_| { - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *task_count_ptr = *task_count_ptr + 1; } }; sched.enqueue_task(task); @@ -453,19 +1176,17 @@ mod test { let mut count = 0; let count_ptr: *mut int = &mut count; - let mut sched = ~UvEventLoop::new_scheduler(); - let task1 = ~do Coroutine::new(&mut sched.stack_pool) { + let mut sched = ~new_test_uv_sched(); + let task1 = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } let mut sched = Local::take::<Scheduler>(); - let task2 = ~do Coroutine::new(&mut sched.stack_pool) { + let task2 = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } }; // Context switch directly to the new task - do sched.switch_running_tasks_and_then(task2) |task1| { + do sched.switch_running_tasks_and_then(task2) |sched, task1| { let task1 = Cell::new(task1); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(task1.take()); - } + sched.enqueue_task(task1.take()); } unsafe { *count_ptr = *count_ptr + 1; } }; @@ -482,9 +1203,9 @@ mod test { let mut count = 0; let count_ptr: *mut int = &mut count; - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); - let start_task = ~do Coroutine::new(&mut sched.stack_pool) { + let start_task = ~do Coroutine::new_root(&mut sched.stack_pool) { run_task(count_ptr); }; sched.enqueue_task(start_task); @@ -493,8 +1214,8 @@ mod test { assert_eq!(count, MAX); fn run_task(count_ptr: *mut int) { - do Local::borrow::<Scheduler> |sched| { - let task = ~do Coroutine::new(&mut sched.stack_pool) { + do Local::borrow::<Scheduler, ()> |sched| { + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; if *count_ptr != MAX { @@ -511,16 +1232,14 @@ mod test { #[test] fn test_block_task() { do run_in_bare_thread { - let mut sched = ~UvEventLoop::new_scheduler(); - let task = ~do Coroutine::new(&mut sched.stack_pool) { + let mut sched = ~new_test_uv_sched(); + let task = ~do Coroutine::new_root(&mut sched.stack_pool) { let sched = Local::take::<Scheduler>(); assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |task| { + do sched.deschedule_running_task_and_then() |sched, task| { let task = Cell::new(task); - do Local::borrow::<Scheduler> |sched| { - assert!(!sched.in_task_context()); - sched.enqueue_task(task.take()); - } + assert!(!sched.in_task_context()); + sched.enqueue_task(task.take()); } }; sched.enqueue_task(task); @@ -537,8 +1256,7 @@ mod test { do run_in_newsched_task { do spawn { let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |task| { - let mut sched = Local::take::<Scheduler>(); + do sched.deschedule_running_task_and_then |sched, task| { let task = Cell::new(task); do sched.event_loop.callback_ms(10) { rtdebug!("in callback"); @@ -546,9 +1264,149 @@ mod test { sched.enqueue_task(task.take()); Local::put(sched); } - Local::put(sched); } } } } + + #[test] + fn handle() { + use rt::comm::*; + + do run_in_bare_thread { + let (port, chan) = oneshot::<()>(); + let port_cell = Cell::new(port); + let chan_cell = Cell::new(chan); + let mut sched1 = ~new_test_uv_sched(); + let handle1 = sched1.make_handle(); + let handle1_cell = Cell::new(handle1); + let task1 = ~do Coroutine::new_root(&mut sched1.stack_pool) { + chan_cell.take().send(()); + }; + sched1.enqueue_task(task1); + + let mut sched2 = ~new_test_uv_sched(); + let task2 = ~do Coroutine::new_root(&mut sched2.stack_pool) { + port_cell.take().recv(); + // Release the other scheduler's handle so it can exit + handle1_cell.take(); + }; + sched2.enqueue_task(task2); + + let sched1_cell = Cell::new(sched1); + let _thread1 = do Thread::start { + let sched1 = sched1_cell.take(); + sched1.run(); + }; + + let sched2_cell = Cell::new(sched2); + let _thread2 = do Thread::start { + let sched2 = sched2_cell.take(); + sched2.run(); + }; + } + } + + #[test] + fn multithreading() { + use rt::comm::*; + use iter::Times; + use vec::OwnedVector; + use container::Container; + + do run_in_mt_newsched_task { + let mut ports = ~[]; + for 10.times { + let (port, chan) = oneshot(); + let chan_cell = Cell::new(chan); + do spawntask_later { + chan_cell.take().send(()); + } + ports.push(port); + } + + while !ports.is_empty() { + ports.pop().recv(); + } + } + } + + #[test] + fn thread_ring() { + use rt::comm::*; + use comm::{GenericPort, GenericChan}; + + do run_in_mt_newsched_task { + let (end_port, end_chan) = oneshot(); + + let n_tasks = 10; + let token = 2000; + + let mut (p, ch1) = stream(); + ch1.send((token, end_chan)); + let mut i = 2; + while i <= n_tasks { + let (next_p, ch) = stream(); + let imm_i = i; + let imm_p = p; + do spawntask_random { + roundtrip(imm_i, n_tasks, &imm_p, &ch); + }; + p = next_p; + i += 1; + } + let imm_p = p; + let imm_ch = ch1; + do spawntask_random { + roundtrip(1, n_tasks, &imm_p, &imm_ch); + } + + end_port.recv(); + } + + fn roundtrip(id: int, n_tasks: int, + p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) { + while (true) { + match p.recv() { + (1, end_chan) => { + debug!("%d\n", id); + end_chan.send(()); + return; + } + (token, end_chan) => { + debug!("thread: %d got token: %d", id, token); + ch.send((token - 1, end_chan)); + if token <= n_tasks { + return; + } + } + } + } + } + + } + + #[test] + fn start_closure_dtor() { + use ops::Drop; + + // Regression test that the `start` task entrypoint can + // contain dtors that use task resources + do run_in_newsched_task { + struct S { field: () } + + impl Drop for S { + fn finalize(&self) { + let _foo = @0; + } + } + + let s = S { field: () }; + + do spawntask { + let _ss = &s; + } + } + } + } diff --git a/src/libstd/rt/sleeper_list.rs b/src/libstd/rt/sleeper_list.rs new file mode 100644 index 00000000000..3d6e9ef5635 --- /dev/null +++ b/src/libstd/rt/sleeper_list.rs @@ -0,0 +1,59 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Maintains a shared list of sleeping schedulers. Schedulers +//! use this to wake each other up. + +use container::Container; +use vec::OwnedVector; +use option::{Option, Some, None}; +use cell::Cell; +use unstable::sync::{Exclusive, exclusive}; +use rt::sched::SchedHandle; +use clone::Clone; + +pub struct SleeperList { + priv stack: ~Exclusive<~[SchedHandle]> +} + +impl SleeperList { + pub fn new() -> SleeperList { + SleeperList { + stack: ~exclusive(~[]) + } + } + + pub fn push(&mut self, handle: SchedHandle) { + let handle = Cell::new(handle); + unsafe { + self.stack.with(|s| s.push(handle.take())); + } + } + + pub fn pop(&mut self) -> Option<SchedHandle> { + unsafe { + do self.stack.with |s| { + if !s.is_empty() { + Some(s.pop()) + } else { + None + } + } + } + } +} + +impl Clone for SleeperList { + fn clone(&self) -> SleeperList { + SleeperList { + stack: self.stack.clone() + } + } +} \ No newline at end of file diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 41390aec80c..e7f87906fe5 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -18,16 +18,22 @@ use cast::transmute; use libc::{c_void, uintptr_t}; use ptr; use prelude::*; +use option::{Option, Some, None}; use rt::local::Local; use rt::logging::StdErrLogger; use super::local_heap::LocalHeap; +use rt::sched::{SchedHome, AnySched}; +use rt::join_latch::JoinLatch; pub struct Task { heap: LocalHeap, gc: GarbageCollector, storage: LocalStorage, logger: StdErrLogger, - unwinder: Option<Unwinder>, + unwinder: Unwinder, + home: Option<SchedHome>, + join_latch: Option<~JoinLatch>, + on_exit: Option<~fn(bool)>, destroyed: bool } @@ -39,49 +45,63 @@ pub struct Unwinder { } impl Task { - pub fn new() -> Task { + pub fn new_root() -> Task { Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - unwinder: Some(Unwinder { unwinding: false }), + unwinder: Unwinder { unwinding: false }, + home: Some(AnySched), + join_latch: Some(JoinLatch::new_root()), + on_exit: None, destroyed: false } } - pub fn without_unwinding() -> Task { + pub fn new_child(&mut self) -> Task { Task { heap: LocalHeap::new(), gc: GarbageCollector, storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, - unwinder: None, + home: Some(AnySched), + unwinder: Unwinder { unwinding: false }, + join_latch: Some(self.join_latch.get_mut_ref().new_child()), + on_exit: None, destroyed: false } } + pub fn give_home(&mut self, new_home: SchedHome) { + self.home = Some(new_home); + } + pub fn run(&mut self, f: &fn()) { // This is just an assertion that `run` was called unsafely // and this instance of Task is still accessible. - do Local::borrow::<Task> |task| { + do Local::borrow::<Task, ()> |task| { assert!(borrow::ref_eq(task, self)); } - match self.unwinder { - Some(ref mut unwinder) => { - // If there's an unwinder then set up the catch block - unwinder.try(f); + self.unwinder.try(f); + self.destroy(); + + // Wait for children. Possibly report the exit status. + let local_success = !self.unwinder.unwinding; + let join_latch = self.join_latch.swap_unwrap(); + match self.on_exit { + Some(ref on_exit) => { + let success = join_latch.wait(local_success); + (*on_exit)(success); } None => { - // Otherwise, just run the body - f() + join_latch.release(local_success); } } - self.destroy(); } - /// Must be called manually before finalization to clean up + /// must be called manually before finalization to clean up /// thread-local resources. Some of the routines here expect /// Task to be available recursively so this must be /// called unsafely, without removing Task from @@ -89,7 +109,7 @@ impl Task { fn destroy(&mut self) { // This is just an assertion that `destroy` was called unsafely // and this instance of Task is still accessible. - do Local::borrow::<Task> |task| { + do Local::borrow::<Task, ()> |task| { assert!(borrow::ref_eq(task, self)); } match self.storage { @@ -227,4 +247,14 @@ mod test { assert!(port.recv() == 10); } } + + #[test] + fn linked_failure() { + do run_in_newsched_task() { + let res = do spawntask_try { + spawntask_random(|| fail!()); + }; + assert!(res.is_err()); + } + } } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index fe08d85c947..6e4fb9b1d94 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -9,13 +9,33 @@ // except according to those terms. use uint; -use option::*; +use option::{Some, None}; use cell::Cell; +use clone::Clone; +use container::Container; +use iterator::IteratorUtil; +use vec::{OwnedVector, MutableVector}; use result::{Result, Ok, Err}; +use unstable::run_in_bare_thread; use super::io::net::ip::{IpAddr, Ipv4}; +use rt::comm::oneshot; use rt::task::Task; use rt::thread::Thread; use rt::local::Local; +use rt::sched::{Scheduler, Coroutine}; +use rt::sleeper_list::SleeperList; +use rt::work_queue::WorkQueue; + +pub fn new_test_uv_sched() -> Scheduler { + use rt::uv::uvio::UvEventLoop; + use rt::work_queue::WorkQueue; + use rt::sleeper_list::SleeperList; + + let mut sched = Scheduler::new(~UvEventLoop::new(), WorkQueue::new(), SleeperList::new()); + // Don't wait for the Shutdown message + sched.no_sleep = true; + return sched; +} /// Creates a new scheduler in a new thread and runs a task in it, /// then waits for the scheduler to exit. Failure of the task @@ -23,48 +43,240 @@ use rt::local::Local; pub fn run_in_newsched_task(f: ~fn()) { use super::sched::*; use unstable::run_in_bare_thread; - use rt::uv::uvio::UvEventLoop; let f = Cell::new(f); do run_in_bare_thread { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| rtassert!(exit_status); + new_task.on_exit = Some(on_exit); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + new_task, f.take()); sched.enqueue_task(task); sched.run(); } } +/// Create more than one scheduler and run a function in a task +/// in one of the schedulers. The schedulers will stay alive +/// until the function `f` returns. +pub fn run_in_mt_newsched_task(f: ~fn()) { + use libc; + use os; + use from_str::FromStr; + use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; + + let f_cell = Cell::new(f); + + do run_in_bare_thread { + let nthreads = match os::getenv("RUST_TEST_THREADS") { + Some(nstr) => FromStr::from_str(nstr).get(), + None => unsafe { + // Using more threads than cores in test code + // to force the OS to preempt them frequently. + // Assuming that this help stress test concurrent types. + rust_get_num_cpus() * 2 + } + }; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + let mut handles = ~[]; + let mut scheds = ~[]; + + for uint::range(0, nthreads) |_| { + let loop_ = ~UvEventLoop::new(); + let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); + let handle = sched.make_handle(); + + handles.push(handle); + scheds.push(sched); + } + + let f_cell = Cell::new(f_cell.take()); + let handles = Cell::new(handles); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| { + + let mut handles = handles.take(); + // Tell schedulers to exit + for handles.mut_iter().advance |handle| { + handle.send(Shutdown); + } + + rtassert!(exit_status); + }; + new_task.on_exit = Some(on_exit); + let main_task = ~Coroutine::with_task(&mut scheds[0].stack_pool, + new_task, f_cell.take()); + scheds[0].enqueue_task(main_task); + + let mut threads = ~[]; + + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + let thread = do Thread::start { + let sched = sched_cell.take(); + sched.run(); + }; + + threads.push(thread); + } + + // Wait for schedulers + let _threads = threads; + } + + extern { + fn rust_get_num_cpus() -> libc::uintptr_t; + } +} + +// THIS IS AWFUL. Copy-pasted the above initialization function but +// with a number of hacks to make it spawn tasks on a variety of +// schedulers with a variety of homes using the new spawn. + +pub fn run_in_mt_newsched_task_random_homed() { + use libc; + use os; + use from_str::FromStr; + use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; + + do run_in_bare_thread { + let nthreads = match os::getenv("RUST_TEST_THREADS") { + Some(nstr) => FromStr::from_str(nstr).get(), + None => unsafe { + // Using more threads than cores in test code to force + // the OS to preempt them frequently. Assuming that + // this help stress test concurrent types. + rust_get_num_cpus() * 2 + } + }; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + let mut handles = ~[]; + let mut scheds = ~[]; + + // create a few special schedulers, those with even indicies + // will be pinned-only + for uint::range(0, nthreads) |i| { + let special = (i % 2) == 0; + let loop_ = ~UvEventLoop::new(); + let mut sched = ~Scheduler::new_special( + loop_, work_queue.clone(), sleepers.clone(), special); + let handle = sched.make_handle(); + handles.push(handle); + scheds.push(sched); + } + + // Schedule a pile o tasks + let n = 5*stress_factor(); + for uint::range(0,n) |_i| { + rtdebug!("creating task: %u", _i); + let hf: ~fn() = || { assert!(true) }; + spawntask_homed(&mut scheds, hf); + } + + // Now we want another pile o tasks that do not ever run on a + // special scheduler, because they are normal tasks. Because + // we can we put these in the "main" task. + + let n = 5*stress_factor(); + + let f: ~fn() = || { + for uint::range(0,n) |_| { + let f: ~fn() = || { + // Borrow the scheduler we run on and check if it is + // privileged. + do Local::borrow::<Scheduler,()> |sched| { + assert!(sched.run_anything); + }; + }; + spawntask_random(f); + }; + }; + + let f_cell = Cell::new(f); + let handles = Cell::new(handles); + + rtdebug!("creating main task"); + + let main_task = ~do Coroutine::new_root(&mut scheds[0].stack_pool) { + f_cell.take()(); + let mut handles = handles.take(); + // Tell schedulers to exit + for handles.mut_iter().advance |handle| { + handle.send(Shutdown); + } + }; + + rtdebug!("queuing main task") + + scheds[0].enqueue_task(main_task); + + let mut threads = ~[]; + + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell::new(sched); + let thread = do Thread::start { + let sched = sched_cell.take(); + rtdebug!("running sched: %u", sched.sched_id()); + sched.run(); + }; + + threads.push(thread); + } + + rtdebug!("waiting on scheduler threads"); + + // Wait for schedulers + let _threads = threads; + } + + extern { + fn rust_get_num_cpus() -> libc::uintptr_t; + } +} + + /// Test tasks will abort on failure instead of unwinding pub fn spawntask(f: ~fn()) { use super::sched::*; + rtdebug!("spawntask taking the scheduler from TLS") + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + let mut sched = Local::take::<Scheduler>(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), - f); - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell::new(task); - let sched = Local::take::<Scheduler>(); - sched.schedule_new_task(task.take()); - } + task, f); + rtdebug!("spawntask scheduling the new task"); + sched.schedule_task(task); } /// Create a new task and run it right now. Aborts on failure pub fn spawntask_immediately(f: ~fn()) { use super::sched::*; + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + let mut sched = Local::take::<Scheduler>(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), - f); - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell::new(task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(task.take()); - } + task, f); + do sched.switch_running_tasks_and_then(task) |sched, task| { + sched.enqueue_task(task); } } @@ -72,10 +284,13 @@ pub fn spawntask_immediately(f: ~fn()) { pub fn spawntask_later(f: ~fn()) { use super::sched::*; + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + let mut sched = Local::take::<Scheduler>(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), - f); + task, f); sched.enqueue_task(task); Local::put(sched); @@ -86,20 +301,20 @@ pub fn spawntask_random(f: ~fn()) { use super::sched::*; use rand::{Rand, rng}; - let mut rng = rng(); - let run_now: bool = Rand::rand(&mut rng); + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; let mut sched = Local::take::<Scheduler>(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), - f); + task, f); + + let mut rng = rng(); + let run_now: bool = Rand::rand(&mut rng); if run_now { - do sched.switch_running_tasks_and_then(task) |task| { - let task = Cell::new(task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(task.take()); - } + do sched.switch_running_tasks_and_then(task) |sched, task| { + sched.enqueue_task(task); } } else { sched.enqueue_task(task); @@ -107,57 +322,75 @@ pub fn spawntask_random(f: ~fn()) { } } +/// Spawn a task, with the current scheduler as home, and queue it to +/// run later. +pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) { + use super::sched::*; + use rand::{rng, RngUtil}; + let mut rng = rng(); + + let task = { + let sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; + let handle = sched.make_handle(); + let home_id = handle.sched_id; + + // now that we know where this is going, build a new function + // that can assert it is in the right place + let af: ~fn() = || { + do Local::borrow::<Scheduler,()>() |sched| { + rtdebug!("home_id: %u, runtime loc: %u", + home_id, + sched.sched_id()); + assert!(home_id == sched.sched_id()); + }; + f() + }; + + ~Coroutine::with_task_homed(&mut sched.stack_pool, + ~Task::new_root(), + af, + Sched(handle)) + }; + let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; + // enqueue it for future execution + dest_sched.enqueue_task(task); +} /// Spawn a task and wait for it to finish, returning whether it completed successfully or failed pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { use cell::Cell; use super::sched::*; - use task; - use unstable::finally::Finally; - - // Our status variables will be filled in from the scheduler context - let mut failed = false; - let failed_ptr: *mut bool = &mut failed; - - // Switch to the scheduler - let f = Cell::new(Cell::new(f)); - let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then() |old_task| { - let old_task = Cell::new(old_task); - let f = f.take(); - let mut sched = Local::take::<Scheduler>(); - let new_task = ~do Coroutine::new(&mut sched.stack_pool) { - do (|| { - (f.take())() - }).finally { - // Check for failure then resume the parent task - unsafe { *failed_ptr = task::failing(); } - let sched = Local::take::<Scheduler>(); - do sched.switch_running_tasks_and_then(old_task.take()) |new_task| { - let new_task = Cell::new(new_task); - do Local::borrow::<Scheduler> |sched| { - sched.enqueue_task(new_task.take()); - } - } - } - }; - sched.resume_task_immediately(new_task); + let (port, chan) = oneshot(); + let chan = Cell::new(chan); + let mut new_task = ~Task::new_root(); + let on_exit: ~fn(bool) = |exit_status| chan.take().send(exit_status); + new_task.on_exit = Some(on_exit); + let mut sched = Local::take::<Scheduler>(); + let new_task = ~Coroutine::with_task(&mut sched.stack_pool, + new_task, f); + do sched.switch_running_tasks_and_then(new_task) |sched, old_task| { + sched.enqueue_task(old_task); } - if !failed { Ok(()) } else { Err(()) } + let exit_status = port.recv(); + if exit_status { Ok(()) } else { Err(()) } } // Spawn a new task in a new scheduler and return a thread handle. pub fn spawntask_thread(f: ~fn()) -> Thread { use rt::sched::*; - use rt::uv::uvio::UvEventLoop; + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + + let task = Cell::new(task); let f = Cell::new(f); let thread = do Thread::start { - let mut sched = ~UvEventLoop::new_scheduler(); + let mut sched = ~new_test_uv_sched(); let task = ~Coroutine::with_task(&mut sched.stack_pool, - ~Task::without_unwinding(), + task.take(), f.take()); sched.enqueue_task(task); sched.run(); diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs index 03e11dfad1d..89f3d10b5e4 100644 --- a/src/libstd/rt/tube.rs +++ b/src/libstd/rt/tube.rs @@ -72,7 +72,7 @@ impl<T> Tube<T> { assert!(self.p.refcount() > 1); // There better be somebody to wake us up assert!((*state).blocked_task.is_none()); let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |_, task| { (*state).blocked_task = Some(task); } rtdebug!("waking after tube recv"); @@ -107,11 +107,10 @@ mod test { let tube_clone = tube.clone(); let tube_clone_cell = Cell::new(tube_clone); let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { let mut tube_clone = tube_clone_cell.take(); tube_clone.send(1); - let sched = Local::take::<Scheduler>(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } assert!(tube.recv() == 1); @@ -123,21 +122,17 @@ mod test { do run_in_newsched_task { let mut tube: Tube<int> = Tube::new(); let tube_clone = tube.clone(); - let tube_clone = Cell::new(Cell::new(Cell::new(tube_clone))); + let tube_clone = Cell::new(tube_clone); let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |task| { - let tube_clone = tube_clone.take(); - do Local::borrow::<Scheduler> |sched| { - let tube_clone = tube_clone.take(); - do sched.event_loop.callback { - let mut tube_clone = tube_clone.take(); - // The task should be blocked on this now and - // sending will wake it up. - tube_clone.send(1); - } + do sched.deschedule_running_task_and_then |sched, task| { + let tube_clone = Cell::new(tube_clone.take()); + do sched.event_loop.callback { + let mut tube_clone = tube_clone.take(); + // The task should be blocked on this now and + // sending will wake it up. + tube_clone.send(1); } - let sched = Local::take::<Scheduler>(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } assert!(tube.recv() == 1); @@ -153,14 +148,14 @@ mod test { let tube_clone = tube.clone(); let tube_clone = Cell::new(tube_clone); let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |task| { + do sched.deschedule_running_task_and_then |sched, task| { callback_send(tube_clone.take(), 0); fn callback_send(tube: Tube<int>, i: int) { if i == 100 { return; } let tube = Cell::new(Cell::new(tube)); - do Local::borrow::<Scheduler> |sched| { + do Local::borrow::<Scheduler, ()> |sched| { let tube = tube.take(); do sched.event_loop.callback { let mut tube = tube.take(); @@ -172,8 +167,7 @@ mod test { } } - let sched = Local::take::<Scheduler>(); - sched.resume_task_immediately(task); + sched.enqueue_task(task); } for int::range(0, MAX) |i| { diff --git a/src/libstd/rt/uv/async.rs b/src/libstd/rt/uv/async.rs new file mode 100644 index 00000000000..f3d1024024f --- /dev/null +++ b/src/libstd/rt/uv/async.rs @@ -0,0 +1,105 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use libc::{c_int, c_void}; +use option::Some; +use rt::uv::uvll; +use rt::uv::uvll::UV_ASYNC; +use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback, NullCallback}; +use rt::uv::WatcherInterop; +use rt::uv::status_to_maybe_uv_error; + +pub struct AsyncWatcher(*uvll::uv_async_t); +impl Watcher for AsyncWatcher { } + +impl AsyncWatcher { + pub fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher { + unsafe { + let handle = uvll::malloc_handle(UV_ASYNC); + assert!(handle.is_not_null()); + let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); + watcher.install_watcher_data(); + let data = watcher.get_watcher_data(); + data.async_cb = Some(cb); + assert_eq!(0, uvll::async_init(loop_.native_handle(), handle, async_cb)); + return watcher; + } + + extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { + let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); + let status = status_to_maybe_uv_error(watcher.native_handle(), status); + let data = watcher.get_watcher_data(); + let cb = data.async_cb.get_ref(); + (*cb)(watcher, status); + } + } + + pub fn send(&mut self) { + unsafe { + let handle = self.native_handle(); + uvll::async_send(handle); + } + } + + pub fn close(self, cb: NullCallback) { + let mut this = self; + let data = this.get_watcher_data(); + assert!(data.close_cb.is_none()); + data.close_cb = Some(cb); + + unsafe { + uvll::close(self.native_handle(), close_cb); + } + + extern fn close_cb(handle: *uvll::uv_stream_t) { + let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle); + { + let data = watcher.get_watcher_data(); + data.close_cb.swap_unwrap()(); + } + watcher.drop_watcher_data(); + unsafe { uvll::free_handle(handle as *c_void); } + } + } +} + +impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher { + fn from_native_handle(handle: *uvll::uv_async_t) -> AsyncWatcher { + AsyncWatcher(handle) + } + fn native_handle(&self) -> *uvll::uv_async_t { + match self { &AsyncWatcher(ptr) => ptr } + } +} + +#[cfg(test)] +mod test { + + use super::*; + use rt::uv::Loop; + use unstable::run_in_bare_thread; + use rt::thread::Thread; + use cell::Cell; + + #[test] + fn smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let watcher = AsyncWatcher::new(&mut loop_, |w, _| w.close(||()) ); + let watcher_cell = Cell::new(watcher); + let _thread = do Thread::start { + let mut watcher = watcher_cell.take(); + watcher.send(); + }; + loop_.run(); + loop_.close(); + } + } +} diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs index e1def9ffd50..a3630c9b9bf 100644 --- a/src/libstd/rt/uv/idle.rs +++ b/src/libstd/rt/uv/idle.rs @@ -90,3 +90,65 @@ impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher { match self { &IdleWatcher(ptr) => ptr } } } + +#[cfg(test)] +mod test { + + use rt::uv::Loop; + use super::*; + use unstable::run_in_bare_thread; + + #[test] + #[ignore(reason = "valgrind - loop destroyed before watcher?")] + fn idle_new_then_close() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let idle_watcher = { IdleWatcher::new(&mut loop_) }; + idle_watcher.close(||()); + } + } + + #[test] + fn idle_smoke_test() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + let mut count = 10; + let count_ptr: *mut int = &mut count; + do idle_watcher.start |idle_watcher, status| { + let mut idle_watcher = idle_watcher; + assert!(status.is_none()); + if unsafe { *count_ptr == 10 } { + idle_watcher.stop(); + idle_watcher.close(||()); + } else { + unsafe { *count_ptr = *count_ptr + 1; } + } + } + loop_.run(); + loop_.close(); + assert_eq!(count, 10); + } + } + + #[test] + fn idle_start_stop_start() { + do run_in_bare_thread { + let mut loop_ = Loop::new(); + let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; + do idle_watcher.start |idle_watcher, status| { + let mut idle_watcher = idle_watcher; + assert!(status.is_none()); + idle_watcher.stop(); + do idle_watcher.start |idle_watcher, status| { + assert!(status.is_none()); + let mut idle_watcher = idle_watcher; + idle_watcher.stop(); + idle_watcher.close(||()); + } + } + loop_.run(); + loop_.close(); + } + } +} diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs index f50efc079a7..883eda0057f 100644 --- a/src/libstd/rt/uv/mod.rs +++ b/src/libstd/rt/uv/mod.rs @@ -55,6 +55,7 @@ pub use self::file::FsRequest; pub use self::net::{StreamWatcher, TcpWatcher}; pub use self::idle::IdleWatcher; pub use self::timer::TimerWatcher; +pub use self::async::AsyncWatcher; /// The implementation of `rtio` for libuv pub mod uvio; @@ -66,6 +67,7 @@ pub mod file; pub mod net; pub mod idle; pub mod timer; +pub mod async; /// XXX: Loop(*handle) is buggy with destructors. Normal structs /// with dtors may not be destructured, but tuple structs can, @@ -123,6 +125,7 @@ pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>); pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>); pub type FsCallback = ~fn(FsRequest, Option<UvError>); pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>); +pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>); /// Callbacks used by StreamWatchers, set as custom data on the foreign handle @@ -133,7 +136,8 @@ struct WatcherData { close_cb: Option<NullCallback>, alloc_cb: Option<AllocCallback>, idle_cb: Option<IdleCallback>, - timer_cb: Option<TimerCallback> + timer_cb: Option<TimerCallback>, + async_cb: Option<AsyncCallback> } pub trait WatcherInterop { @@ -162,7 +166,8 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W { close_cb: None, alloc_cb: None, idle_cb: None, - timer_cb: None + timer_cb: None, + async_cb: None }; let data = transmute::<~WatcherData, *c_void>(data); uvll::set_data_for_uv_handle(self.native_handle(), data); @@ -347,57 +352,3 @@ fn loop_smoke_test() { loop_.close(); } } - -#[test] -#[ignore(reason = "valgrind - loop destroyed before watcher?")] -fn idle_new_then_close() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let idle_watcher = { IdleWatcher::new(&mut loop_) }; - idle_watcher.close(||()); - } -} - -#[test] -fn idle_smoke_test() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; - let mut count = 10; - let count_ptr: *mut int = &mut count; - do idle_watcher.start |idle_watcher, status| { - let mut idle_watcher = idle_watcher; - assert!(status.is_none()); - if unsafe { *count_ptr == 10 } { - idle_watcher.stop(); - idle_watcher.close(||()); - } else { - unsafe { *count_ptr = *count_ptr + 1; } - } - } - loop_.run(); - loop_.close(); - assert_eq!(count, 10); - } -} - -#[test] -fn idle_start_stop_start() { - do run_in_bare_thread { - let mut loop_ = Loop::new(); - let mut idle_watcher = { IdleWatcher::new(&mut loop_) }; - do idle_watcher.start |idle_watcher, status| { - let mut idle_watcher = idle_watcher; - assert!(status.is_none()); - idle_watcher.stop(); - do idle_watcher.start |idle_watcher, status| { - assert!(status.is_none()); - let mut idle_watcher = idle_watcher; - idle_watcher.stop(); - idle_watcher.close(||()); - } - } - loop_.run(); - loop_.close(); - } -} diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 964ee460c1d..298277b3df0 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -12,6 +12,7 @@ use option::*; use result::*; use ops::Drop; use cell::Cell; +use cast; use cast::transmute; use clone::Clone; use rt::io::IoError; @@ -23,6 +24,7 @@ use rt::sched::Scheduler; use rt::io::{standard_error, OtherIoError}; use rt::tube::Tube; use rt::local::Local; +use unstable::sync::{Exclusive, exclusive}; #[cfg(test)] use container::Container; #[cfg(test)] use uint; @@ -39,11 +41,6 @@ impl UvEventLoop { uvio: UvIoFactory(Loop::new()) } } - - /// A convenience constructor - pub fn new_scheduler() -> Scheduler { - Scheduler::new(~UvEventLoop::new()) - } } impl Drop for UvEventLoop { @@ -81,6 +78,10 @@ impl EventLoop for UvEventLoop { } } + fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallbackObject { + ~UvRemoteCallback::new(self.uvio.uv_loop(), f) + } + fn io<'a>(&'a mut self) -> Option<&'a mut IoFactoryObject> { Some(&mut self.uvio) } @@ -100,6 +101,89 @@ fn test_callback_run_once() { } } +pub struct UvRemoteCallback { + // The uv async handle for triggering the callback + async: AsyncWatcher, + // A flag to tell the callback to exit, set from the dtor. This is + // almost never contested - only in rare races with the dtor. + exit_flag: Exclusive<bool> +} + +impl UvRemoteCallback { + pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback { + let exit_flag = exclusive(false); + let exit_flag_clone = exit_flag.clone(); + let async = do AsyncWatcher::new(loop_) |watcher, status| { + assert!(status.is_none()); + f(); + unsafe { + do exit_flag_clone.with_imm |&should_exit| { + if should_exit { + watcher.close(||()); + } + } + } + }; + UvRemoteCallback { + async: async, + exit_flag: exit_flag + } + } +} + +impl RemoteCallback for UvRemoteCallback { + fn fire(&mut self) { self.async.send() } +} + +impl Drop for UvRemoteCallback { + fn finalize(&self) { + unsafe { + let this: &mut UvRemoteCallback = cast::transmute_mut(self); + do this.exit_flag.with |should_exit| { + // NB: These two things need to happen atomically. Otherwise + // the event handler could wake up due to a *previous* + // signal and see the exit flag, destroying the handle + // before the final send. + *should_exit = true; + this.async.send(); + } + } + } +} + +#[cfg(test)] +mod test_remote { + use cell::Cell; + use rt::test::*; + use rt::thread::Thread; + use rt::tube::Tube; + use rt::rtio::EventLoop; + use rt::local::Local; + use rt::sched::Scheduler; + + #[test] + fn test_uv_remote() { + do run_in_newsched_task { + let mut tube = Tube::new(); + let tube_clone = tube.clone(); + let remote_cell = Cell::new_empty(); + do Local::borrow::<Scheduler, ()>() |sched| { + let tube_clone = tube_clone.clone(); + let tube_clone_cell = Cell::new(tube_clone); + let remote = do sched.event_loop.remote_callback { + tube_clone_cell.take().send(1); + }; + remote_cell.put_back(remote); + } + let _thread = do Thread::start { + remote_cell.take().fire(); + }; + + assert!(tube.recv() == 1); + } + } +} + pub struct UvIoFactory(Loop); impl UvIoFactory { @@ -122,12 +206,10 @@ impl IoFactory for UvIoFactory { assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |sched, task| { rtdebug!("connect: entered scheduler context"); - do Local::borrow::<Scheduler> |scheduler| { - assert!(!scheduler.in_task_context()); - } + assert!(!sched.in_task_context()); let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); let task_cell = Cell::new(task); @@ -167,7 +249,7 @@ impl IoFactory for UvIoFactory { Ok(_) => Ok(~UvTcpListener::new(watcher)), Err(uverr) => { let scheduler = Local::take::<Scheduler>(); - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); do watcher.as_stream().close { let scheduler = Local::take::<Scheduler>(); @@ -203,7 +285,7 @@ impl Drop for UvTcpListener { fn finalize(&self) { let watcher = self.watcher(); let scheduler = Local::take::<Scheduler>(); - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); do watcher.as_stream().close { let scheduler = Local::take::<Scheduler>(); @@ -265,7 +347,7 @@ impl Drop for UvTcpStream { rtdebug!("closing tcp stream"); let watcher = self.watcher(); let scheduler = Local::take::<Scheduler>(); - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell::new(task); do watcher.close { let scheduler = Local::take::<Scheduler>(); @@ -284,11 +366,9 @@ impl RtioTcpStream for UvTcpStream { assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |sched, task| { rtdebug!("read: entered scheduler context"); - do Local::borrow::<Scheduler> |scheduler| { - assert!(!scheduler.in_task_context()); - } + assert!(!sched.in_task_context()); let mut watcher = watcher; let task_cell = Cell::new(task); // XXX: We shouldn't reallocate these callbacks every @@ -330,7 +410,7 @@ impl RtioTcpStream for UvTcpStream { assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |_, task| { let mut watcher = watcher; let task_cell = Cell::new(task); let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; @@ -424,11 +504,9 @@ fn test_read_and_block() { // Yield to the other task in hopes that it // will trigger a read callback while we are // not ready for it - do scheduler.deschedule_running_task_and_then |task| { + do scheduler.deschedule_running_task_and_then |sched, task| { let task = Cell::new(task); - do Local::borrow::<Scheduler> |scheduler| { - scheduler.enqueue_task(task.take()); - } + sched.enqueue_task(task.take()); } } diff --git a/src/libstd/rt/uvll.rs b/src/libstd/rt/uvll.rs deleted file mode 100644 index 0d298bde6b5..00000000000 --- a/src/libstd/rt/uvll.rs +++ /dev/null @@ -1,443 +0,0 @@ -// Copyright 2012 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/*! - * Low-level bindings to the libuv library. - * - * This module contains a set of direct, 'bare-metal' wrappers around - * the libuv C-API. - * - * We're not bothering yet to redefine uv's structs as Rust structs - * because they are quite large and change often between versions. - * The maintenance burden is just too high. Instead we use the uv's - * `uv_handle_size` and `uv_req_size` to find the correct size of the - * structs and allocate them on the heap. This can be revisited later. - * - * There are also a collection of helper functions to ease interacting - * with the low-level API. - * - * As new functionality, existant in uv.h, is added to the rust stdlib, - * the mappings should be added in this module. - */ - -#[allow(non_camel_case_types)]; // C types - -use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t}; -use libc::{malloc, free}; -use prelude::*; - -pub struct uv_err_t { - code: c_int, - sys_errno_: c_int -} - -pub struct uv_buf_t { - base: *u8, - len: libc::size_t, -} - -pub type uv_handle_t = c_void; -pub type uv_loop_t = c_void; -pub type uv_idle_t = c_void; -pub type uv_tcp_t = c_void; -pub type uv_connect_t = c_void; -pub type uv_write_t = c_void; -pub type uv_async_t = c_void; -pub type uv_timer_t = c_void; -pub type uv_stream_t = c_void; -pub type uv_fs_t = c_void; - -pub type uv_idle_cb = *u8; - -pub type sockaddr_in = c_void; -pub type sockaddr_in6 = c_void; - -#[deriving(Eq)] -pub enum uv_handle_type { - UV_UNKNOWN_HANDLE, - UV_ASYNC, - UV_CHECK, - UV_FS_EVENT, - UV_FS_POLL, - UV_HANDLE, - UV_IDLE, - UV_NAMED_PIPE, - UV_POLL, - UV_PREPARE, - UV_PROCESS, - UV_STREAM, - UV_TCP, - UV_TIMER, - UV_TTY, - UV_UDP, - UV_SIGNAL, - UV_FILE, - UV_HANDLE_TYPE_MAX -} - -#[deriving(Eq)] -pub enum uv_req_type { - UV_UNKNOWN_REQ, - UV_REQ, - UV_CONNECT, - UV_WRITE, - UV_SHUTDOWN, - UV_UDP_SEND, - UV_FS, - UV_WORK, - UV_GETADDRINFO, - UV_REQ_TYPE_MAX -} - -pub unsafe fn malloc_handle(handle: uv_handle_type) -> *c_void { - assert!(handle != UV_UNKNOWN_HANDLE && handle != UV_HANDLE_TYPE_MAX); - let size = rust_uv_handle_size(handle as uint); - let p = malloc(size); - assert!(p.is_not_null()); - return p; -} - -pub unsafe fn free_handle(v: *c_void) { - free(v) -} - -pub unsafe fn malloc_req(req: uv_req_type) -> *c_void { - assert!(req != UV_UNKNOWN_REQ && req != UV_REQ_TYPE_MAX); - let size = rust_uv_req_size(req as uint); - let p = malloc(size); - assert!(p.is_not_null()); - return p; -} - -pub unsafe fn free_req(v: *c_void) { - free(v) -} - -#[test] -fn handle_sanity_check() { - unsafe { - assert!(UV_HANDLE_TYPE_MAX as uint == rust_uv_handle_type_max()); - } -} - -#[test] -fn request_sanity_check() { - unsafe { - assert!(UV_REQ_TYPE_MAX as uint == rust_uv_req_type_max()); - } -} - -pub unsafe fn loop_new() -> *c_void { - return rust_uv_loop_new(); -} - -pub unsafe fn loop_delete(loop_handle: *c_void) { - rust_uv_loop_delete(loop_handle); -} - -pub unsafe fn run(loop_handle: *c_void) { - rust_uv_run(loop_handle); -} - -pub unsafe fn close<T>(handle: *T, cb: *u8) { - rust_uv_close(handle as *c_void, cb); -} - -pub unsafe fn walk(loop_handle: *c_void, cb: *u8, arg: *c_void) { - rust_uv_walk(loop_handle, cb, arg); -} - -pub unsafe fn idle_new() -> *uv_idle_t { - rust_uv_idle_new() -} - -pub unsafe fn idle_delete(handle: *uv_idle_t) { - rust_uv_idle_delete(handle) -} - -pub unsafe fn idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int { - rust_uv_idle_init(loop_handle, handle) -} - -pub unsafe fn idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int { - rust_uv_idle_start(handle, cb) -} - -pub unsafe fn idle_stop(handle: *uv_idle_t) -> c_int { - rust_uv_idle_stop(handle) -} - -pub unsafe fn tcp_init(loop_handle: *c_void, handle: *uv_tcp_t) -> c_int { - return rust_uv_tcp_init(loop_handle, handle); -} - -// FIXME ref #2064 -pub unsafe fn tcp_connect(connect_ptr: *uv_connect_t, - tcp_handle_ptr: *uv_tcp_t, - addr_ptr: *sockaddr_in, - after_connect_cb: *u8) -> c_int { - return rust_uv_tcp_connect(connect_ptr, tcp_handle_ptr, - after_connect_cb, addr_ptr); -} -// FIXME ref #2064 -pub unsafe fn tcp_connect6(connect_ptr: *uv_connect_t, - tcp_handle_ptr: *uv_tcp_t, - addr_ptr: *sockaddr_in6, - after_connect_cb: *u8) -> c_int { - return rust_uv_tcp_connect6(connect_ptr, tcp_handle_ptr, - after_connect_cb, addr_ptr); -} -// FIXME ref #2064 -pub unsafe fn tcp_bind(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in) -> c_int { - return rust_uv_tcp_bind(tcp_server_ptr, addr_ptr); -} -// FIXME ref #2064 -pub unsafe fn tcp_bind6(tcp_server_ptr: *uv_tcp_t, addr_ptr: *sockaddr_in6) -> c_int { - return rust_uv_tcp_bind6(tcp_server_ptr, addr_ptr); -} - -pub unsafe fn tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in) -> c_int { - return rust_uv_tcp_getpeername(tcp_handle_ptr, name); -} - -pub unsafe fn tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, name: *sockaddr_in6) ->c_int { - return rust_uv_tcp_getpeername6(tcp_handle_ptr, name); -} - -pub unsafe fn listen<T>(stream: *T, backlog: c_int, cb: *u8) -> c_int { - return rust_uv_listen(stream as *c_void, backlog, cb); -} - -pub unsafe fn accept(server: *c_void, client: *c_void) -> c_int { - return rust_uv_accept(server as *c_void, client as *c_void); -} - -pub unsafe fn write<T>(req: *uv_write_t, stream: *T, buf_in: &[uv_buf_t], cb: *u8) -> c_int { - let buf_ptr = vec::raw::to_ptr(buf_in); - let buf_cnt = buf_in.len() as i32; - return rust_uv_write(req as *c_void, stream as *c_void, buf_ptr, buf_cnt, cb); -} -pub unsafe fn read_start(stream: *uv_stream_t, on_alloc: *u8, on_read: *u8) -> c_int { - return rust_uv_read_start(stream as *c_void, on_alloc, on_read); -} - -pub unsafe fn read_stop(stream: *uv_stream_t) -> c_int { - return rust_uv_read_stop(stream as *c_void); -} - -pub unsafe fn last_error(loop_handle: *c_void) -> uv_err_t { - return rust_uv_last_error(loop_handle); -} - -pub unsafe fn strerror(err: *uv_err_t) -> *c_char { - return rust_uv_strerror(err); -} -pub unsafe fn err_name(err: *uv_err_t) -> *c_char { - return rust_uv_err_name(err); -} - -pub unsafe fn async_init(loop_handle: *c_void, async_handle: *uv_async_t, cb: *u8) -> c_int { - return rust_uv_async_init(loop_handle, async_handle, cb); -} - -pub unsafe fn async_send(async_handle: *uv_async_t) { - return rust_uv_async_send(async_handle); -} -pub unsafe fn buf_init(input: *u8, len: uint) -> uv_buf_t { - let out_buf = uv_buf_t { base: ptr::null(), len: 0 as size_t }; - let out_buf_ptr = ptr::to_unsafe_ptr(&out_buf); - rust_uv_buf_init(out_buf_ptr, input, len as size_t); - return out_buf; -} - -pub unsafe fn timer_init(loop_ptr: *c_void, timer_ptr: *uv_timer_t) -> c_int { - return rust_uv_timer_init(loop_ptr, timer_ptr); -} -pub unsafe fn timer_start(timer_ptr: *uv_timer_t, cb: *u8, timeout: uint, - repeat: uint) -> c_int { - return rust_uv_timer_start(timer_ptr, cb, timeout as c_uint, repeat as c_uint); -} -pub unsafe fn timer_stop(timer_ptr: *uv_timer_t) -> c_int { - return rust_uv_timer_stop(timer_ptr); -} - -pub unsafe fn malloc_ip4_addr(ip: &str, port: int) -> *sockaddr_in { - do str::as_c_str(ip) |ip_buf| { - rust_uv_ip4_addrp(ip_buf as *u8, port as libc::c_int) - } -} -pub unsafe fn malloc_ip6_addr(ip: &str, port: int) -> *sockaddr_in6 { - do str::as_c_str(ip) |ip_buf| { - rust_uv_ip6_addrp(ip_buf as *u8, port as libc::c_int) - } -} - -pub unsafe fn free_ip4_addr(addr: *sockaddr_in) { - rust_uv_free_ip4_addr(addr); -} - -pub unsafe fn free_ip6_addr(addr: *sockaddr_in6) { - rust_uv_free_ip6_addr(addr); -} - -// data access helpers -pub unsafe fn get_loop_for_uv_handle<T>(handle: *T) -> *c_void { - return rust_uv_get_loop_for_uv_handle(handle as *c_void); -} -pub unsafe fn get_stream_handle_from_connect_req(connect: *uv_connect_t) -> *uv_stream_t { - return rust_uv_get_stream_handle_from_connect_req(connect); -} -pub unsafe fn get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t { - return rust_uv_get_stream_handle_from_write_req(write_req); -} -pub unsafe fn get_data_for_uv_loop(loop_ptr: *c_void) -> *c_void { - rust_uv_get_data_for_uv_loop(loop_ptr) -} -pub unsafe fn set_data_for_uv_loop(loop_ptr: *c_void, data: *c_void) { - rust_uv_set_data_for_uv_loop(loop_ptr, data); -} -pub unsafe fn get_data_for_uv_handle<T>(handle: *T) -> *c_void { - return rust_uv_get_data_for_uv_handle(handle as *c_void); -} -pub unsafe fn set_data_for_uv_handle<T, U>(handle: *T, data: *U) { - rust_uv_set_data_for_uv_handle(handle as *c_void, data as *c_void); -} -pub unsafe fn get_data_for_req<T>(req: *T) -> *c_void { - return rust_uv_get_data_for_req(req as *c_void); -} -pub unsafe fn set_data_for_req<T, U>(req: *T, data: *U) { - rust_uv_set_data_for_req(req as *c_void, data as *c_void); -} -pub unsafe fn get_base_from_buf(buf: uv_buf_t) -> *u8 { - return rust_uv_get_base_from_buf(buf); -} -pub unsafe fn get_len_from_buf(buf: uv_buf_t) -> size_t { - return rust_uv_get_len_from_buf(buf); -} -pub unsafe fn malloc_buf_base_of(suggested_size: size_t) -> *u8 { - return rust_uv_malloc_buf_base_of(suggested_size); -} -pub unsafe fn free_base_of_buf(buf: uv_buf_t) { - rust_uv_free_base_of_buf(buf); -} - -pub unsafe fn get_last_err_info(uv_loop: *c_void) -> ~str { - let err = last_error(uv_loop); - let err_ptr = ptr::to_unsafe_ptr(&err); - let err_name = str::raw::from_c_str(err_name(err_ptr)); - let err_msg = str::raw::from_c_str(strerror(err_ptr)); - return fmt!("LIBUV ERROR: name: %s msg: %s", - err_name, err_msg); -} - -pub unsafe fn get_last_err_data(uv_loop: *c_void) -> uv_err_data { - let err = last_error(uv_loop); - let err_ptr = ptr::to_unsafe_ptr(&err); - let err_name = str::raw::from_c_str(err_name(err_ptr)); - let err_msg = str::raw::from_c_str(strerror(err_ptr)); - uv_err_data { err_name: err_name, err_msg: err_msg } -} - -pub struct uv_err_data { - err_name: ~str, - err_msg: ~str, -} - -extern { - - fn rust_uv_handle_size(type_: uintptr_t) -> size_t; - fn rust_uv_req_size(type_: uintptr_t) -> size_t; - fn rust_uv_handle_type_max() -> uintptr_t; - fn rust_uv_req_type_max() -> uintptr_t; - - // libuv public API - fn rust_uv_loop_new() -> *c_void; - fn rust_uv_loop_delete(lp: *c_void); - fn rust_uv_run(loop_handle: *c_void); - fn rust_uv_close(handle: *c_void, cb: *u8); - fn rust_uv_walk(loop_handle: *c_void, cb: *u8, arg: *c_void); - - fn rust_uv_idle_new() -> *uv_idle_t; - fn rust_uv_idle_delete(handle: *uv_idle_t); - fn rust_uv_idle_init(loop_handle: *uv_loop_t, handle: *uv_idle_t) -> c_int; - fn rust_uv_idle_start(handle: *uv_idle_t, cb: uv_idle_cb) -> c_int; - fn rust_uv_idle_stop(handle: *uv_idle_t) -> c_int; - - fn rust_uv_async_send(handle: *uv_async_t); - fn rust_uv_async_init(loop_handle: *c_void, - async_handle: *uv_async_t, - cb: *u8) -> c_int; - fn rust_uv_tcp_init(loop_handle: *c_void, handle_ptr: *uv_tcp_t) -> c_int; - // FIXME ref #2604 .. ? - fn rust_uv_buf_init(out_buf: *uv_buf_t, base: *u8, len: size_t); - fn rust_uv_last_error(loop_handle: *c_void) -> uv_err_t; - // FIXME ref #2064 - fn rust_uv_strerror(err: *uv_err_t) -> *c_char; - // FIXME ref #2064 - fn rust_uv_err_name(err: *uv_err_t) -> *c_char; - fn rust_uv_ip4_addrp(ip: *u8, port: c_int) -> *sockaddr_in; - fn rust_uv_ip6_addrp(ip: *u8, port: c_int) -> *sockaddr_in6; - fn rust_uv_free_ip4_addr(addr: *sockaddr_in); - fn rust_uv_free_ip6_addr(addr: *sockaddr_in6); - fn rust_uv_ip4_name(src: *sockaddr_in, dst: *u8, size: size_t) -> c_int; - fn rust_uv_ip6_name(src: *sockaddr_in6, dst: *u8, size: size_t) -> c_int; - fn rust_uv_ip4_port(src: *sockaddr_in) -> c_uint; - fn rust_uv_ip6_port(src: *sockaddr_in6) -> c_uint; - // FIXME ref #2064 - fn rust_uv_tcp_connect(connect_ptr: *uv_connect_t, - tcp_handle_ptr: *uv_tcp_t, - after_cb: *u8, - addr: *sockaddr_in) -> c_int; - // FIXME ref #2064 - fn rust_uv_tcp_bind(tcp_server: *uv_tcp_t, addr: *sockaddr_in) -> c_int; - // FIXME ref #2064 - fn rust_uv_tcp_connect6(connect_ptr: *uv_connect_t, - tcp_handle_ptr: *uv_tcp_t, - after_cb: *u8, - addr: *sockaddr_in6) -> c_int; - // FIXME ref #2064 - fn rust_uv_tcp_bind6(tcp_server: *uv_tcp_t, addr: *sockaddr_in6) -> c_int; - fn rust_uv_tcp_getpeername(tcp_handle_ptr: *uv_tcp_t, - name: *sockaddr_in) -> c_int; - fn rust_uv_tcp_getpeername6(tcp_handle_ptr: *uv_tcp_t, - name: *sockaddr_in6) ->c_int; - fn rust_uv_listen(stream: *c_void, backlog: c_int, cb: *u8) -> c_int; - fn rust_uv_accept(server: *c_void, client: *c_void) -> c_int; - fn rust_uv_write(req: *c_void, - stream: *c_void, - buf_in: *uv_buf_t, - buf_cnt: c_int, - cb: *u8) -> c_int; - fn rust_uv_read_start(stream: *c_void, - on_alloc: *u8, - on_read: *u8) -> c_int; - fn rust_uv_read_stop(stream: *c_void) -> c_int; - fn rust_uv_timer_init(loop_handle: *c_void, - timer_handle: *uv_timer_t) -> c_int; - fn rust_uv_timer_start(timer_handle: *uv_timer_t, - cb: *u8, - timeout: c_uint, - repeat: c_uint) -> c_int; - fn rust_uv_timer_stop(handle: *uv_timer_t) -> c_int; - - fn rust_uv_malloc_buf_base_of(sug_size: size_t) -> *u8; - fn rust_uv_free_base_of_buf(buf: uv_buf_t); - fn rust_uv_get_stream_handle_from_connect_req(connect_req: *uv_connect_t) -> *uv_stream_t; - fn rust_uv_get_stream_handle_from_write_req(write_req: *uv_write_t) -> *uv_stream_t; - fn rust_uv_get_loop_for_uv_handle(handle: *c_void) -> *c_void; - fn rust_uv_get_data_for_uv_loop(loop_ptr: *c_void) -> *c_void; - fn rust_uv_set_data_for_uv_loop(loop_ptr: *c_void, data: *c_void); - fn rust_uv_get_data_for_uv_handle(handle: *c_void) -> *c_void; - fn rust_uv_set_data_for_uv_handle(handle: *c_void, data: *c_void); - fn rust_uv_get_data_for_req(req: *c_void) -> *c_void; - fn rust_uv_set_data_for_req(req: *c_void, data: *c_void); - fn rust_uv_get_base_from_buf(buf: uv_buf_t) -> *u8; - fn rust_uv_get_len_from_buf(buf: uv_buf_t) -> size_t; -} diff --git a/src/libstd/sys.rs b/src/libstd/sys.rs index 87e13e494aa..e49ad348542 100644 --- a/src/libstd/sys.rs +++ b/src/libstd/sys.rs @@ -213,11 +213,7 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! { gc::cleanup_stack_for_failure(); let task = Local::unsafe_borrow::<Task>(); - let unwinder: &mut Option<Unwinder> = &mut (*task).unwinder; - match *unwinder { - Some(ref mut unwinder) => unwinder.begin_unwind(), - None => abort!("failure without unwinder. aborting process") - } + (*task).unwinder.begin_unwind(); } } } diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 223afbce091..99858feab22 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -520,20 +520,9 @@ pub fn failing() -> bool { } } _ => { - let mut unwinding = false; - do Local::borrow::<Task> |local| { - unwinding = match local.unwinder { - Some(unwinder) => { - unwinder.unwinding - } - None => { - // Because there is no unwinder we can't be unwinding. - // (The process will abort on failure) - false - } - } + do Local::borrow::<Task, bool> |local| { + local.unwinder.unwinding } - return unwinding; } } } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 30ad4ee2a89..9df93d19d21 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -93,6 +93,7 @@ use util; use unstable::sync::{Exclusive, exclusive}; use rt::local::Local; use iterator::{IteratorUtil}; +use rt::task::Task; #[cfg(test)] use task::default_task_opts; #[cfg(test)] use comm; @@ -585,9 +586,14 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) { fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) { use rt::sched::*; + let task = do Local::borrow::<Task, ~Task>() |running_task| { + ~running_task.new_child() + }; + let mut sched = Local::take::<Scheduler>(); - let task = ~Coroutine::new(&mut sched.stack_pool, f); - sched.schedule_new_task(task); + let task = ~Coroutine::with_task(&mut sched.stack_pool, + task, f); + sched.schedule_task(task); } fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) { diff --git a/src/libstd/unstable/lang.rs b/src/libstd/unstable/lang.rs index e75cf2c01c6..3d61c1fe144 100644 --- a/src/libstd/unstable/lang.rs +++ b/src/libstd/unstable/lang.rs @@ -245,7 +245,7 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char { } _ => { let mut alloc = ::ptr::null(); - do Local::borrow::<Task> |task| { + do Local::borrow::<Task,()> |task| { alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char; } return alloc; @@ -263,7 +263,7 @@ pub unsafe fn local_free(ptr: *c_char) { rustrt::rust_upcall_free_noswitch(ptr); } _ => { - do Local::borrow::<Task> |task| { + do Local::borrow::<Task,()> |task| { task.heap.free(ptr as *c_void); } } diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index f0b178c6670..3cf46bec41a 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -205,8 +205,53 @@ extern { fn rust_unlock_little_lock(lock: rust_little_lock); } +/* *********************************************************************/ + +//FIXME: #5042 This should be replaced by proper atomic type +pub struct AtomicUint { + priv inner: uint +} + +impl AtomicUint { + pub fn new(val: uint) -> AtomicUint { AtomicUint { inner: val } } + pub fn load(&self) -> uint { + unsafe { intrinsics::atomic_load(cast::transmute(self)) as uint } + } + pub fn store(&mut self, val: uint) { + unsafe { intrinsics::atomic_store(cast::transmute(self), val as int); } + } + pub fn add(&mut self, val: int) -> uint { + unsafe { intrinsics::atomic_xadd(cast::transmute(self), val as int) as uint } + } + pub fn cas(&mut self, old:uint, new: uint) -> uint { + unsafe { intrinsics::atomic_cxchg(cast::transmute(self), old as int, new as int) as uint } + } +} + +pub struct AtomicInt { + priv inner: int +} + +impl AtomicInt { + pub fn new(val: int) -> AtomicInt { AtomicInt { inner: val } } + pub fn load(&self) -> int { + unsafe { intrinsics::atomic_load(&self.inner) } + } + pub fn store(&mut self, val: int) { + unsafe { intrinsics::atomic_store(&mut self.inner, val); } + } + pub fn add(&mut self, val: int) -> int { + unsafe { intrinsics::atomic_xadd(&mut self.inner, val) } + } + pub fn cas(&mut self, old: int, new: int) -> int { + unsafe { intrinsics::atomic_cxchg(&mut self.inner, old, new) } + } +} + + #[cfg(test)] mod tests { + use super::*; use comm; use super::exclusive; use task; @@ -262,4 +307,28 @@ mod tests { } } } + + #[test] + fn atomic_int_smoke_test() { + let mut i = AtomicInt::new(0); + i.store(10); + assert!(i.load() == 10); + assert!(i.add(1) == 10); + assert!(i.load() == 11); + assert!(i.cas(11, 12) == 11); + assert!(i.cas(11, 13) == 12); + assert!(i.load() == 12); + } + + #[test] + fn atomic_uint_smoke_test() { + let mut i = AtomicUint::new(0); + i.store(10); + assert!(i.load() == 10); + assert!(i.add(1) == 10); + assert!(i.load() == 11); + assert!(i.cas(11, 12) == 11); + assert!(i.cas(11, 13) == 12); + assert!(i.load() == 12); + } } diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 5e7357c9b7b..fe4e75fb8d2 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -930,6 +930,13 @@ rust_begin_unwind(uintptr_t token) { #endif } +extern int get_num_cpus(); + +extern "C" CDECL uintptr_t +rust_get_num_cpus() { + return get_num_cpus(); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_env.cpp b/src/rt/rust_env.cpp index eaccb3b0a44..ff03ea817b8 100644 --- a/src/rt/rust_env.cpp +++ b/src/rt/rust_env.cpp @@ -40,7 +40,7 @@ rust_drop_env_lock() { } #if defined(__WIN32__) -static int +int get_num_cpus() { SYSTEM_INFO sysinfo; GetSystemInfo(&sysinfo); @@ -48,7 +48,7 @@ get_num_cpus() { return (int) sysinfo.dwNumberOfProcessors; } #elif defined(__BSD__) -static int +int get_num_cpus() { /* swiped from http://stackoverflow.com/questions/150355/ programmatically-find-the-number-of-cores-on-a-machine */ @@ -75,7 +75,7 @@ get_num_cpus() { return numCPU; } #elif defined(__GNUC__) -static int +int get_num_cpus() { return sysconf(_SC_NPROCESSORS_ONLN); } diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index e3e522aa7ce..9b49583519e 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -239,3 +239,4 @@ rust_valgrind_stack_deregister rust_take_env_lock rust_drop_env_lock rust_update_log_settings +rust_get_num_cpus \ No newline at end of file |
