From be7738bfa18989438e3597847cd6a7f3bbbfac12 Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Thu, 1 Aug 2013 21:57:15 -0400 Subject: Add SendDeferred trait and use it to fix #8214. --- src/libstd/comm.rs | 30 ++++++++++ src/libstd/rt/comm.rs | 151 ++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 157 insertions(+), 24 deletions(-) (limited to 'src/libstd') diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs index acdf2cee841..a0731dc3494 100644 --- a/src/libstd/comm.rs +++ b/src/libstd/comm.rs @@ -19,6 +19,7 @@ use either::{Either, Left, Right}; use kinds::Send; use option::{Option, Some}; use unstable::sync::Exclusive; +pub use rt::comm::SendDeferred; use rtcomm = rt::comm; use rt; @@ -105,6 +106,21 @@ impl GenericSmartChan for Chan { } } +impl SendDeferred for Chan { + fn send_deferred(&self, x: T) { + match self.inner { + Left(ref chan) => chan.send(x), + Right(ref chan) => chan.send_deferred(x) + } + } + fn try_send_deferred(&self, x: T) -> bool { + match self.inner { + Left(ref chan) => chan.try_send(x), + Right(ref chan) => chan.try_send_deferred(x) + } + } +} + impl GenericPort for Port { fn recv(&self) -> T { match self.inner { @@ -250,6 +266,20 @@ impl ChanOne { Right(p) => p.try_send(data) } } + pub fn send_deferred(self, data: T) { + let ChanOne { inner } = self; + match inner { + Left(p) => p.send(data), + Right(p) => p.send_deferred(data) + } + } + pub fn try_send_deferred(self, data: T) -> bool { + let ChanOne { inner } = self; + match inner { + Left(p) => p.try_send(data), + Right(p) => p.try_send_deferred(data) + } + } } pub fn recv_one(port: PortOne) -> T { diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 00e1aaa2193..c19ac8aa337 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -25,6 +25,7 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; use clone::Clone; use rt::{context, SchedulerContext}; +use tuple::ImmutableTuple; /// A combined refcount / BlockedTask-as-uint pointer. /// @@ -86,12 +87,32 @@ impl ChanOne { } } + /// Send a message on the one-shot channel. If a receiver task is blocked + /// waiting for the message, will wake it up and reschedule to it. pub fn send(self, val: T) { self.try_send(val); } + /// As `send`, but also returns whether or not the receiver endpoint is still open. pub fn try_send(self, val: T) -> bool { + self.try_send_inner(val, true) + } + + /// Send a message without immediately rescheduling to a blocked receiver. + /// This can be useful in contexts where rescheduling is forbidden, or to + /// optimize for when the sender expects to still have useful work to do. + pub fn send_deferred(self, val: T) { + self.try_send_deferred(val); + } + + /// As `send_deferred` and `try_send` together. + pub fn try_send_deferred(self, val: T) -> bool { + self.try_send_inner(val, false) + } + // 'do_resched' configures whether the scheduler immediately switches to + // the receiving task, or leaves the sending task still running. + fn try_send_inner(self, val: T, do_resched: bool) -> bool { rtassert!(context() != SchedulerContext); let mut this = self; @@ -130,9 +151,16 @@ impl ChanOne { task_as_state => { // Port is blocked. Wake it up. let recvr = BlockedTask::cast_from_uint(task_as_state); - do recvr.wake().map_consume |woken_task| { - Scheduler::run_task(woken_task); - }; + if do_resched { + do recvr.wake().map_consume |woken_task| { + Scheduler::run_task(woken_task); + }; + } else { + let recvr = Cell::new(recvr); + do Local::borrow:: |sched| { + sched.enqueue_blocked_task(recvr.take()); + } + } } } } @@ -152,6 +180,7 @@ impl PortOne { } } + /// Wait for a message on the one-shot port. Fails if the send end is closed. pub fn recv(self) -> T { match self.try_recv() { Some(val) => val, @@ -161,6 +190,7 @@ impl PortOne { } } + /// As `recv`, but returns `None` if the send end is closed rather than failing. pub fn try_recv(self) -> Option { let mut this = self; @@ -382,6 +412,12 @@ impl Drop for PortOne { } } +/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne. +pub trait SendDeferred { + fn send_deferred(&self, val: T); + fn try_send_deferred(&self, val: T) -> bool; +} + struct StreamPayload { val: T, next: PortOne> @@ -409,6 +445,15 @@ pub fn stream() -> (Port, Chan) { return (port, chan); } +impl Chan { + fn try_send_inner(&self, val: T, do_resched: bool) -> bool { + let (next_pone, next_cone) = oneshot(); + let cone = self.next.take(); + self.next.put_back(next_cone); + cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched) + } +} + impl GenericChan for Chan { fn send(&self, val: T) { self.try_send(val); @@ -417,10 +462,16 @@ impl GenericChan for Chan { impl GenericSmartChan for Chan { fn try_send(&self, val: T) -> bool { - let (next_pone, next_cone) = oneshot(); - let cone = self.next.take(); - self.next.put_back(next_cone); - cone.try_send(StreamPayload { val: val, next: next_pone }) + self.try_send_inner(val, true) + } +} + +impl SendDeferred for Chan { + fn send_deferred(&self, val: T) { + self.try_send_deferred(val); + } + fn try_send_deferred(&self, val: T) -> bool { + self.try_send_inner(val, false) } } @@ -495,6 +546,17 @@ impl SharedChan { } } +impl SharedChan { + fn try_send_inner(&self, val: T, do_resched: bool) -> bool { + unsafe { + let (next_pone, next_cone) = oneshot(); + let cone = (*self.next.get()).swap(~next_cone, SeqCst); + cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone }, + do_resched) + } + } +} + impl GenericChan for SharedChan { fn send(&self, val: T) { self.try_send(val); @@ -503,11 +565,16 @@ impl GenericChan for SharedChan { impl GenericSmartChan for SharedChan { fn try_send(&self, val: T) -> bool { - unsafe { - let (next_pone, next_cone) = oneshot(); - let cone = (*self.next.get()).swap(~next_cone, SeqCst); - cone.unwrap().try_send(StreamPayload { val: val, next: next_pone }) - } + self.try_send_inner(val, true) + } +} + +impl SendDeferred for SharedChan { + fn send_deferred(&self, val: T) { + self.try_send_deferred(val); + } + fn try_send_deferred(&self, val: T) -> bool { + self.try_send_inner(val, false) } } @@ -584,31 +651,32 @@ pub fn megapipe() -> MegaPipe { impl GenericChan for MegaPipe { fn send(&self, val: T) { - match *self { - (_, ref c) => c.send(val) - } + self.second_ref().send(val) } } impl GenericSmartChan for MegaPipe { fn try_send(&self, val: T) -> bool { - match *self { - (_, ref c) => c.try_send(val) - } + self.second_ref().try_send(val) } } impl GenericPort for MegaPipe { fn recv(&self) -> T { - match *self { - (ref p, _) => p.recv() - } + self.first_ref().recv() } fn try_recv(&self) -> Option { - match *self { - (ref p, _) => p.try_recv() - } + self.first_ref().try_recv() + } +} + +impl SendDeferred for MegaPipe { + fn send_deferred(&self, val: T) { + self.second_ref().send_deferred(val) + } + fn try_send_deferred(&self, val: T) -> bool { + self.second_ref().try_send_deferred(val) } } @@ -1017,4 +1085,39 @@ mod test { } } } + + #[test] + fn send_deferred() { + use unstable::sync::atomically; + + // Tests no-rescheduling of send_deferred on all types of channels. + do run_in_newsched_task { + let (pone, cone) = oneshot(); + let (pstream, cstream) = stream(); + let (pshared, cshared) = stream(); + let cshared = SharedChan::new(cshared); + let mp = megapipe(); + + let pone = Cell::new(pone); + do spawntask { pone.take().recv(); } + let pstream = Cell::new(pstream); + do spawntask { pstream.take().recv(); } + let pshared = Cell::new(pshared); + do spawntask { pshared.take().recv(); } + let p_mp = Cell::new(mp.clone()); + do spawntask { p_mp.take().recv(); } + + let cs = Cell::new((cone, cstream, cshared, mp)); + unsafe { + do atomically { + let (cone, cstream, cshared, mp) = cs.take(); + cone.send_deferred(()); + cstream.send_deferred(()); + cshared.send_deferred(()); + mp.send_deferred(()); + } + } + } + } + } -- cgit 1.4.1-3-g733a5 From cde6ad39920ddadd7c70921232ae92adff258367 Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Thu, 1 Aug 2013 21:57:58 -0400 Subject: Fix nasty double-free bug where a newrt chan could get killed after rescheduling but before suppressing_finalize. --- src/libstd/rt/comm.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'src/libstd') diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index c19ac8aa337..a060059f5fc 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -131,6 +131,13 @@ impl ChanOne { // acquire barrier that keeps the subsequent access of the // ~Task pointer from being reordered. let oldstate = (*packet).state.swap(STATE_ONE, SeqCst); + + // Suppress the synchronizing actions in the finalizer. We're + // done with the packet. NB: In case of do_resched, this *must* + // happen before waking up a blocked task (or be unkillable), + // because we might get a kill signal during the reschedule. + this.suppress_finalize = true; + match oldstate { STATE_BOTH => { // Port is not waiting yet. Nothing to do @@ -165,8 +172,6 @@ impl ChanOne { } } - // Suppress the synchronizing actions in the finalizer. We're done with the packet. - this.suppress_finalize = true; return recvr_active; } } -- cgit 1.4.1-3-g733a5 From d30cca46e61f8e5e604a87f0e623cb852be6c85f Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Fri, 2 Aug 2013 16:06:13 -0400 Subject: Fix embarrassing bug where 'unkillable' would unwind improperly when it receives a kill signal. --- src/libstd/rt/kill.rs | 5 +++-- src/libstd/task/mod.rs | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) (limited to 'src/libstd') diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index 696f4a8c355..6c450971cdc 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -548,11 +548,12 @@ impl Death { /// All calls must be paired with a subsequent call to allow_kill. #[inline] pub fn inhibit_kill(&mut self, already_failing: bool) { - if self.unkillable == 0 { + self.unkillable += 1; + // May fail, hence must happen *after* incrementing the counter + if self.unkillable == 1 { rtassert!(self.kill_handle.is_some()); self.kill_handle.get_mut_ref().inhibit_kill(already_failing); } - self.unkillable += 1; } /// Exit a possibly-nested unkillable section of code. diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 19acedb56dd..e08297a1425 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -655,6 +655,47 @@ pub unsafe fn rekillable(f: &fn() -> U) -> U { } } +#[test] #[ignore(cfg(windows))] +fn test_kill_unkillable_task() { + use rt::test::*; + + // Attempt to test that when a kill signal is received at the start of an + // unkillable section, 'unkillable' unwinds correctly. This is actually + // quite a difficult race to expose, as the kill has to happen on a second + // CPU, *after* the spawner is already switched-back-to (and passes the + // killed check at the start of its timeslice). As far as I know, it's not + // possible to make this race deterministic, or even more likely to happen. + do run_in_newsched_task { + do task::try { + do task::spawn { + fail!(); + } + do task::unkillable { } + }; + } +} + +#[test] #[ignore(cfg(windows))] +fn test_kill_rekillable_task() { + use rt::test::*; + + // Tests that when a kill signal is received, 'rekillable' and + // 'unkillable' unwind correctly in conjunction with each other. + do run_in_newsched_task { + do task::try { + do task::unkillable { + unsafe { + do task::rekillable { + do task::spawn { + fail!(); + } + } + } + } + }; + } +} + #[test] #[should_fail] #[ignore(cfg(windows))] fn test_cant_dup_task_builder() { let mut builder = task(); -- cgit 1.4.1-3-g733a5 From bd3579877326fb78ac860f92fc69f4f60bb93012 Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Fri, 2 Aug 2013 17:09:32 -0400 Subject: (cleanup) Use more do...finally in extra::sync. --- src/libextra/sync.rs | 309 +++++++++++++++++---------------------------------- src/libstd/util.rs | 6 + 2 files changed, 108 insertions(+), 207 deletions(-) (limited to 'src/libstd') diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index 045aeb0feda..276f9cad7c6 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -22,7 +22,9 @@ use std::comm::SendDeferred; use std::task; use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox}; use std::unstable::atomics; +use std::unstable::finally::Finally; use std::util; +use std::util::NonCopyable; /**************************************************************************** * Internals @@ -84,7 +86,6 @@ struct SemInner { #[doc(hidden)] struct Sem(Exclusive>); - #[doc(hidden)] impl Sem { fn new(count: int, q: Q) -> Sem { @@ -125,17 +126,18 @@ impl Sem { } } } -} -// FIXME(#3154) move both copies of this into Sem, and unify the 2 structs -#[doc(hidden)] -impl Sem<()> { + pub fn access(&self, blk: &fn() -> U) -> U { - let mut release = None; do task::unkillable { - self.acquire(); - release = Some(SemRelease(self)); + do (|| { + self.acquire(); + unsafe { + do task::rekillable { blk() } + } + }).finally { + self.release(); + } } - blk() } } @@ -149,46 +151,6 @@ impl Sem<~[WaitQueue]> { } Sem::new(count, queues) } - - pub fn access_waitqueue(&self, blk: &fn() -> U) -> U { - let mut release = None; - do task::unkillable { - self.acquire(); - release = Some(SemAndSignalRelease(self)); - } - blk() - } -} - -// FIXME(#3588) should go inside of access() -#[doc(hidden)] -type SemRelease<'self> = SemReleaseGeneric<'self, ()>; -#[doc(hidden)] -type SemAndSignalRelease<'self> = SemReleaseGeneric<'self, ~[WaitQueue]>; -#[doc(hidden)] -struct SemReleaseGeneric<'self, Q> { sem: &'self Sem } - -#[doc(hidden)] -#[unsafe_destructor] -impl<'self, Q:Send> Drop for SemReleaseGeneric<'self, Q> { - fn drop(&self) { - self.sem.release(); - } -} - -#[doc(hidden)] -fn SemRelease<'r>(sem: &'r Sem<()>) -> SemRelease<'r> { - SemReleaseGeneric { - sem: sem - } -} - -#[doc(hidden)] -fn SemAndSignalRelease<'r>(sem: &'r Sem<~[WaitQueue]>) - -> SemAndSignalRelease<'r> { - SemReleaseGeneric { - sem: sem - } } // FIXME(#3598): Want to use an Option down below, but we need a custom enum @@ -211,11 +173,10 @@ pub struct Condvar<'self> { // writer waking up from a cvar wait can't race with a reader to steal it, // See the comment in write_cond for more detail. priv order: ReacquireOrderLock<'self>, + // Make sure condvars are non-copyable. + priv token: util::NonCopyable, } -#[unsafe_destructor] -impl<'self> Drop for Condvar<'self> { fn drop(&self) {} } - impl<'self> Condvar<'self> { /** * Atomically drop the associated lock, and block until a signal is sent. @@ -243,11 +204,10 @@ impl<'self> Condvar<'self> { let (WaitEnd, SignalEnd) = comm::oneshot(); let mut WaitEnd = Some(WaitEnd); let mut SignalEnd = Some(SignalEnd); - let mut reacquire = None; let mut out_of_bounds = None; - unsafe { - do task::unkillable { - // Release lock, 'atomically' enqueuing ourselves in so doing. + do task::unkillable { + // Release lock, 'atomically' enqueuing ourselves in so doing. + unsafe { do (**self.sem).with |state| { if condvar_id < state.blocked.len() { // Drop the lock. @@ -262,37 +222,25 @@ impl<'self> Condvar<'self> { out_of_bounds = Some(state.blocked.len()); } } - - // If yield checks start getting inserted anywhere, we can be - // killed before or after enqueueing. Deciding whether to - // unkillably reacquire the lock needs to happen atomically - // wrt enqueuing. - if out_of_bounds.is_none() { - reacquire = Some(CondvarReacquire { sem: self.sem, - order: self.order }); - } } - } - do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") { - // Unconditionally "block". (Might not actually block if a - // signaller already sent -- I mean 'unconditionally' in contrast - // with acquire().) - let _ = comm::recv_one(WaitEnd.take_unwrap()); - } - // This is needed for a failing condition variable to reacquire the - // mutex during unwinding. As long as the wrapper (mutex, etc) is - // bounded in when it gets released, this shouldn't hang forever. - struct CondvarReacquire<'self> { - sem: &'self Sem<~[WaitQueue]>, - order: ReacquireOrderLock<'self>, - } - - #[unsafe_destructor] - impl<'self> Drop for CondvarReacquire<'self> { - fn drop(&self) { - // Needs to succeed, instead of itself dying. - do task::unkillable { + // If yield checks start getting inserted anywhere, we can be + // killed before or after enqueueing. Deciding whether to + // unkillably reacquire the lock needs to happen atomically + // wrt enqueuing. + do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") { + // Unconditionally "block". (Might not actually block if a + // signaller already sent -- I mean 'unconditionally' in contrast + // with acquire().) + do (|| { + unsafe { + do task::rekillable { + let _ = comm::recv_one(WaitEnd.take_unwrap()); + } + } + }).finally { + // Reacquire the condvar. Note this is back in the unkillable + // section; it needs to succeed, instead of itself dying. match self.order { Just(lock) => do lock.access { self.sem.acquire(); @@ -374,8 +322,8 @@ impl Sem<~[WaitQueue]> { // The only other places that condvars get built are rwlock.write_cond() // and rwlock_write_mode. pub fn access_cond(&self, blk: &fn(c: &Condvar) -> U) -> U { - do self.access_waitqueue { - blk(&Condvar { sem: self, order: Nothing }) + do self.access { + blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() }) } } } @@ -453,7 +401,7 @@ impl Mutex { /// Run a function with ownership of the mutex. pub fn lock(&self, blk: &fn() -> U) -> U { - (&self.sem).access_waitqueue(blk) + (&self.sem).access(blk) } /// Run a function with ownership of the mutex and a handle to a condvar. @@ -532,7 +480,6 @@ impl RWLock { * tasks may run concurrently with this one. */ pub fn read(&self, blk: &fn() -> U) -> U { - let mut release = None; unsafe { do task::unkillable { do (&self.order_lock).access { @@ -543,10 +490,24 @@ impl RWLock { state.read_mode = true; } } - release = Some(RWLockReleaseRead(self)); + do (|| { + do task::rekillable { blk() } + }).finally { + let state = &mut *self.state.get(); + assert!(state.read_mode); + let old_count = state.read_count.fetch_sub(1, atomics::Release); + assert!(old_count > 0); + if old_count == 1 { + state.read_mode = false; + // Note: this release used to be outside of a locked access + // to exclusive-protected state. If this code is ever + // converted back to such (instead of using atomic ops), + // this access MUST NOT go inside the exclusive access. + (&self.access_lock).release(); + } + } } } - blk() } /** @@ -557,7 +518,7 @@ impl RWLock { unsafe { do task::unkillable { (&self.order_lock).acquire(); - do (&self.access_lock).access_waitqueue { + do (&self.access_lock).access { (&self.order_lock).release(); do task::rekillable { blk() @@ -607,7 +568,8 @@ impl RWLock { (&self.order_lock).release(); do task::rekillable { let opt_lock = Just(&self.order_lock); - blk(&Condvar { order: opt_lock, ..*cond }) + blk(&Condvar { sem: cond.sem, order: opt_lock, + token: NonCopyable::new() }) } } } @@ -638,14 +600,43 @@ impl RWLock { pub fn write_downgrade(&self, blk: &fn(v: RWLockWriteMode) -> U) -> U { // Implementation slightly different from the slicker 'write's above. // The exit path is conditional on whether the caller downgrades. - let mut _release = None; do task::unkillable { (&self.order_lock).acquire(); (&self.access_lock).acquire(); (&self.order_lock).release(); + do (|| { + unsafe { + do task::rekillable { + blk(RWLockWriteMode { lock: self, token: NonCopyable::new() }) + } + } + }).finally { + let writer_or_last_reader; + // Check if we're releasing from read mode or from write mode. + let state = unsafe { &mut *self.state.get() }; + if state.read_mode { + // Releasing from read mode. + let old_count = state.read_count.fetch_sub(1, atomics::Release); + assert!(old_count > 0); + // Check if other readers remain. + if old_count == 1 { + // Case 1: Writer downgraded & was the last reader + writer_or_last_reader = true; + state.read_mode = false; + } else { + // Case 2: Writer downgraded & was not the last reader + writer_or_last_reader = false; + } + } else { + // Case 3: Writer did not downgrade + writer_or_last_reader = true; + } + if writer_or_last_reader { + // Nobody left inside; release the "reader cloud" lock. + (&self.access_lock).release(); + } + } } - _release = Some(RWLockReleaseDowngrade(self)); - blk(RWLockWriteMode { lock: self }) } /// To be called inside of the write_downgrade block. @@ -674,105 +665,16 @@ impl RWLock { } } } - RWLockReadMode { lock: token.lock } - } -} - -// FIXME(#3588) should go inside of read() -#[doc(hidden)] -struct RWLockReleaseRead<'self> { - lock: &'self RWLock, -} - -#[doc(hidden)] -#[unsafe_destructor] -impl<'self> Drop for RWLockReleaseRead<'self> { - fn drop(&self) { - unsafe { - do task::unkillable { - let state = &mut *self.lock.state.get(); - assert!(state.read_mode); - let old_count = state.read_count.fetch_sub(1, atomics::Release); - assert!(old_count > 0); - if old_count == 1 { - state.read_mode = false; - // Note: this release used to be outside of a locked access - // to exclusive-protected state. If this code is ever - // converted back to such (instead of using atomic ops), - // this access MUST NOT go inside the exclusive access. - (&self.lock.access_lock).release(); - } - } - } - } -} - -#[doc(hidden)] -fn RWLockReleaseRead<'r>(lock: &'r RWLock) -> RWLockReleaseRead<'r> { - RWLockReleaseRead { - lock: lock - } -} - -// FIXME(#3588) should go inside of downgrade() -#[doc(hidden)] -#[unsafe_destructor] -struct RWLockReleaseDowngrade<'self> { - lock: &'self RWLock, -} - -#[doc(hidden)] -#[unsafe_destructor] -impl<'self> Drop for RWLockReleaseDowngrade<'self> { - fn drop(&self) { - unsafe { - do task::unkillable { - let writer_or_last_reader; - // Check if we're releasing from read mode or from write mode. - let state = &mut *self.lock.state.get(); - if state.read_mode { - // Releasing from read mode. - let old_count = state.read_count.fetch_sub(1, atomics::Release); - assert!(old_count > 0); - // Check if other readers remain. - if old_count == 1 { - // Case 1: Writer downgraded & was the last reader - writer_or_last_reader = true; - state.read_mode = false; - } else { - // Case 2: Writer downgraded & was not the last reader - writer_or_last_reader = false; - } - } else { - // Case 3: Writer did not downgrade - writer_or_last_reader = true; - } - if writer_or_last_reader { - // Nobody left inside; release the "reader cloud" lock. - (&self.lock.access_lock).release(); - } - } - } - } -} - -#[doc(hidden)] -fn RWLockReleaseDowngrade<'r>(lock: &'r RWLock) - -> RWLockReleaseDowngrade<'r> { - RWLockReleaseDowngrade { - lock: lock + RWLockReadMode { lock: token.lock, token: NonCopyable::new() } } } /// The "write permission" token used for rwlock.write_downgrade(). -pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock } -#[unsafe_destructor] -impl<'self> Drop for RWLockWriteMode<'self> { fn drop(&self) {} } +pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable } /// The "read permission" token used for rwlock.write_downgrade(). -pub struct RWLockReadMode<'self> { priv lock: &'self RWLock } -#[unsafe_destructor] -impl<'self> Drop for RWLockReadMode<'self> { fn drop(&self) {} } +pub struct RWLockReadMode<'self> { priv lock: &'self RWLock, + priv token: NonCopyable } impl<'self> RWLockWriteMode<'self> { /// Access the pre-downgrade rwlock in write mode. @@ -782,7 +684,8 @@ impl<'self> RWLockWriteMode<'self> { // Need to make the condvar use the order lock when reacquiring the // access lock. See comment in RWLock::write_cond for why. blk(&Condvar { sem: &self.lock.access_lock, - order: Just(&self.lock.order_lock), }) + order: Just(&self.lock.order_lock), + token: NonCopyable::new() }) } } @@ -1060,6 +963,8 @@ mod tests { } #[test] #[ignore(cfg(windows))] fn test_mutex_killed_broadcast() { + use std::unstable::finally::Finally; + let m = ~Mutex::new(); let m2 = ~m.clone(); let (p,c) = comm::stream(); @@ -1076,8 +981,13 @@ mod tests { do mi.lock_cond |cond| { let c = c.take(); c.send(()); // tell sibling to go ahead - let _z = SendOnFailure(c); - cond.wait(); // block forever + do (|| { + cond.wait(); // block forever + }).finally { + error!("task unwinding and sending"); + c.send(()); + error!("task unwinding and done sending"); + } } } } @@ -1096,21 +1006,6 @@ mod tests { let woken = cond.broadcast(); assert_eq!(woken, 0); } - struct SendOnFailure { - c: comm::Chan<()>, - } - - impl Drop for SendOnFailure { - fn drop(&self) { - self.c.send(()); - } - } - - fn SendOnFailure(c: comm::Chan<()>) -> SendOnFailure { - SendOnFailure { - c: c - } - } } #[test] fn test_mutex_cond_signal_on_0() { diff --git a/src/libstd/util.rs b/src/libstd/util.rs index 8fcfa083cb6..b46876ad3fe 100644 --- a/src/libstd/util.rs +++ b/src/libstd/util.rs @@ -79,6 +79,12 @@ pub fn replace(dest: &mut T, mut src: T) -> T { #[unsafe_no_drop_flag] pub struct NonCopyable; +impl NonCopyable { + // FIXME(#8233) should not be necessary + /// Create a new noncopyable token. + pub fn new() -> NonCopyable { NonCopyable } +} + impl Drop for NonCopyable { fn drop(&self) { } } -- cgit 1.4.1-3-g733a5 From 92f60f4365beb7b0677b196b1650069bd88cb616 Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Fri, 2 Aug 2013 17:14:56 -0400 Subject: Don't fail from kill signals if already unwinding. --- src/libstd/rt/kill.rs | 4 ++-- src/libstd/rt/sched.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/libstd') diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index 6c450971cdc..deec8dd37a6 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -530,13 +530,13 @@ impl Death { /// Fails if a kill signal was received. #[inline] - pub fn check_killed(&self) { + pub fn check_killed(&self, already_failing: bool) { match self.kill_handle { Some(ref kill_handle) => // The task may be both unkillable and killed if it does some // synchronization during unwinding or cleanup (for example, // sending on a notify port). In that case failing won't help. - if self.unkillable == 0 && kill_handle.killed() { + if self.unkillable == 0 && (!already_failing) && kill_handle.killed() { fail!(KILLED_MSG); }, // This may happen during task death (see comments in collect_failure). diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 5c9b142c052..36b98125229 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -600,7 +600,7 @@ impl Scheduler { // Must happen after running the cleanup job (of course). let task = Local::unsafe_borrow::(); - (*task).death.check_killed(); + (*task).death.check_killed((*task).unwinder.unwinding); } } -- cgit 1.4.1-3-g733a5 From 43fecf3556b47305320221586f48f89fe2f6c505 Mon Sep 17 00:00:00 2001 From: Ben Blum Date: Fri, 2 Aug 2013 17:23:00 -0400 Subject: Add an assert_may_sleep() check on every context switch. --- src/libstd/rt/sched.rs | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src/libstd') diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index 36b98125229..dfe003253c2 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -540,6 +540,10 @@ impl Scheduler { // The current task is grabbed from TLS, not taken as an input. let current_task: ~Task = Local::take::(); + // Check that the task is not in an atomically() section (e.g., + // holding a pthread mutex, which could deadlock the scheduler). + current_task.death.assert_may_sleep(); + // These transmutes do something fishy with a closure. let f_fake_region = unsafe { transmute::<&fn(&mut Scheduler, ~Task), -- cgit 1.4.1-3-g733a5