From be7738bfa18989438e3597847cd6a7f3bbbfac12 Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Thu, 1 Aug 2013 21:57:15 -0400 Subject: Add SendDeferred trait and use it to fix #8214. --- src/libstd/rt/comm.rs | 151 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 127 insertions(+), 24 deletions(-) (limited to 'src/libstd/rt') diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 00e1aaa2193..c19ac8aa337 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -25,6 +25,7 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; use clone::Clone; use rt::{context, SchedulerContext}; +use tuple::ImmutableTuple; /// A combined refcount / BlockedTask-as-uint pointer. /// @@ -86,12 +87,32 @@ impl ChanOne { } } + /// Send a message on the one-shot channel. If a receiver task is blocked + /// waiting for the message, will wake it up and reschedule to it. pub fn send(self, val: T) { self.try_send(val); } + /// As `send`, but also returns whether or not the receiver endpoint is still open. pub fn try_send(self, val: T) -> bool { + self.try_send_inner(val, true) + } + + /// Send a message without immediately rescheduling to a blocked receiver. + /// This can be useful in contexts where rescheduling is forbidden, or to + /// optimize for when the sender expects to still have useful work to do. + pub fn send_deferred(self, val: T) { + self.try_send_deferred(val); + } + + /// As `send_deferred` and `try_send` together. + pub fn try_send_deferred(self, val: T) -> bool { + self.try_send_inner(val, false) + } + // 'do_resched' configures whether the scheduler immediately switches to + // the receiving task, or leaves the sending task still running. + fn try_send_inner(self, val: T, do_resched: bool) -> bool { rtassert!(context() != SchedulerContext); let mut this = self; @@ -130,9 +151,16 @@ impl ChanOne { task_as_state => { // Port is blocked. Wake it up. let recvr = BlockedTask::cast_from_uint(task_as_state); - do recvr.wake().map_consume |woken_task| { - Scheduler::run_task(woken_task); - }; + if do_resched { + do recvr.wake().map_consume |woken_task| { + Scheduler::run_task(woken_task); + }; + } else { + let recvr = Cell::new(recvr); + do Local::borrow:: |sched| { + sched.enqueue_blocked_task(recvr.take()); + } + } } } } @@ -152,6 +180,7 @@ impl PortOne { } } + /// Wait for a message on the one-shot port. Fails if the send end is closed. pub fn recv(self) -> T { match self.try_recv() { Some(val) => val, @@ -161,6 +190,7 @@ impl PortOne { } } + /// As `recv`, but returns `None` if the send end is closed rather than failing. pub fn try_recv(self) -> Option { let mut this = self; @@ -382,6 +412,12 @@ impl Drop for PortOne { } } +/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne. +pub trait SendDeferred { + fn send_deferred(&self, val: T); + fn try_send_deferred(&self, val: T) -> bool; +} + struct StreamPayload { val: T, next: PortOne> @@ -409,6 +445,15 @@ pub fn stream() -> (Port, Chan) { return (port, chan); } +impl Chan { + fn try_send_inner(&self, val: T, do_resched: bool) -> bool { + let (next_pone, next_cone) = oneshot(); + let cone = self.next.take(); + self.next.put_back(next_cone); + cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched) + } +} + impl GenericChan for Chan { fn send(&self, val: T) { self.try_send(val); @@ -417,10 +462,16 @@ impl GenericChan for Chan { impl GenericSmartChan for Chan { fn try_send(&self, val: T) -> bool { - let (next_pone, next_cone) = oneshot(); - let cone = self.next.take(); - self.next.put_back(next_cone); - cone.try_send(StreamPayload { val: val, next: next_pone }) + self.try_send_inner(val, true) + } +} + +impl SendDeferred for Chan { + fn send_deferred(&self, val: T) { + self.try_send_deferred(val); + } + fn try_send_deferred(&self, val: T) -> bool { + self.try_send_inner(val, false) } } @@ -495,6 +546,17 @@ impl SharedChan { } } +impl SharedChan { + fn try_send_inner(&self, val: T, do_resched: bool) -> bool { + unsafe { + let (next_pone, next_cone) = oneshot(); + let cone = (*self.next.get()).swap(~next_cone, SeqCst); + cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone }, + do_resched) + } + } +} + impl GenericChan for SharedChan { fn send(&self, val: T) { self.try_send(val); @@ -503,11 +565,16 @@ impl GenericChan for SharedChan { impl GenericSmartChan for SharedChan { fn try_send(&self, val: T) -> bool { - unsafe { - let (next_pone, next_cone) = oneshot(); - let cone = (*self.next.get()).swap(~next_cone, SeqCst); - cone.unwrap().try_send(StreamPayload { val: val, next: next_pone }) - } + self.try_send_inner(val, true) + } +} + +impl SendDeferred for SharedChan { + fn send_deferred(&self, val: T) { + self.try_send_deferred(val); + } + fn try_send_deferred(&self, val: T) -> bool { + self.try_send_inner(val, false) } } @@ -584,31 +651,32 @@ pub fn megapipe() -> MegaPipe { impl GenericChan for MegaPipe { fn send(&self, val: T) { - match *self { - (_, ref c) => c.send(val) - } + self.second_ref().send(val) } } impl GenericSmartChan for MegaPipe { fn try_send(&self, val: T) -> bool { - match *self { - (_, ref c) => c.try_send(val) - } + self.second_ref().try_send(val) } } impl GenericPort for MegaPipe { fn recv(&self) -> T { - match *self { - (ref p, _) => p.recv() - } + self.first_ref().recv() } fn try_recv(&self) -> Option { - match *self { - (ref p, _) => p.try_recv() - } + self.first_ref().try_recv() + } +} + +impl SendDeferred for MegaPipe { + fn send_deferred(&self, val: T) { + self.second_ref().send_deferred(val) + } + fn try_send_deferred(&self, val: T) -> bool { + self.second_ref().try_send_deferred(val) } } @@ -1017,4 +1085,39 @@ mod test { } } } + + #[test] + fn send_deferred() { + use unstable::sync::atomically; + + // Tests no-rescheduling of send_deferred on all types of channels. + do run_in_newsched_task { + let (pone, cone) = oneshot(); + let (pstream, cstream) = stream(); + let (pshared, cshared) = stream(); + let cshared = SharedChan::new(cshared); + let mp = megapipe(); + + let pone = Cell::new(pone); + do spawntask { pone.take().recv(); } + let pstream = Cell::new(pstream); + do spawntask { pstream.take().recv(); } + let pshared = Cell::new(pshared); + do spawntask { pshared.take().recv(); } + let p_mp = Cell::new(mp.clone()); + do spawntask { p_mp.take().recv(); } + + let cs = Cell::new((cone, cstream, cshared, mp)); + unsafe { + do atomically { + let (cone, cstream, cshared, mp) = cs.take(); + cone.send_deferred(()); + cstream.send_deferred(()); + cshared.send_deferred(()); + mp.send_deferred(()); + } + } + } + } + } -- cgit 1.4.1-3-g733a5 From cde6ad39920ddadd7c70921232ae92adff258367 Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Thu, 1 Aug 2013 21:57:58 -0400 Subject: Fix nasty double-free bug where a newrt chan could get killed after rescheduling but before suppressing_finalize. --- src/libstd/rt/comm.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'src/libstd/rt') diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index c19ac8aa337..a060059f5fc 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -131,6 +131,13 @@ impl ChanOne { // acquire barrier that keeps the subsequent access of the // ~Task pointer from being reordered. let oldstate = (*packet).state.swap(STATE_ONE, SeqCst); + + // Suppress the synchronizing actions in the finalizer. We're + // done with the packet. NB: In case of do_resched, this *must* + // happen before waking up a blocked task (or be unkillable), + // because we might get a kill signal during the reschedule. + this.suppress_finalize = true; + match oldstate { STATE_BOTH => { // Port is not waiting yet. Nothing to do @@ -165,8 +172,6 @@ impl ChanOne { } } - // Suppress the synchronizing actions in the finalizer. We're done with the packet. - this.suppress_finalize = true; return recvr_active; } } -- cgit 1.4.1-3-g733a5 From d30cca46e61f8e5e604a87f0e623cb852be6c85f Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Fri, 2 Aug 2013 16:06:13 -0400 Subject: Fix embarrassing bug where 'unkillable' would unwind improperly when it receives a kill signal. --- src/libstd/rt/kill.rs | 5 +++-- src/libstd/task/mod.rs | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) (limited to 'src/libstd/rt') diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index 696f4a8c355..6c450971cdc 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -548,11 +548,12 @@ impl Death { /// All calls must be paired with a subsequent call to allow_kill. #[inline] pub fn inhibit_kill(&mut self, already_failing: bool) { - if self.unkillable == 0 { + self.unkillable += 1; + // May fail, hence must happen *after* incrementing the counter + if self.unkillable == 1 { rtassert!(self.kill_handle.is_some()); self.kill_handle.get_mut_ref().inhibit_kill(already_failing); } - self.unkillable += 1; } /// Exit a possibly-nested unkillable section of code. diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 19acedb56dd..e08297a1425 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -655,6 +655,47 @@ pub unsafe fn rekillable(f: &fn() -> U) -> U { } } +#[test] #[ignore(cfg(windows))] +fn test_kill_unkillable_task() { + use rt::test::*; + + // Attempt to test that when a kill signal is received at the start of an + // unkillable section, 'unkillable' unwinds correctly. This is actually + // quite a difficult race to expose, as the kill has to happen on a second + // CPU, *after* the spawner is already switched-back-to (and passes the + // killed check at the start of its timeslice). As far as I know, it's not + // possible to make this race deterministic, or even more likely to happen. + do run_in_newsched_task { + do task::try { + do task::spawn { + fail!(); + } + do task::unkillable { } + }; + } +} + +#[test] #[ignore(cfg(windows))] +fn test_kill_rekillable_task() { + use rt::test::*; + + // Tests that when a kill signal is received, 'rekillable' and + // 'unkillable' unwind correctly in conjunction with each other. + do run_in_newsched_task { + do task::try { + do task::unkillable { + unsafe { + do task::rekillable { + do task::spawn { + fail!(); + } + } + } + } + }; + } +} + #[test] #[should_fail] #[ignore(cfg(windows))] fn test_cant_dup_task_builder() { let mut builder = task(); -- cgit 1.4.1-3-g733a5 From 92f60f4365beb7b0677b196b1650069bd88cb616 Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Fri, 2 Aug 2013 17:14:56 -0400 Subject: Don't fail from kill signals if already unwinding. --- src/libstd/rt/kill.rs | 4 ++-- src/libstd/rt/sched.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/libstd/rt') diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index 6c450971cdc..deec8dd37a6 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -530,13 +530,13 @@ impl Death { /// Fails if a kill signal was received. #[inline] - pub fn check_killed(&self) { + pub fn check_killed(&self, already_failing: bool) { match self.kill_handle { Some(ref kill_handle) => // The task may be both unkillable and killed if it does some // synchronization during unwinding or cleanup (for example, // sending on a notify port). In that case failing won't help. - if self.unkillable == 0 && kill_handle.killed() { + if self.unkillable == 0 && (!already_failing) && kill_handle.killed() { fail!(KILLED_MSG); }, // This may happen during task death (see comments in collect_failure). diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 5c9b142c052..36b98125229 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -600,7 +600,7 @@ impl Scheduler { // Must happen after running the cleanup job (of course). let task = Local::unsafe_borrow::(); - (*task).death.check_killed(); + (*task).death.check_killed((*task).unwinder.unwinding); } } -- cgit 1.4.1-3-g733a5 From 43fecf3556b47305320221586f48f89fe2f6c505 Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Fri, 2 Aug 2013 17:23:00 -0400 Subject: Add an assert_may_sleep() check on every context switch. --- src/libstd/rt/sched.rs | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src/libstd/rt') diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 36b98125229..dfe003253c2 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -540,6 +540,10 @@ impl Scheduler { // The current task is grabbed from TLS, not taken as an input. let current_task: ~Task = Local::take::(); + // Check that the task is not in an atomically() section (e.g., + // holding a pthread mutex, which could deadlock the scheduler). + current_task.death.assert_may_sleep(); + // These transmutes do something fishy with a closure. let f_fake_region = unsafe { transmute::<&fn(&mut Scheduler, ~Task), -- cgit 1.4.1-3-g733a5