diff options
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/comm.rs | 1141 | ||||
| -rw-r--r-- | src/libstd/rt/kill.rs | 37 | ||||
| -rw-r--r-- | src/libstd/rt/local_ptr.rs | 3 | ||||
| -rw-r--r-- | src/libstd/rt/message_queue.rs | 55 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 21 | ||||
| -rw-r--r-- | src/libstd/rt/rtio.rs | 4 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 105 | ||||
| -rw-r--r-- | src/libstd/rt/select.rs | 29 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 62 | ||||
| -rw-r--r-- | src/libstd/rt/test.rs | 22 | ||||
| -rw-r--r-- | src/libstd/rt/thread.rs | 187 |
11 files changed, 220 insertions, 1446 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs deleted file mode 100644 index 2fa34994292..00000000000 --- a/src/libstd/rt/comm.rs +++ /dev/null @@ -1,1141 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Ports and channels. - -use option::*; -use cast; -use ops::Drop; -use rt::kill::BlockedTask; -use kinds::Send; -use rt; -use rt::sched::Scheduler; -use rt::local::Local; -use rt::select::{SelectInner, SelectPortInner}; -use select::{Select, SelectPort}; -use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst}; -use unstable::sync::UnsafeArc; -use util; -use util::Void; -use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable, SendDeferred}; -use cell::RefCell; -use clone::Clone; -use tuple::ImmutableTuple; - -/// A combined refcount / BlockedTask-as-uint pointer. -/// -/// Can be equal to the following values: -/// -/// * 2 - both endpoints are alive -/// * 1 - either the sender or the receiver is dead, determined by context -/// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint) -type State = uint; - -static STATE_BOTH: State = 2; -static STATE_ONE: State = 1; - -/// The heap-allocated structure shared between two endpoints. -struct Packet<T> { - state: AtomicUint, - payload: Option<T>, -} - -// A one-shot channel. -pub struct ChanOne<T> { - priv void_packet: *mut Void, - priv suppress_finalize: bool -} - -/// A one-shot port. -pub struct PortOne<T> { - priv void_packet: *mut Void, - priv suppress_finalize: bool -} - -pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) { - let packet: ~Packet<T> = ~Packet { - state: AtomicUint::new(STATE_BOTH), - payload: None - }; - - unsafe { - let packet: *mut Void = cast::transmute(packet); - let port = PortOne { - void_packet: packet, - suppress_finalize: false - }; - let chan = ChanOne { - void_packet: packet, - suppress_finalize: false - }; - return (port, chan); - } -} - -impl<T: Send> ChanOne<T> { - #[inline] - fn packet(&self) -> *mut Packet<T> { - unsafe { - let p: *mut ~Packet<T> = cast::transmute(&self.void_packet); - let p: *mut Packet<T> = &mut **p; - return p; - } - } - - /// Send a message on the one-shot channel. If a receiver task is blocked - /// waiting for the message, will wake it up and reschedule to it. - pub fn send(self, val: T) { - self.try_send(val); - } - - /// As `send`, but also returns whether or not the receiver endpoint is still open. - pub fn try_send(self, val: T) -> bool { - self.try_send_inner(val, true) - } - - /// Send a message without immediately rescheduling to a blocked receiver. - /// This can be useful in contexts where rescheduling is forbidden, or to - /// optimize for when the sender expects to still have useful work to do. - pub fn send_deferred(self, val: T) { - self.try_send_deferred(val); - } - - /// As `send_deferred` and `try_send` together. - pub fn try_send_deferred(self, val: T) -> bool { - self.try_send_inner(val, false) - } - - // 'do_resched' configures whether the scheduler immediately switches to - // the receiving task, or leaves the sending task still running. - fn try_send_inner(mut self, val: T, do_resched: bool) -> bool { - if do_resched { - rtassert!(!rt::in_sched_context()); - } - - // In order to prevent starvation of other tasks in situations - // where a task sends repeatedly without ever receiving, we - // occassionally yield instead of doing a send immediately. - // Only doing this if we're doing a rescheduling send, - // otherwise the caller is expecting not to context switch. - if do_resched { - // XXX: This TLS hit should be combined with other uses of the scheduler below - let sched: ~Scheduler = Local::take(); - sched.maybe_yield(); - } - - let mut recvr_active = true; - let packet = self.packet(); - - unsafe { - - // Install the payload - rtassert!((*packet).payload.is_none()); - (*packet).payload = Some(val); - - // Atomically swap out the old state to figure out what - // the port's up to, issuing a release barrier to prevent - // reordering of the payload write. This also issues an - // acquire barrier that keeps the subsequent access of the - // ~Task pointer from being reordered. - let oldstate = (*packet).state.swap(STATE_ONE, SeqCst); - - // Suppress the synchronizing actions in the finalizer. We're - // done with the packet. NB: In case of do_resched, this *must* - // happen before waking up a blocked task (or be unkillable), - // because we might get a kill signal during the reschedule. - self.suppress_finalize = true; - - match oldstate { - STATE_BOTH => { - // Port is not waiting yet. Nothing to do - } - STATE_ONE => { - // Port has closed. Need to clean up. - let _packet: ~Packet<T> = cast::transmute(self.void_packet); - recvr_active = false; - } - task_as_state => { - // Port is blocked. Wake it up. - let recvr = BlockedTask::cast_from_uint(task_as_state); - if do_resched { - recvr.wake().map(|woken_task| { - Scheduler::run_task(woken_task); - }); - } else { - let mut sched = Local::borrow(None::<Scheduler>); - sched.get().enqueue_blocked_task(recvr); - } - } - } - } - - return recvr_active; - } -} - -impl<T: Send> PortOne<T> { - fn packet(&self) -> *mut Packet<T> { - unsafe { - let p: *mut ~Packet<T> = cast::transmute(&self.void_packet); - let p: *mut Packet<T> = &mut **p; - return p; - } - } - - /// Wait for a message on the one-shot port. Fails if the send end is closed. - pub fn recv(self) -> T { - match self.try_recv() { - Some(val) => val, - None => { - fail!("receiving on closed channel"); - } - } - } - - /// As `recv`, but returns `None` if the send end is closed rather than failing. - pub fn try_recv(mut self) -> Option<T> { - // Optimistic check. If data was sent already, we don't even need to block. - // No release barrier needed here; we're not handing off our task pointer yet. - if !self.optimistic_check() { - // No data available yet. - // Switch to the scheduler to put the ~Task into the Packet state. - let sched: ~Scheduler = Local::take(); - sched.deschedule_running_task_and_then(|sched, task| { - self.block_on(sched, task); - }) - } - - // Task resumes. - self.recv_ready() - } -} - -impl<T: Send> SelectInner for PortOne<T> { - #[inline] #[cfg(not(test))] - fn optimistic_check(&mut self) -> bool { - unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE } - } - - #[inline] #[cfg(test)] - fn optimistic_check(&mut self) -> bool { - // The optimistic check is never necessary for correctness. For testing - // purposes, making it randomly return false simulates a racing sender. - use rand::{Rand}; - let mut sched = Local::borrow(None::<Scheduler>); - let actually_check = Rand::rand(&mut sched.get().rng); - if actually_check { - unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE } - } else { - false - } - } - - fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { - unsafe { - // Atomically swap the task pointer into the Packet state, issuing - // an acquire barrier to prevent reordering of the subsequent read - // of the payload. Also issues a release barrier to prevent - // reordering of any previous writes to the task structure. - let task_as_state = task.cast_to_uint(); - let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst); - match oldstate { - STATE_BOTH => { - // Data has not been sent. Now we're blocked. - rtdebug!("non-rendezvous recv"); - false - } - STATE_ONE => { - // Re-record that we are the only owner of the packet. - // No barrier needed, even if the task gets reawoken - // on a different core -- this is analogous to writing a - // payload; a barrier in enqueueing the task protects it. - // NB(#8132). This *must* occur before the enqueue below. - // FIXME(#6842, #8130) This is usually only needed for the - // assertion in recv_ready, except in the case of select(). - // This won't actually ever have cacheline contention, but - // maybe should be optimized out with a cfg(test) anyway? - (*self.packet()).state.store(STATE_ONE, Relaxed); - - rtdebug!("rendezvous recv"); - - // Channel is closed. Switch back and check the data. - // NB: We have to drop back into the scheduler event loop here - // instead of switching immediately back or we could end up - // triggering infinite recursion on the scheduler's stack. - let recvr = BlockedTask::cast_from_uint(task_as_state); - sched.enqueue_blocked_task(recvr); - true - } - _ => rtabort!("can't block_on; a task is already blocked") - } - } - } - - // This is the only select trait function that's not also used in recv. - fn unblock_from(&mut self) -> bool { - let packet = self.packet(); - unsafe { - // In case the data is available, the acquire barrier here matches - // the release barrier the sender used to release the payload. - match (*packet).state.load(Acquire) { - // Impossible. We removed STATE_BOTH when blocking on it, and - // no self-respecting sender would put it back. - STATE_BOTH => rtabort!("refcount already 2 in unblock_from"), - // Here, a sender already tried to wake us up. Perhaps they - // even succeeded! Data is available. - STATE_ONE => true, - // Still registered as blocked. Need to "unblock" the pointer. - task_as_state => { - // In the window between the load and the CAS, a sender - // might take the pointer and set the refcount to ONE. If - // that happens, we shouldn't clobber that with BOTH! - // Acquire barrier again for the same reason as above. - match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH, - Acquire) { - STATE_BOTH => rtabort!("refcount became 2 in unblock_from"), - STATE_ONE => true, // Lost the race. Data available. - same_ptr => { - // We successfully unblocked our task pointer. - rtassert!(task_as_state == same_ptr); - let handle = BlockedTask::cast_from_uint(task_as_state); - // Because we are already awake, the handle we - // gave to this port shall already be empty. - handle.assert_already_awake(); - false - } - } - } - } - } - } -} - -impl<T: Send> Select for PortOne<T> { } - -impl<T: Send> SelectPortInner<T> for PortOne<T> { - fn recv_ready(mut self) -> Option<T> { - let packet = self.packet(); - - // No further memory barrier is needed here to access the - // payload. Some scenarios: - // - // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine. - // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us - // and ran on its thread. The sending task issued a read barrier when taking the - // pointer to the receiving task. - // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task) - // is pinned to some other scheduler, so the sending task had to give us to - // a different scheduler for resuming. That send synchronized memory. - unsafe { - // See corresponding store() above in block_on for rationale. - // FIXME(#8130) This can happen only in test builds. - // This load is not required for correctness and may be compiled out. - rtassert!((*packet).state.load(Relaxed) == STATE_ONE); - - let payload = (*packet).payload.take(); - - // The sender has closed up shop. Drop the packet. - let _packet: ~Packet<T> = cast::transmute(self.void_packet); - // Suppress the synchronizing actions in the finalizer. We're done with the packet. - self.suppress_finalize = true; - return payload; - } - } -} - -impl<T: Send> SelectPort<T> for PortOne<T> { } - -impl<T: Send> Peekable<T> for PortOne<T> { - fn peek(&self) -> bool { - unsafe { - let packet: *mut Packet<T> = self.packet(); - let oldstate = (*packet).state.load(SeqCst); - match oldstate { - STATE_BOTH => false, - STATE_ONE => (*packet).payload.is_some(), - _ => rtabort!("peeked on a blocked task") - } - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for ChanOne<T> { - fn drop(&mut self) { - if self.suppress_finalize { return } - - unsafe { - let oldstate = (*self.packet()).state.swap(STATE_ONE, SeqCst); - match oldstate { - STATE_BOTH => { - // Port still active. It will destroy the Packet. - }, - STATE_ONE => { - let _packet: ~Packet<T> = cast::transmute(self.void_packet); - }, - task_as_state => { - // The port is blocked waiting for a message we will never send. Wake it. - rtassert!((*self.packet()).payload.is_none()); - let recvr = BlockedTask::cast_from_uint(task_as_state); - recvr.wake().map(|woken_task| { - Scheduler::run_task(woken_task); - }); - } - } - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for PortOne<T> { - fn drop(&mut self) { - if self.suppress_finalize { return } - - unsafe { - let oldstate = (*self.packet()).state.swap(STATE_ONE, SeqCst); - match oldstate { - STATE_BOTH => { - // Chan still active. It will destroy the packet. - }, - STATE_ONE => { - let _packet: ~Packet<T> = cast::transmute(self.void_packet); - } - task_as_state => { - // This case occurs during unwinding, when the blocked - // receiver was killed awake. The task can't still be - // blocked (we are it), but we need to free the handle. - let recvr = BlockedTask::cast_from_uint(task_as_state); - recvr.assert_already_awake(); - } - } - } - } -} - -struct StreamPayload<T> { - val: T, - next: PortOne<StreamPayload<T>> -} - -type StreamChanOne<T> = ChanOne<StreamPayload<T>>; -type StreamPortOne<T> = PortOne<StreamPayload<T>>; - -/// A channel with unbounded size. -pub struct Chan<T> { - // FIXME #5372. Using RefCell because we don't take &mut self - next: RefCell<StreamChanOne<T>> -} - -/// An port with unbounded size. -pub struct Port<T> { - // FIXME #5372. Using RefCell because we don't take &mut self - next: RefCell<Option<StreamPortOne<T>>> -} - -pub fn stream<T: Send>() -> (Port<T>, Chan<T>) { - let (pone, cone) = oneshot(); - let port = Port { next: RefCell::new(Some(pone)) }; - let chan = Chan { next: RefCell::new(cone) }; - return (port, chan); -} - -impl<T: Send> Chan<T> { - fn try_send_inner(&self, val: T, do_resched: bool) -> bool { - let (next_pone, mut cone) = oneshot(); - let mut b = self.next.borrow_mut(); - util::swap(&mut cone, b.get()); - cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched) - } -} - -impl<T: Send> GenericChan<T> for Chan<T> { - fn send(&self, val: T) { - self.try_send(val); - } -} - -impl<T: Send> GenericSmartChan<T> for Chan<T> { - fn try_send(&self, val: T) -> bool { - self.try_send_inner(val, true) - } -} - -impl<T: Send> SendDeferred<T> for Chan<T> { - fn send_deferred(&self, val: T) { - self.try_send_deferred(val); - } - fn try_send_deferred(&self, val: T) -> bool { - self.try_send_inner(val, false) - } -} - -impl<T: Send> GenericPort<T> for Port<T> { - fn recv(&self) -> T { - match self.try_recv() { - Some(val) => val, - None => { - fail!("receiving on closed channel"); - } - } - } - - fn try_recv(&self) -> Option<T> { - let mut b = self.next.borrow_mut(); - b.get().take().map_default(None, |pone| { - match pone.try_recv() { - Some(StreamPayload { val, next }) => { - *b.get() = Some(next); - Some(val) - } - None => None - } - }) - } -} - -impl<T: Send> Peekable<T> for Port<T> { - fn peek(&self) -> bool { - self.next.with_mut(|p| p.get_mut_ref().peek()) - } -} - -// XXX: Kind of gross. A Port<T> should be selectable so you can make an array -// of them, but a &Port<T> should also be selectable so you can select2 on it -// alongside a PortOne<U> without passing the port by value in recv_ready. - -impl<'a, T: Send> SelectInner for &'a Port<T> { - #[inline] - fn optimistic_check(&mut self) -> bool { - self.next.with_mut(|pone| { pone.get_mut_ref().optimistic_check() }) - } - - #[inline] - fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { - let mut b = self.next.borrow_mut(); - b.get().get_mut_ref().block_on(sched, task) - } - - #[inline] - fn unblock_from(&mut self) -> bool { - self.next.with_mut(|pone| { pone.get_mut_ref().unblock_from() }) - } -} - -impl<'a, T: Send> Select for &'a Port<T> { } - -impl<T: Send> SelectInner for Port<T> { - #[inline] - fn optimistic_check(&mut self) -> bool { - (&*self).optimistic_check() - } - - #[inline] - fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { - (&*self).block_on(sched, task) - } - - #[inline] - fn unblock_from(&mut self) -> bool { - (&*self).unblock_from() - } -} - -impl<T: Send> Select for Port<T> { } - -impl<'a, T: Send> SelectPortInner<T> for &'a Port<T> { - fn recv_ready(self) -> Option<T> { - let mut b = self.next.borrow_mut(); - match b.get().take_unwrap().recv_ready() { - Some(StreamPayload { val, next }) => { - *b.get() = Some(next); - Some(val) - } - None => None - } - } -} - -impl<'a, T: Send> SelectPort<T> for &'a Port<T> { } - -pub struct SharedChan<T> { - // Just like Chan, but a shared AtomicOption - priv next: UnsafeArc<AtomicOption<StreamChanOne<T>>> -} - -impl<T: Send> SharedChan<T> { - pub fn new(chan: Chan<T>) -> SharedChan<T> { - let next = chan.next.unwrap(); - let next = AtomicOption::new(~next); - SharedChan { next: UnsafeArc::new(next) } - } -} - -impl<T: Send> SharedChan<T> { - fn try_send_inner(&self, val: T, do_resched: bool) -> bool { - unsafe { - let (next_pone, next_cone) = oneshot(); - let cone = (*self.next.get()).swap(~next_cone, SeqCst); - cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone }, - do_resched) - } - } -} - -impl<T: Send> GenericChan<T> for SharedChan<T> { - fn send(&self, val: T) { - self.try_send(val); - } -} - -impl<T: Send> GenericSmartChan<T> for SharedChan<T> { - fn try_send(&self, val: T) -> bool { - self.try_send_inner(val, true) - } -} - -impl<T: Send> SendDeferred<T> for SharedChan<T> { - fn send_deferred(&self, val: T) { - self.try_send_deferred(val); - } - fn try_send_deferred(&self, val: T) -> bool { - self.try_send_inner(val, false) - } -} - -impl<T: Send> Clone for SharedChan<T> { - fn clone(&self) -> SharedChan<T> { - SharedChan { - next: self.next.clone() - } - } -} - -pub struct SharedPort<T> { - // The next port on which we will receive the next port on which we will receive T - priv next_link: UnsafeArc<AtomicOption<PortOne<StreamPortOne<T>>>> -} - -impl<T: Send> SharedPort<T> { - pub fn new(port: Port<T>) -> SharedPort<T> { - // Put the data port into a new link pipe - let next_data_port = port.next.unwrap().unwrap(); - let (next_link_port, next_link_chan) = oneshot(); - next_link_chan.send(next_data_port); - let next_link = AtomicOption::new(~next_link_port); - SharedPort { next_link: UnsafeArc::new(next_link) } - } -} - -impl<T: Send> GenericPort<T> for SharedPort<T> { - fn recv(&self) -> T { - match self.try_recv() { - Some(val) => val, - None => { - fail!("receiving on a closed channel"); - } - } - } - - fn try_recv(&self) -> Option<T> { - unsafe { - let (next_link_port, next_link_chan) = oneshot(); - let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst); - let link_port = link_port.unwrap(); - let data_port = link_port.recv(); - let (next_data_port, res) = match data_port.try_recv() { - Some(StreamPayload { val, next }) => { - (next, Some(val)) - } - None => { - let (next_data_port, _) = oneshot(); - (next_data_port, None) - } - }; - next_link_chan.send(next_data_port); - return res; - } - } -} - -impl<T: Send> Clone for SharedPort<T> { - fn clone(&self) -> SharedPort<T> { - SharedPort { - next_link: self.next_link.clone() - } - } -} - -// FIXME #7760: Need better name -type MegaPipe<T> = (SharedPort<T>, SharedChan<T>); - -pub fn megapipe<T: Send>() -> MegaPipe<T> { - let (port, chan) = stream(); - (SharedPort::new(port), SharedChan::new(chan)) -} - -impl<T: Send> GenericChan<T> for MegaPipe<T> { - fn send(&self, val: T) { - self.second_ref().send(val) - } -} - -impl<T: Send> GenericSmartChan<T> for MegaPipe<T> { - fn try_send(&self, val: T) -> bool { - self.second_ref().try_send(val) - } -} - -impl<T: Send> GenericPort<T> for MegaPipe<T> { - fn recv(&self) -> T { - self.first_ref().recv() - } - - fn try_recv(&self) -> Option<T> { - self.first_ref().try_recv() - } -} - -impl<T: Send> SendDeferred<T> for MegaPipe<T> { - fn send_deferred(&self, val: T) { - self.second_ref().send_deferred(val) - } - fn try_send_deferred(&self, val: T) -> bool { - self.second_ref().try_send_deferred(val) - } -} - -#[cfg(test)] -mod test { - use super::*; - use option::*; - use rt::test::*; - use num::Times; - use rt::util; - - #[test] - fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - do run_in_newsched_task { - let (port, _chan) = oneshot::<int>(); - { let _p = port; } - } - } - - #[test] - fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - do run_in_newsched_task { - let (_port, chan) = oneshot::<int>(); - { let _c = chan; } - } - } - - #[test] - fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - do run_in_newsched_task { - let (port, chan) = oneshot::<~int>(); - { let _p = port; } - chan.send(~0); - } - } - - #[test] - fn oneshot_single_thread_recv_chan_close() { - // Receiving on a closed chan will fail - do run_in_newsched_task { - let res = do spawntask_try { - let (port, chan) = oneshot::<~int>(); - { let _c = chan; } - port.recv(); - }; - // What is our res? - rtdebug!("res is: {:?}", res.is_err()); - assert!(res.is_err()); - } - } - - #[test] - fn oneshot_single_thread_send_then_recv() { - do run_in_newsched_task { - let (port, chan) = oneshot::<~int>(); - chan.send(~10); - assert!(port.recv() == ~10); - } - } - - #[test] - fn oneshot_single_thread_try_send_open() { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - assert!(chan.try_send(10)); - assert!(port.recv() == 10); - } - } - - #[test] - fn oneshot_single_thread_try_send_closed() { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - { let _p = port; } - assert!(!chan.try_send(10)); - } - } - - #[test] - fn oneshot_single_thread_try_recv_open() { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - chan.send(10); - assert!(port.try_recv() == Some(10)); - } - } - - #[test] - fn oneshot_single_thread_try_recv_closed() { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - { let _c = chan; } - assert!(port.try_recv() == None); - } - } - - #[test] - fn oneshot_single_thread_peek_data() { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - assert!(!port.peek()); - chan.send(10); - assert!(port.peek()); - } - } - - #[test] - fn oneshot_single_thread_peek_close() { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - { let _c = chan; } - assert!(!port.peek()); - assert!(!port.peek()); - } - } - - #[test] - fn oneshot_single_thread_peek_open() { - do run_in_newsched_task { - let (port, _) = oneshot::<int>(); - assert!(!port.peek()); - } - } - - #[test] - fn oneshot_multi_task_recv_then_send() { - do run_in_newsched_task { - let (port, chan) = oneshot::<~int>(); - do spawntask { - assert!(port.recv() == ~10); - } - - chan.send(~10); - } - } - - #[test] - fn oneshot_multi_task_recv_then_close() { - do run_in_newsched_task { - let (port, chan) = oneshot::<~int>(); - do spawntask_later { - let _ = chan; - } - let res = do spawntask_try { - assert!(port.recv() == ~10); - }; - assert!(res.is_err()); - } - } - - #[test] - fn oneshot_multi_thread_close_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - stress_factor().times(|| { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - let thread = do spawntask_thread { - let _ = port; - }; - let _chan = chan; - thread.join(); - } - }) - } - - #[test] - fn oneshot_multi_thread_send_close_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - stress_factor().times(|| { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - let thread1 = do spawntask_thread { - let _ = port; - }; - let thread2 = do spawntask_thread { - chan.send(1); - }; - thread1.join(); - thread2.join(); - } - }) - } - - #[test] - fn oneshot_multi_thread_recv_close_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - stress_factor().times(|| { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - let thread1 = do spawntask_thread { - let port = port; - let res = do spawntask_try { - port.recv(); - }; - assert!(res.is_err()); - }; - let thread2 = do spawntask_thread { - let chan = chan; - do spawntask { - let _ = chan; - } - }; - thread1.join(); - thread2.join(); - } - }) - } - - #[test] - fn oneshot_multi_thread_send_recv_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - stress_factor().times(|| { - do run_in_newsched_task { - let (port, chan) = oneshot::<~int>(); - let thread1 = do spawntask_thread { - chan.send(~10); - }; - let thread2 = do spawntask_thread { - assert!(port.recv() == ~10); - }; - thread1.join(); - thread2.join(); - } - }) - } - - #[test] - fn stream_send_recv_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - stress_factor().times(|| { - do run_in_mt_newsched_task { - let (port, chan) = stream::<~int>(); - - send(chan, 0); - recv(port, 0); - - fn send(chan: Chan<~int>, i: int) { - if i == 10 { return } - - do spawntask_random { - chan.send(~i); - send(chan, i + 1); - } - } - - fn recv(port: Port<~int>, i: int) { - if i == 10 { return } - - do spawntask_random { - assert!(port.recv() == ~i); - recv(port, i + 1); - }; - } - } - }) - } - - #[test] - fn recv_a_lot() { - // Regression test that we don't run out of stack in scheduler context - do run_in_newsched_task { - let (port, chan) = stream(); - 10000.times(|| { chan.send(()) }); - 10000.times(|| { port.recv() }); - } - } - - #[test] - fn shared_chan_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - do run_in_mt_newsched_task { - let (port, chan) = stream(); - let chan = SharedChan::new(chan); - let total = stress_factor() + 100; - total.times(|| { - let chan_clone = chan.clone(); - do spawntask_random { - chan_clone.send(()); - } - }); - - total.times(|| { - port.recv(); - }); - } - } - - #[test] - fn shared_port_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - do run_in_mt_newsched_task { - let (end_port, end_chan) = stream(); - let (port, chan) = stream(); - let end_chan = SharedChan::new(end_chan); - let port = SharedPort::new(port); - let total = stress_factor() + 100; - total.times(|| { - let end_chan_clone = end_chan.clone(); - let port_clone = port.clone(); - do spawntask_random { - port_clone.recv(); - end_chan_clone.send(()); - } - }); - - total.times(|| { - chan.send(()); - }); - - total.times(|| { - end_port.recv(); - }); - } - } - - #[test] - fn shared_port_close_simple() { - do run_in_mt_newsched_task { - let (port, chan) = stream::<()>(); - let port = SharedPort::new(port); - { let _chan = chan; } - assert!(port.try_recv().is_none()); - } - } - - #[test] - fn shared_port_close() { - do run_in_mt_newsched_task { - let (end_port, end_chan) = stream::<bool>(); - let (port, chan) = stream::<()>(); - let end_chan = SharedChan::new(end_chan); - let port = SharedPort::new(port); - let chan = SharedChan::new(chan); - let send_total = 10; - let recv_total = 20; - do spawntask_random { - send_total.times(|| { - let chan_clone = chan.clone(); - do spawntask_random { - chan_clone.send(()); - } - }); - } - let end_chan_clone = end_chan.clone(); - do spawntask_random { - recv_total.times(|| { - let port_clone = port.clone(); - let end_chan_clone = end_chan_clone.clone(); - do spawntask_random { - let recvd = port_clone.try_recv().is_some(); - end_chan_clone.send(recvd); - } - }); - } - - let mut recvd = 0; - recv_total.times(|| { - recvd += if end_port.recv() { 1 } else { 0 }; - }); - - assert!(recvd == send_total); - } - } - - #[test] - fn megapipe_stress() { - use rand; - use rand::Rng; - - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - - do run_in_mt_newsched_task { - let (end_port, end_chan) = stream::<()>(); - let end_chan = SharedChan::new(end_chan); - let pipe = megapipe(); - let total = stress_factor() + 10; - let mut rng = rand::rng(); - total.times(|| { - let msgs = rng.gen_range(0u, 10); - let pipe_clone = pipe.clone(); - let end_chan_clone = end_chan.clone(); - do spawntask_random { - msgs.times(|| { - pipe_clone.send(()); - }); - msgs.times(|| { - pipe_clone.recv(); - }); - } - - end_chan_clone.send(()); - }); - - total.times(|| { - end_port.recv(); - }); - } - } - - #[test] - fn send_deferred() { - use unstable::sync::atomic; - - // Tests no-rescheduling of send_deferred on all types of channels. - do run_in_newsched_task { - let (pone, cone) = oneshot(); - let (pstream, cstream) = stream(); - let (pshared, cshared) = stream(); - let cshared = SharedChan::new(cshared); - let mp = megapipe(); - - do spawntask { pone.recv(); } - do spawntask { pstream.recv(); } - do spawntask { pshared.recv(); } - let p_mp = mp.clone(); - do spawntask { p_mp.recv(); } - - unsafe { - let _guard = atomic(); - cone.send_deferred(()); - cstream.send_deferred(()); - cshared.send_deferred(()); - mp.send_deferred(()); - } - } - } - -} diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index e3f9cd09632..f4f128cf5aa 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -153,8 +153,9 @@ There are two known issues with the current scheme for exit code propagation. use cast; use option::{Option, Some, None}; use prelude::*; +use iter; +use task::TaskResult; use rt::task::Task; -use rt::task::UnwindResult; use unstable::atomics::{AtomicUint, SeqCst}; use unstable::sync::UnsafeArc; @@ -169,11 +170,21 @@ pub enum BlockedTask { pub struct Death { // Action to be done with the exit code. If set, also makes the task wait // until all its watched children exit before collecting the status. - on_exit: Option<proc(UnwindResult)>, + on_exit: Option<proc(TaskResult)>, // nesting level counter for unstable::atomically calls (0 == can deschedule). priv wont_sleep: int, } +pub struct BlockedTaskIterator { + priv inner: UnsafeArc<AtomicUint>, +} + +impl Iterator<BlockedTask> for BlockedTaskIterator { + fn next(&mut self) -> Option<BlockedTask> { + Some(Shared(self.inner.clone())) + } +} + impl BlockedTask { /// Returns Some if the task was successfully woken; None if already killed. pub fn wake(self) -> Option<~Task> { @@ -194,19 +205,17 @@ impl BlockedTask { } /// Converts one blocked task handle to a list of many handles to the same. - pub fn make_selectable(self, num_handles: uint) -> ~[BlockedTask] { - let handles = match self { + pub fn make_selectable(self, num_handles: uint) + -> iter::Take<BlockedTaskIterator> + { + let arc = match self { Owned(task) => { - let flag = unsafe { - AtomicUint::new(cast::transmute(task)) - }; - UnsafeArc::newN(flag, num_handles) + let flag = unsafe { AtomicUint::new(cast::transmute(task)) }; + UnsafeArc::new(flag) } - Shared(arc) => arc.cloneN(num_handles), + Shared(arc) => arc.clone(), }; - // Even if the task was unkillable before, we use 'Killable' because - // multiple pipes will have handles. It does not really mean killable. - handles.move_iter().map(|x| Shared(x)).collect() + BlockedTaskIterator{ inner: arc }.take(num_handles) } // This assertion has two flavours because the wake involves an atomic op. @@ -254,10 +263,10 @@ impl Death { } /// Collect failure exit codes from children and propagate them to a parent. - pub fn collect_failure(&mut self, result: UnwindResult) { + pub fn collect_failure(&mut self, result: TaskResult) { match self.on_exit.take() { + Some(f) => f(result), None => {} - Some(on_exit) => on_exit(result), } } diff --git a/src/libstd/rt/local_ptr.rs b/src/libstd/rt/local_ptr.rs index 66fe9742121..925aa802ad5 100644 --- a/src/libstd/rt/local_ptr.rs +++ b/src/libstd/rt/local_ptr.rs @@ -77,10 +77,9 @@ pub unsafe fn borrow<T>() -> Borrowed<T> { /// it wherever possible. #[cfg(not(windows), not(target_os = "android"))] pub mod compiled { - #[cfg(not(test))] - use libc::c_void; use cast; use option::{Option, Some, None}; + #[cfg(not(test))] use libc::c_void; #[cfg(test)] pub use realstd::rt::shouldnt_be_public::RT_TLS_PTR; diff --git a/src/libstd/rt/message_queue.rs b/src/libstd/rt/message_queue.rs deleted file mode 100644 index 10e457368f0..00000000000 --- a/src/libstd/rt/message_queue.rs +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! A concurrent queue that supports multiple producers and a -//! single consumer. - -use kinds::Send; -use vec::OwnedVector; -use option::Option; -use clone::Clone; -use rt::mpsc_queue::Queue; - -pub struct MessageQueue<T> { - priv queue: Queue<T> -} - -impl<T: Send> MessageQueue<T> { - pub fn new() -> MessageQueue<T> { - MessageQueue { - queue: Queue::new() - } - } - - #[inline] - pub fn push(&mut self, value: T) { - self.queue.push(value) - } - - #[inline] - pub fn pop(&mut self) -> Option<T> { - self.queue.pop() - } - - /// A pop that may sometimes miss enqueued elements, but is much faster - /// to give up without doing any synchronization - #[inline] - pub fn casual_pop(&mut self) -> Option<T> { - self.queue.pop() - } -} - -impl<T: Send> Clone for MessageQueue<T> { - fn clone(&self) -> MessageQueue<T> { - MessageQueue { - queue: self.queue.clone() - } - } -} diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index ce8d1ab1983..5d2179e8b96 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -65,7 +65,7 @@ use ptr::RawPtr; use rt::local::Local; use rt::sched::{Scheduler, Shutdown}; use rt::sleeper_list::SleeperList; -use rt::task::UnwindResult; +use task::TaskResult; use rt::task::{Task, SchedTask, GreenTask, Sched}; use send_str::SendStrStatic; use unstable::atomics::{AtomicInt, AtomicBool, SeqCst}; @@ -91,8 +91,6 @@ pub use self::kill::BlockedTask; // XXX: these probably shouldn't be public... #[doc(hidden)] pub mod shouldnt_be_public { - pub use super::select::SelectInner; - pub use super::select::{SelectInner, SelectPortInner}; pub use super::local_ptr::native::maybe_tls_key; #[cfg(not(windows), not(target_os = "android"))] pub use super::local_ptr::compiled::RT_TLS_PTR; @@ -123,11 +121,11 @@ pub mod rtio; /// or task-local storage. pub mod local; -/// A parallel queue. -pub mod message_queue; - /// A mostly lock-free multi-producer, single consumer queue. -mod mpsc_queue; +pub mod mpsc_queue; + +/// A lock-free single-producer, single consumer queue. +pub mod spsc_queue; /// A lock-free multi-producer, multi-consumer bounded queue. mod mpmc_bounded_queue; @@ -169,11 +167,6 @@ pub mod rc; /// scheduler and task context pub mod tube; -/// Simple reimplementation of std::comm -pub mod comm; - -mod select; - /// The runtime needs to be able to put a pointer into thread-local storage. mod local_ptr; @@ -349,7 +342,7 @@ fn run_(main: proc(), use_main_sched: bool) -> int { // When the main task exits, after all the tasks in the main // task tree, shut down the schedulers and set the exit code. let handles = handles; - let on_exit: proc(UnwindResult) = proc(exit_success) { + let on_exit: proc(TaskResult) = proc(exit_success) { unsafe { assert!(!(*exited_already.get()).swap(true, SeqCst), "the runtime already exited"); @@ -361,7 +354,7 @@ fn run_(main: proc(), use_main_sched: bool) -> int { } unsafe { - let exit_code = if exit_success.is_success() { + let exit_code = if exit_success.is_ok() { use rt::util; // If we're exiting successfully, then return the global diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 557d9c998ca..b54231421e3 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -10,7 +10,7 @@ use c_str::CString; use cast; -use comm::{SharedChan, PortOne, Port}; +use comm::{SharedChan, Port}; use libc::c_int; use libc; use ops::Drop; @@ -222,7 +222,7 @@ pub trait RtioUdpSocket : RtioSocket { pub trait RtioTimer { fn sleep(&mut self, msecs: u64); - fn oneshot(&mut self, msecs: u64) -> PortOne<()>; + fn oneshot(&mut self, msecs: u64) -> Port<()>; fn period(&mut self, msecs: u64) -> Port<()>; } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index fa17efc833b..ac3aeb5a4bb 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -17,7 +17,6 @@ use super::stack::{StackPool}; use super::rtio::EventLoop; use super::context::Context; use super::task::{Task, AnySched, Sched}; -use super::message_queue::MessageQueue; use rt::kill::BlockedTask; use rt::deque; use rt::local_ptr; @@ -29,6 +28,7 @@ use iter::range; use unstable::mutex::Mutex; use vec::{OwnedVector}; +use mpsc = super::mpsc_queue; /// A scheduler is responsible for coordinating the execution of Tasks /// on a single thread. The scheduler runs inside a slightly modified @@ -47,7 +47,9 @@ pub struct Scheduler { /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. - message_queue: MessageQueue<SchedMessage>, + message_queue: mpsc::Consumer<SchedMessage, ()>, + /// Producer used to clone sched handles from + message_producer: mpsc::Producer<SchedMessage, ()>, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. sleeper_list: SleeperList, @@ -104,7 +106,7 @@ enum EffortLevel { GiveItYourBest } -static MAX_YIELD_CHECKS: uint = 200; +static MAX_YIELD_CHECKS: uint = 20000; fn reset_yield_check(rng: &mut XorShiftRng) -> uint { let r: uint = Rand::rand(rng); @@ -135,9 +137,11 @@ impl Scheduler { friend: Option<SchedHandle>) -> Scheduler { + let (consumer, producer) = mpsc::queue(()); let mut sched = Scheduler { sleeper_list: sleeper_list, - message_queue: MessageQueue::new(), + message_queue: consumer, + message_producer: producer, sleepy: false, no_sleep: false, event_loop: event_loop, @@ -218,7 +222,7 @@ impl Scheduler { // Should not have any messages let message = stask.sched.get_mut_ref().message_queue.pop(); - rtassert!(message.is_none()); + rtassert!(match message { mpsc::Empty => true, _ => false }); stask.destroyed = true; } @@ -315,10 +319,27 @@ impl Scheduler { fn interpret_message_queue(mut ~self, effort: EffortLevel) -> Option<~Scheduler> { let msg = if effort == DontTryTooHard { - // Do a cheap check that may miss messages self.message_queue.casual_pop() } else { - self.message_queue.pop() + // When popping our message queue, we could see an "inconsistent" + // state which means that we *should* be able to pop data, but we + // are unable to at this time. Our options are: + // + // 1. Spin waiting for data + // 2. Ignore this and pretend we didn't find a message + // + // If we choose route 1, then if the pusher in question is currently + // pre-empted, we're going to take up our entire time slice just + // spinning on this queue. If we choose route 2, then the pusher in + // question is still guaranteed to make a send() on its async + // handle, so we will guaranteed wake up and see its message at some + // point. + // + // I have chosen to take route #2. + match self.message_queue.pop() { + mpsc::Data(t) => Some(t), + mpsc::Empty | mpsc::Inconsistent => None + } }; match msg { @@ -793,7 +814,7 @@ impl Scheduler { return SchedHandle { remote: remote, - queue: self.message_queue.clone(), + queue: self.message_producer.clone(), sched_id: self.sched_id() }; } @@ -813,7 +834,7 @@ pub enum SchedMessage { pub struct SchedHandle { priv remote: ~RemoteCallback, - priv queue: MessageQueue<SchedMessage>, + priv queue: mpsc::Producer<SchedMessage, ()>, sched_id: uint } @@ -915,17 +936,17 @@ fn new_sched_rng() -> XorShiftRng { #[cfg(test)] mod test { use prelude::*; - use rt::test::*; - use unstable::run_in_bare_thread; + use borrow::to_uint; - use rt::sched::{Scheduler}; use rt::deque::BufferPool; - use rt::thread::Thread; - use rt::task::{Task, Sched}; use rt::basic; + use rt::sched::{Scheduler}; + use rt::task::{Task, Sched}; + use rt::test::*; + use rt::thread::Thread; use rt::util; - use option::{Some}; - use rt::task::UnwindResult; + use task::TaskResult; + use unstable::run_in_bare_thread; #[test] fn trivial_run_in_newsched_task_test() { @@ -1010,8 +1031,8 @@ mod test { assert!(Task::on_appropriate_sched()); }; - let on_exit: proc(UnwindResult) = proc(exit_status) { - rtassert!(exit_status.is_success()) + let on_exit: proc(TaskResult) = proc(exit_status) { + rtassert!(exit_status.is_ok()) }; task.death.on_exit = Some(on_exit); @@ -1027,7 +1048,6 @@ mod test { use rt::sleeper_list::SleeperList; use rt::sched::Shutdown; use borrow; - use rt::comm::*; do run_in_bare_thread { @@ -1089,7 +1109,7 @@ mod test { rtdebug!("task4 id: **{}**", borrow::to_uint(task4)); // Signal from the special task that we are done. - let (port, chan) = oneshot::<()>(); + let (port, chan) = Chan::<()>::new(); let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool, None) { rtdebug!("*about to submit task2*"); @@ -1160,10 +1180,8 @@ mod test { #[test] fn handle() { - use rt::comm::*; - do run_in_bare_thread { - let (port, chan) = oneshot::<()>(); + let (port, chan) = Chan::new(); let thread_one = do Thread::start { let chan = chan; @@ -1230,7 +1248,6 @@ mod test { #[test] fn multithreading() { - use rt::comm::*; use num::Times; use vec::OwnedVector; use container::Container; @@ -1238,7 +1255,7 @@ mod test { do run_in_mt_newsched_task { let mut ports = ~[]; 10.times(|| { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask_later { chan.send(()); } @@ -1253,21 +1270,17 @@ mod test { #[test] fn thread_ring() { - use rt::comm::*; - use comm::{GenericPort, GenericChan}; - do run_in_mt_newsched_task { - let (end_port, end_chan) = oneshot(); + let (end_port, end_chan) = Chan::new(); let n_tasks = 10; let token = 2000; - let (p, ch1) = stream(); - let mut p = p; + let (mut p, ch1) = Chan::new(); ch1.send((token, end_chan)); let mut i = 2; while i <= n_tasks { - let (next_p, ch) = stream(); + let (next_p, ch) = Chan::new(); let imm_i = i; let imm_p = p; do spawntask_random { @@ -1276,23 +1289,23 @@ mod test { p = next_p; i += 1; } - let imm_p = p; - let imm_ch = ch1; + let p = p; do spawntask_random { - roundtrip(1, n_tasks, &imm_p, &imm_ch); + roundtrip(1, n_tasks, &p, &ch1); } end_port.recv(); } fn roundtrip(id: int, n_tasks: int, - p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) { + p: &Port<(int, Chan<()>)>, + ch: &Chan<(int, Chan<()>)>) { while (true) { match p.recv() { (1, end_chan) => { - debug!("{}\n", id); - end_chan.send(()); - return; + debug!("{}\n", id); + end_chan.send(()); + return; } (token, end_chan) => { debug!("thread: {} got token: {}", id, token); @@ -1331,16 +1344,14 @@ mod test { // FIXME: #9407: xfail-test fn dont_starve_1() { - use rt::comm::oneshot; - stress_factor().times(|| { do run_in_mt_newsched_task { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); // This task should not be able to starve the sender; // The sender should get stolen to another thread. do spawntask { - while !port.peek() { } + while port.try_recv().is_none() { } } chan.send(()); @@ -1350,17 +1361,15 @@ mod test { #[test] fn dont_starve_2() { - use rt::comm::oneshot; - stress_factor().times(|| { do run_in_newsched_task { - let (port, chan) = oneshot(); - let (_port2, chan2) = stream(); + let (port, chan) = Chan::new(); + let (_port2, chan2) = Chan::new(); // This task should not be able to starve the other task. // The sends should eventually yield. do spawntask { - while !port.peek() { + while port.try_recv().is_none() { chan2.send(()); } } diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs deleted file mode 100644 index 6cde0a1f216..00000000000 --- a/src/libstd/rt/select.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Module for private, abstraction-leaking select traits. Wrapped in std::select. - -use rt::kill::BlockedTask; -use rt::sched::Scheduler; -use option::Option; - -pub trait SelectInner { - // Returns true if data was available. - fn optimistic_check(&mut self) -> bool; - // Returns true if data was available. If so, shall also wake() the task. - fn block_on(&mut self, &mut Scheduler, BlockedTask) -> bool; - // Returns true if data was available. - fn unblock_from(&mut self) -> bool; -} - -pub trait SelectPortInner<T> { - fn recv_ready(self) -> Option<T>; -} - diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 2adc32f33fb..62e012f9f41 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -20,20 +20,22 @@ use prelude::*; use borrow; use cast::transmute; use cleanup; +use io::Writer; use libc::{c_void, uintptr_t, c_char, size_t}; use local_data; use option::{Option, Some, None}; use rt::borrowck::BorrowRecord; use rt::borrowck; +use rt::context; use rt::context::Context; use rt::env; -use io::Writer; use rt::kill::Death; use rt::local::Local; use rt::logging::StdErrLogger; use rt::sched::{Scheduler, SchedHandle}; use rt::stack::{StackSegment, StackPool}; use send_str::SendStr; +use task::TaskResult; use unstable::finally::Finally; use unstable::mutex::Mutex; @@ -90,46 +92,17 @@ pub enum SchedHome { pub struct GarbageCollector; pub struct LocalStorage(Option<local_data::Map>); -/// Represents the reason for the current unwinding process -pub enum UnwindResult { - /// The task is ending successfully - Success, - - /// The Task is failing with reason `~Any` - Failure(~Any), -} - -impl UnwindResult { - /// Returns `true` if this `UnwindResult` is a failure - #[inline] - pub fn is_failure(&self) -> bool { - match *self { - Success => false, - Failure(_) => true - } - } - - /// Returns `true` if this `UnwindResult` is a success - #[inline] - pub fn is_success(&self) -> bool { - match *self { - Success => true, - Failure(_) => false - } - } -} - pub struct Unwinder { unwinding: bool, cause: Option<~Any> } impl Unwinder { - fn to_unwind_result(&mut self) -> UnwindResult { + fn result(&mut self) -> TaskResult { if self.unwinding { - Failure(self.cause.take().unwrap()) + Err(self.cause.take().unwrap()) } else { - Success + Ok(()) } } } @@ -326,7 +299,7 @@ impl Task { // Cleanup the dynamic borrowck debugging info borrowck::clear_task_borrow_list(); - self.death.collect_failure(self.unwinder.to_unwind_result()); + self.death.collect_failure(self.unwinder.result()); self.destroyed = true; } @@ -691,6 +664,7 @@ pub fn begin_unwind<M: Any + Send>(msg: M, file: &'static str, line: uint) -> ! mod test { use super::*; use rt::test::*; + use prelude::*; #[test] fn local_heap() { @@ -744,22 +718,9 @@ mod test { } #[test] - fn comm_oneshot() { - use comm::*; - - do run_in_newsched_task { - let (port, chan) = oneshot(); - chan.send(10); - assert!(port.recv() == 10); - } - } - - #[test] fn comm_stream() { - use comm::*; - do run_in_newsched_task() { - let (port, chan) = stream(); + let (port, chan) = Chan::new(); chan.send(10); assert!(port.recv() == 10); } @@ -767,11 +728,8 @@ mod test { #[test] fn comm_shared_chan() { - use comm::*; - do run_in_newsched_task() { - let (port, chan) = stream(); - let chan = SharedChan::new(chan); + let (port, chan) = SharedChan::new(); chan.send(10); assert!(port.recv() == 10); } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 96b80d11129..2b48b396c99 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -21,14 +21,14 @@ use rand::Rng; use rand; use result::{Result, Ok, Err}; use rt::basic; -use rt::comm::oneshot; use rt::deque::BufferPool; +use comm::Chan; use rt::new_event_loop; use rt::sched::Scheduler; use rt::sleeper_list::SleeperList; use rt::task::Task; -use rt::task::UnwindResult; use rt::thread::Thread; +use task::TaskResult; use unstable::{run_in_bare_thread}; use vec; use vec::{OwnedVector, MutableVector, ImmutableVector}; @@ -82,10 +82,10 @@ pub fn run_in_uv_task_core(f: proc()) { let mut sched = ~new_test_uv_sched(); let exit_handle = sched.make_handle(); - let on_exit: proc(UnwindResult) = proc(exit_status: UnwindResult) { + let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) { let mut exit_handle = exit_handle; exit_handle.send(Shutdown); - rtassert!(exit_status.is_success()); + rtassert!(exit_status.is_ok()); }; let mut task = ~Task::new_root(&mut sched.stack_pool, None, f); task.death.on_exit = Some(on_exit); @@ -99,10 +99,10 @@ pub fn run_in_newsched_task_core(f: proc()) { let mut sched = ~new_test_sched(); let exit_handle = sched.make_handle(); - let on_exit: proc(UnwindResult) = proc(exit_status: UnwindResult) { + let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) { let mut exit_handle = exit_handle; exit_handle.send(Shutdown); - rtassert!(exit_status.is_success()); + rtassert!(exit_status.is_ok()); }; let mut task = ~Task::new_root(&mut sched.stack_pool, None, f); task.death.on_exit = Some(on_exit); @@ -240,14 +240,14 @@ pub fn run_in_mt_newsched_task(f: proc()) { } let handles = handles; // Work around not being able to capture mut - let on_exit: proc(UnwindResult) = proc(exit_status: UnwindResult) { + let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) { // Tell schedulers to exit let mut handles = handles; for handle in handles.mut_iter() { handle.send(Shutdown); } - rtassert!(exit_status.is_success()); + rtassert!(exit_status.is_ok()); }; let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, None, @@ -311,8 +311,8 @@ pub fn spawntask_random(f: proc()) { pub fn spawntask_try(f: proc()) -> Result<(),()> { - let (port, chan) = oneshot(); - let on_exit: proc(UnwindResult) = proc(exit_status) { + let (port, chan) = Chan::new(); + let on_exit: proc(TaskResult) = proc(exit_status) { chan.send(exit_status) }; @@ -322,7 +322,7 @@ pub fn spawntask_try(f: proc()) -> Result<(),()> { Scheduler::run_task(new_task); let exit_status = port.recv(); - if exit_status.is_success() { Ok(()) } else { Err(()) } + if exit_status.is_ok() { Ok(()) } else { Err(()) } } diff --git a/src/libstd/rt/thread.rs b/src/libstd/rt/thread.rs index 9031147f8b1..da02988c75c 100644 --- a/src/libstd/rt/thread.rs +++ b/src/libstd/rt/thread.rs @@ -21,42 +21,32 @@ use kinds::Send; use libc; use ops::Drop; use option::{Option, Some, None}; -use ptr; use uint; -#[cfg(windows)] -use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, - LPVOID, DWORD, LPDWORD, HANDLE}; - -#[cfg(windows)] type rust_thread = HANDLE; -#[cfg(unix)] type rust_thread = libc::pthread_t; -#[cfg(windows)] type rust_thread_return = DWORD; -#[cfg(unix)] type rust_thread_return = *libc::c_void; - -type StartFn = extern "C" fn(*libc::c_void) -> rust_thread_return; +type StartFn = extern "C" fn(*libc::c_void) -> imp::rust_thread_return; /// This struct represents a native thread's state. This is used to join on an /// existing thread created in the join-able state. pub struct Thread<T> { - priv native: rust_thread, + priv native: imp::rust_thread, priv joined: bool, priv packet: ~Option<T>, } -static DEFAULT_STACK_SIZE: libc::size_t = 1024*1024; +static DEFAULT_STACK_SIZE: libc::size_t = 1024 * 1024; // This is the starting point of rust os threads. The first thing we do // is make sure that we don't trigger __morestack (also why this has a // no_split_stack annotation), and then we extract the main function // and invoke it. #[no_split_stack] -extern fn thread_start(main: *libc::c_void) -> rust_thread_return { +extern fn thread_start(main: *libc::c_void) -> imp::rust_thread_return { use rt::context; unsafe { context::record_stack_bounds(0, uint::max_value); let f: ~proc() = cast::transmute(main); (*f)(); - cast::transmute(0 as rust_thread_return) + cast::transmute(0 as imp::rust_thread_return) } } @@ -88,7 +78,7 @@ impl Thread<()> { *cast::transmute::<&~Option<T>, **mut Option<T>>(&packet) }; let main: proc() = proc() unsafe { *packet2 = Some(main()); }; - let native = unsafe { native_thread_create(~main) }; + let native = unsafe { imp::create(~main) }; Thread { native: native, @@ -105,10 +95,16 @@ impl Thread<()> { /// there are detached thread still running around. pub fn spawn(main: proc()) { unsafe { - let handle = native_thread_create(~main); - native_thread_detach(handle); + let handle = imp::create(~main); + imp::detach(handle); } } + + /// Relinquishes the CPU slot that this OS-thread is currently using, + /// allowing another thread to run for awhile. + pub fn yield_now() { + unsafe { imp::yield_now(); } + } } impl<T: Send> Thread<T> { @@ -116,7 +112,7 @@ impl<T: Send> Thread<T> { /// calculation. pub fn join(mut self) -> T { assert!(!self.joined); - unsafe { native_thread_join(self.native) }; + unsafe { imp::join(self.native) }; self.joined = true; assert!(self.packet.is_some()); self.packet.take_unwrap() @@ -129,80 +125,115 @@ impl<T: Send> Drop for Thread<T> { // This is required for correctness. If this is not done then the thread // would fill in a return box which no longer exists. if !self.joined { - unsafe { native_thread_join(self.native) }; + unsafe { imp::join(self.native) }; } } } #[cfg(windows)] -unsafe fn native_thread_create(p: ~proc()) -> rust_thread { - let arg: *mut libc::c_void = cast::transmute(p); - CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, thread_start, - arg, 0, ptr::mut_null()) -} - -#[cfg(windows)] -unsafe fn native_thread_join(native: rust_thread) { - use libc::consts::os::extra::INFINITE; - WaitForSingleObject(native, INFINITE); -} +mod imp { + use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, BOOL, + LPVOID, DWORD, LPDWORD, HANDLE}; + use libc; + use cast; + use super::DEFAULT_STACK_SIZE; + + pub type rust_thread = HANDLE; + pub type rust_thread_return = DWORD; + + pub unsafe fn create(p: ~proc()) -> rust_thread { + let arg: *mut libc::c_void = cast::transmute(p); + CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, super::thread_start, + arg, 0, ptr::mut_null()) + } -#[cfg(windows)] -unsafe fn native_thread_detach(native: rust_thread) { - assert!(libc::CloseHandle(native) != 0); -} + pub unsafe fn join(native: rust_thread) { + use libc::consts::os::extra::INFINITE; + WaitForSingleObject(native, INFINITE); + } -#[cfg(unix)] -unsafe fn native_thread_create(p: ~proc()) -> rust_thread { - use unstable::intrinsics; - use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE; + pub unsafe fn detach(native: rust_thread) { + assert!(libc::CloseHandle(native) != 0); + } - let mut native: libc::pthread_t = intrinsics::uninit(); - let mut attr: libc::pthread_attr_t = intrinsics::uninit(); - assert_eq!(pthread_attr_init(&mut attr), 0); - assert_eq!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE), 0); - assert_eq!(pthread_attr_setdetachstate(&mut attr, PTHREAD_CREATE_JOINABLE), 0); + pub unsafe fn yield_now() { + // This function will return 0 if there are no other threads to execute, + // but this also means that the yield was useless so this isn't really a + // case that needs to be worried about. + SwitchToThread(); + } - let arg: *libc::c_void = cast::transmute(p); - assert_eq!(pthread_create(&mut native, &attr, thread_start, arg), 0); - native + extern "system" { + fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES, + dwStackSize: SIZE_T, + lpStartAddress: super::StartFn, + lpParameter: LPVOID, + dwCreationFlags: DWORD, + lpThreadId: LPDWORD) -> HANDLE; + fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; + fn SwitchToThread() -> BOOL; + } } #[cfg(unix)] -unsafe fn native_thread_join(native: rust_thread) { - assert_eq!(pthread_join(native, ptr::null()), 0); -} +mod imp { + use cast; + use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE; + use libc; + use ptr; + use super::DEFAULT_STACK_SIZE; + use unstable::intrinsics; -#[cfg(unix)] -fn native_thread_detach(native: rust_thread) { - unsafe { assert_eq!(pthread_detach(native), 0) } -} + pub type rust_thread = libc::pthread_t; + pub type rust_thread_return = *libc::c_void; + + pub unsafe fn create(p: ~proc()) -> rust_thread { + let mut native: libc::pthread_t = intrinsics::uninit(); + let mut attr: libc::pthread_attr_t = intrinsics::uninit(); + assert_eq!(pthread_attr_init(&mut attr), 0); + assert_eq!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE), 0); + assert_eq!(pthread_attr_setdetachstate(&mut attr, + PTHREAD_CREATE_JOINABLE), 0); + + let arg: *libc::c_void = cast::transmute(p); + assert_eq!(pthread_create(&mut native, &attr, + super::thread_start, arg), 0); + native + } -#[cfg(windows)] -extern "system" { - fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES, - dwStackSize: SIZE_T, - lpStartAddress: StartFn, - lpParameter: LPVOID, - dwCreationFlags: DWORD, - lpThreadId: LPDWORD) -> HANDLE; - fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; -} + pub unsafe fn join(native: rust_thread) { + assert_eq!(pthread_join(native, ptr::null()), 0); + } -#[cfg(unix)] -extern { - fn pthread_create(native: *mut libc::pthread_t, - attr: *libc::pthread_attr_t, - f: StartFn, - value: *libc::c_void) -> libc::c_int; - fn pthread_join(native: libc::pthread_t, - value: **libc::c_void) -> libc::c_int; - fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int; - fn pthread_attr_setstacksize(attr: *mut libc::pthread_attr_t, - stack_size: libc::size_t) -> libc::c_int; - fn pthread_attr_setdetachstate(attr: *mut libc::pthread_attr_t, - state: libc::c_int) -> libc::c_int; - fn pthread_detach(thread: libc::pthread_t) -> libc::c_int; + pub unsafe fn detach(native: rust_thread) { + assert_eq!(pthread_detach(native), 0); + } + + #[cfg(target_os = "macos")] + pub unsafe fn yield_now() { assert_eq!(sched_yield(), 0); } + + #[cfg(not(target_os = "macos"))] + pub unsafe fn yield_now() { assert_eq!(pthread_yield(), 0); } + + extern { + fn pthread_create(native: *mut libc::pthread_t, + attr: *libc::pthread_attr_t, + f: super::StartFn, + value: *libc::c_void) -> libc::c_int; + fn pthread_join(native: libc::pthread_t, + value: **libc::c_void) -> libc::c_int; + fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int; + fn pthread_attr_setstacksize(attr: *mut libc::pthread_attr_t, + stack_size: libc::size_t) -> libc::c_int; + fn pthread_attr_setdetachstate(attr: *mut libc::pthread_attr_t, + state: libc::c_int) -> libc::c_int; + fn pthread_detach(thread: libc::pthread_t) -> libc::c_int; + + #[cfg(target_os = "macos")] + fn sched_yield() -> libc::c_int; + #[cfg(not(target_os = "macos"))] + fn pthread_yield() -> libc::c_int; + } } #[cfg(test)] |
