diff options
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/comm.rs | 160 | ||||
| -rw-r--r-- | src/libstd/rt/kill.rs | 9 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 6 |
3 files changed, 144 insertions, 31 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 00e1aaa2193..a060059f5fc 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -25,6 +25,7 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; use clone::Clone; use rt::{context, SchedulerContext}; +use tuple::ImmutableTuple; /// A combined refcount / BlockedTask-as-uint pointer. /// @@ -86,12 +87,32 @@ impl<T> ChanOne<T> { } } + /// Send a message on the one-shot channel. If a receiver task is blocked + /// waiting for the message, will wake it up and reschedule to it. pub fn send(self, val: T) { self.try_send(val); } + /// As `send`, but also returns whether or not the receiver endpoint is still open. pub fn try_send(self, val: T) -> bool { + self.try_send_inner(val, true) + } + + /// Send a message without immediately rescheduling to a blocked receiver. + /// This can be useful in contexts where rescheduling is forbidden, or to + /// optimize for when the sender expects to still have useful work to do. + pub fn send_deferred(self, val: T) { + self.try_send_deferred(val); + } + + /// As `send_deferred` and `try_send` together. + pub fn try_send_deferred(self, val: T) -> bool { + self.try_send_inner(val, false) + } + // 'do_resched' configures whether the scheduler immediately switches to + // the receiving task, or leaves the sending task still running. + fn try_send_inner(self, val: T, do_resched: bool) -> bool { rtassert!(context() != SchedulerContext); let mut this = self; @@ -110,6 +131,13 @@ impl<T> ChanOne<T> { // acquire barrier that keeps the subsequent access of the // ~Task pointer from being reordered. let oldstate = (*packet).state.swap(STATE_ONE, SeqCst); + + // Suppress the synchronizing actions in the finalizer. We're + // done with the packet. NB: In case of do_resched, this *must* + // happen before waking up a blocked task (or be unkillable), + // because we might get a kill signal during the reschedule. + this.suppress_finalize = true; + match oldstate { STATE_BOTH => { // Port is not waiting yet. Nothing to do @@ -130,15 +158,20 @@ impl<T> ChanOne<T> { task_as_state => { // Port is blocked. Wake it up. let recvr = BlockedTask::cast_from_uint(task_as_state); - do recvr.wake().map_consume |woken_task| { - Scheduler::run_task(woken_task); - }; + if do_resched { + do recvr.wake().map_consume |woken_task| { + Scheduler::run_task(woken_task); + }; + } else { + let recvr = Cell::new(recvr); + do Local::borrow::<Scheduler, ()> |sched| { + sched.enqueue_blocked_task(recvr.take()); + } + } } } } - // Suppress the synchronizing actions in the finalizer. We're done with the packet. - this.suppress_finalize = true; return recvr_active; } } @@ -152,6 +185,7 @@ impl<T> PortOne<T> { } } + /// Wait for a message on the one-shot port. Fails if the send end is closed. pub fn recv(self) -> T { match self.try_recv() { Some(val) => val, @@ -161,6 +195,7 @@ impl<T> PortOne<T> { } } + /// As `recv`, but returns `None` if the send end is closed rather than failing. pub fn try_recv(self) -> Option<T> { let mut this = self; @@ -382,6 +417,12 @@ impl<T> Drop for PortOne<T> { } } +/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne. +pub trait SendDeferred<T> { + fn send_deferred(&self, val: T); + fn try_send_deferred(&self, val: T) -> bool; +} + struct StreamPayload<T> { val: T, next: PortOne<StreamPayload<T>> @@ -409,6 +450,15 @@ pub fn stream<T: Send>() -> (Port<T>, Chan<T>) { return (port, chan); } +impl<T: Send> Chan<T> { + fn try_send_inner(&self, val: T, do_resched: bool) -> bool { + let (next_pone, next_cone) = oneshot(); + let cone = self.next.take(); + self.next.put_back(next_cone); + cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched) + } +} + impl<T: Send> GenericChan<T> for Chan<T> { fn send(&self, val: T) { self.try_send(val); @@ -417,10 +467,16 @@ impl<T: Send> GenericChan<T> for Chan<T> { impl<T: Send> GenericSmartChan<T> for Chan<T> { fn try_send(&self, val: T) -> bool { - let (next_pone, next_cone) = oneshot(); - let cone = self.next.take(); - self.next.put_back(next_cone); - cone.try_send(StreamPayload { val: val, next: next_pone }) + self.try_send_inner(val, true) + } +} + +impl<T: Send> SendDeferred<T> for Chan<T> { + fn send_deferred(&self, val: T) { + self.try_send_deferred(val); + } + fn try_send_deferred(&self, val: T) -> bool { + self.try_send_inner(val, false) } } @@ -495,6 +551,17 @@ impl<T> SharedChan<T> { } } +impl<T: Send> SharedChan<T> { + fn try_send_inner(&self, val: T, do_resched: bool) -> bool { + unsafe { + let (next_pone, next_cone) = oneshot(); + let cone = (*self.next.get()).swap(~next_cone, SeqCst); + cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone }, + do_resched) + } + } +} + impl<T: Send> GenericChan<T> for SharedChan<T> { fn send(&self, val: T) { self.try_send(val); @@ -503,11 +570,16 @@ impl<T: Send> GenericChan<T> for SharedChan<T> { impl<T: Send> GenericSmartChan<T> for SharedChan<T> { fn try_send(&self, val: T) -> bool { - unsafe { - let (next_pone, next_cone) = oneshot(); - let cone = (*self.next.get()).swap(~next_cone, SeqCst); - cone.unwrap().try_send(StreamPayload { val: val, next: next_pone }) - } + self.try_send_inner(val, true) + } +} + +impl<T: Send> SendDeferred<T> for SharedChan<T> { + fn send_deferred(&self, val: T) { + self.try_send_deferred(val); + } + fn try_send_deferred(&self, val: T) -> bool { + self.try_send_inner(val, false) } } @@ -584,31 +656,32 @@ pub fn megapipe<T: Send>() -> MegaPipe<T> { impl<T: Send> GenericChan<T> for MegaPipe<T> { fn send(&self, val: T) { - match *self { - (_, ref c) => c.send(val) - } + self.second_ref().send(val) } } impl<T: Send> GenericSmartChan<T> for MegaPipe<T> { fn try_send(&self, val: T) -> bool { - match *self { - (_, ref c) => c.try_send(val) - } + self.second_ref().try_send(val) } } impl<T: Send> GenericPort<T> for MegaPipe<T> { fn recv(&self) -> T { - match *self { - (ref p, _) => p.recv() - } + self.first_ref().recv() } fn try_recv(&self) -> Option<T> { - match *self { - (ref p, _) => p.try_recv() - } + self.first_ref().try_recv() + } +} + +impl<T: Send> SendDeferred<T> for MegaPipe<T> { + fn send_deferred(&self, val: T) { + self.second_ref().send_deferred(val) + } + fn try_send_deferred(&self, val: T) -> bool { + self.second_ref().try_send_deferred(val) } } @@ -1017,4 +1090,39 @@ mod test { } } } + + #[test] + fn send_deferred() { + use unstable::sync::atomically; + + // Tests no-rescheduling of send_deferred on all types of channels. + do run_in_newsched_task { + let (pone, cone) = oneshot(); + let (pstream, cstream) = stream(); + let (pshared, cshared) = stream(); + let cshared = SharedChan::new(cshared); + let mp = megapipe(); + + let pone = Cell::new(pone); + do spawntask { pone.take().recv(); } + let pstream = Cell::new(pstream); + do spawntask { pstream.take().recv(); } + let pshared = Cell::new(pshared); + do spawntask { pshared.take().recv(); } + let p_mp = Cell::new(mp.clone()); + do spawntask { p_mp.take().recv(); } + + let cs = Cell::new((cone, cstream, cshared, mp)); + unsafe { + do atomically { + let (cone, cstream, cshared, mp) = cs.take(); + cone.send_deferred(()); + cstream.send_deferred(()); + cshared.send_deferred(()); + mp.send_deferred(()); + } + } + } + } + } diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index 696f4a8c355..deec8dd37a6 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -530,13 +530,13 @@ impl Death { /// Fails if a kill signal was received. #[inline] - pub fn check_killed(&self) { + pub fn check_killed(&self, already_failing: bool) { match self.kill_handle { Some(ref kill_handle) => // The task may be both unkillable and killed if it does some // synchronization during unwinding or cleanup (for example, // sending on a notify port). In that case failing won't help. - if self.unkillable == 0 && kill_handle.killed() { + if self.unkillable == 0 && (!already_failing) && kill_handle.killed() { fail!(KILLED_MSG); }, // This may happen during task death (see comments in collect_failure). @@ -548,11 +548,12 @@ impl Death { /// All calls must be paired with a subsequent call to allow_kill. #[inline] pub fn inhibit_kill(&mut self, already_failing: bool) { - if self.unkillable == 0 { + self.unkillable += 1; + // May fail, hence must happen *after* incrementing the counter + if self.unkillable == 1 { rtassert!(self.kill_handle.is_some()); self.kill_handle.get_mut_ref().inhibit_kill(already_failing); } - self.unkillable += 1; } /// Exit a possibly-nested unkillable section of code. diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 5c9b142c052..dfe003253c2 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -540,6 +540,10 @@ impl Scheduler { // The current task is grabbed from TLS, not taken as an input. let current_task: ~Task = Local::take::<Task>(); + // Check that the task is not in an atomically() section (e.g., + // holding a pthread mutex, which could deadlock the scheduler). + current_task.death.assert_may_sleep(); + // These transmutes do something fishy with a closure. let f_fake_region = unsafe { transmute::<&fn(&mut Scheduler, ~Task), @@ -600,7 +604,7 @@ impl Scheduler { // Must happen after running the cleanup job (of course). let task = Local::unsafe_borrow::<Task>(); - (*task).death.check_killed(); + (*task).death.check_killed((*task).unwinder.unwinding); } } |
