about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2013-08-02 14:55:54 -0700
committerbors <bors@rust-lang.org>2013-08-02 14:55:54 -0700
commit3ddc72f69be4d0a2027ff598ad262ea2b2ca3812 (patch)
tree5942120bc4dba4d4b2da56457b425145030daf29 /src
parentf1c1f92d0c555d6e38ad1cac55926d6d9c9b090f (diff)
parent43fecf3556b47305320221586f48f89fe2f6c505 (diff)
downloadrust-3ddc72f69be4d0a2027ff598ad262ea2b2ca3812.tar.gz
rust-3ddc72f69be4d0a2027ff598ad262ea2b2ca3812.zip
auto merge of #8234 : bblum/rust/assorted-fixes, r=brson
This fixes 4 bugs that prevented the extra::arc and extra::sync tests from passing on the new runtime.

* In ```Add SendDeferred trait``` I add a non-rescheduling ```send_deferred``` method to our various channel types. The ```extra::sync``` concurrency primitives need this guarantee so they can send while inside of an exclusive. (This fixes deterministic deadlocks seen with ```RUST_THREADS=1```.)
* In "Fix nasty double-free bug" I make sure that a ```ChanOne``` suppresses_finalize *before* rescheduling away to the receiver, so in case it gets a kill signal upon coming back, the destructor is inhibited as desired. (This is pretty uncommon on multiple CPUs but showed up always with ```RUST_THREADS=1```.)
* In ```Fix embarrassing bug where 'unkillable' would unwind improperly``` I make sure the task's unkillable counter stays consistent when a kill signal is received right at the start of an unkillable section. (This is a very uncommon race and can only occur with multiple CPUs.)
* In ```Don't fail from kill signals if already unwinding``` I do pretty much what it says on the tin. Surprising that it took the whole suite of sync/arc tests to expose this.

The other two commits are cleanup.

r @brson
Diffstat (limited to 'src')
-rw-r--r--src/libextra/sync.rs318
-rw-r--r--src/libstd/comm.rs30
-rw-r--r--src/libstd/rt/comm.rs160
-rw-r--r--src/libstd/rt/kill.rs9
-rw-r--r--src/libstd/rt/sched.rs6
-rw-r--r--src/libstd/task/mod.rs41
-rw-r--r--src/libstd/util.rs6
7 files changed, 328 insertions, 242 deletions
diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs
index e539b067edd..276f9cad7c6 100644
--- a/src/libextra/sync.rs
+++ b/src/libextra/sync.rs
@@ -18,10 +18,13 @@
 
 use std::borrow;
 use std::comm;
+use std::comm::SendDeferred;
 use std::task;
 use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox};
 use std::unstable::atomics;
+use std::unstable::finally::Finally;
 use std::util;
+use std::util::NonCopyable;
 
 /****************************************************************************
  * Internals
@@ -49,7 +52,7 @@ impl WaitQueue {
         if self.head.peek() {
             // Pop and send a wakeup signal. If the waiter was killed, its port
             // will have closed. Keep trying until we get a live task.
-            if comm::try_send_one(self.head.recv(), ()) {
+            if self.head.recv().try_send_deferred(()) {
                 true
             } else {
                 self.signal()
@@ -62,7 +65,7 @@ impl WaitQueue {
     fn broadcast(&self) -> uint {
         let mut count = 0;
         while self.head.peek() {
-            if comm::try_send_one(self.head.recv(), ()) {
+            if self.head.recv().try_send_deferred(()) {
                 count += 1;
             }
         }
@@ -83,7 +86,6 @@ struct SemInner<Q> {
 #[doc(hidden)]
 struct Sem<Q>(Exclusive<SemInner<Q>>);
 
-
 #[doc(hidden)]
 impl<Q:Send> Sem<Q> {
     fn new(count: int, q: Q) -> Sem<Q> {
@@ -102,7 +104,7 @@ impl<Q:Send> Sem<Q> {
                     // Tell outer scope we need to block.
                     waiter_nobe = Some(WaitEnd);
                     // Enqueue ourself.
-                    state.waiters.tail.send(SignalEnd);
+                    state.waiters.tail.send_deferred(SignalEnd);
                 }
             }
             // Uncomment if you wish to test for sem races. Not valgrind-friendly.
@@ -124,17 +126,18 @@ impl<Q:Send> Sem<Q> {
             }
         }
     }
-}
-// FIXME(#3154) move both copies of this into Sem<Q>, and unify the 2 structs
-#[doc(hidden)]
-impl Sem<()> {
+
     pub fn access<U>(&self, blk: &fn() -> U) -> U {
-        let mut release = None;
         do task::unkillable {
-            self.acquire();
-            release = Some(SemRelease(self));
+            do (|| {
+                self.acquire();
+                unsafe {
+                    do task::rekillable { blk() }
+                }
+            }).finally {
+                self.release();
+            }
         }
-        blk()
     }
 }
 
@@ -148,46 +151,6 @@ impl Sem<~[WaitQueue]> {
         }
         Sem::new(count, queues)
     }
-
-    pub fn access_waitqueue<U>(&self, blk: &fn() -> U) -> U {
-        let mut release = None;
-        do task::unkillable {
-            self.acquire();
-            release = Some(SemAndSignalRelease(self));
-        }
-        blk()
-    }
-}
-
-// FIXME(#3588) should go inside of access()
-#[doc(hidden)]
-type SemRelease<'self> = SemReleaseGeneric<'self, ()>;
-#[doc(hidden)]
-type SemAndSignalRelease<'self> = SemReleaseGeneric<'self, ~[WaitQueue]>;
-#[doc(hidden)]
-struct SemReleaseGeneric<'self, Q> { sem: &'self Sem<Q> }
-
-#[doc(hidden)]
-#[unsafe_destructor]
-impl<'self, Q:Send> Drop for SemReleaseGeneric<'self, Q> {
-    fn drop(&self) {
-        self.sem.release();
-    }
-}
-
-#[doc(hidden)]
-fn SemRelease<'r>(sem: &'r Sem<()>) -> SemRelease<'r> {
-    SemReleaseGeneric {
-        sem: sem
-    }
-}
-
-#[doc(hidden)]
-fn SemAndSignalRelease<'r>(sem: &'r Sem<~[WaitQueue]>)
-                        -> SemAndSignalRelease<'r> {
-    SemReleaseGeneric {
-        sem: sem
-    }
 }
 
 // FIXME(#3598): Want to use an Option down below, but we need a custom enum
@@ -210,11 +173,10 @@ pub struct Condvar<'self> {
     // writer waking up from a cvar wait can't race with a reader to steal it,
     // See the comment in write_cond for more detail.
     priv order: ReacquireOrderLock<'self>,
+    // Make sure condvars are non-copyable.
+    priv token: util::NonCopyable,
 }
 
-#[unsafe_destructor]
-impl<'self> Drop for Condvar<'self> { fn drop(&self) {} }
-
 impl<'self> Condvar<'self> {
     /**
      * Atomically drop the associated lock, and block until a signal is sent.
@@ -242,11 +204,10 @@ impl<'self> Condvar<'self> {
         let (WaitEnd, SignalEnd) = comm::oneshot();
         let mut WaitEnd   = Some(WaitEnd);
         let mut SignalEnd = Some(SignalEnd);
-        let mut reacquire = None;
         let mut out_of_bounds = None;
-        unsafe {
-            do task::unkillable {
-                // Release lock, 'atomically' enqueuing ourselves in so doing.
+        do task::unkillable {
+            // Release lock, 'atomically' enqueuing ourselves in so doing.
+            unsafe {
                 do (**self.sem).with |state| {
                     if condvar_id < state.blocked.len() {
                         // Drop the lock.
@@ -256,42 +217,30 @@ impl<'self> Condvar<'self> {
                         }
                         // Enqueue ourself to be woken up by a signaller.
                         let SignalEnd = SignalEnd.take_unwrap();
-                        state.blocked[condvar_id].tail.send(SignalEnd);
+                        state.blocked[condvar_id].tail.send_deferred(SignalEnd);
                     } else {
                         out_of_bounds = Some(state.blocked.len());
                     }
                 }
-
-                // If yield checks start getting inserted anywhere, we can be
-                // killed before or after enqueueing. Deciding whether to
-                // unkillably reacquire the lock needs to happen atomically
-                // wrt enqueuing.
-                if out_of_bounds.is_none() {
-                    reacquire = Some(CondvarReacquire { sem:   self.sem,
-                                                        order: self.order });
-                }
             }
-        }
-        do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
-            // Unconditionally "block". (Might not actually block if a
-            // signaller already sent -- I mean 'unconditionally' in contrast
-            // with acquire().)
-            let _ = comm::recv_one(WaitEnd.take_unwrap());
-        }
 
-        // This is needed for a failing condition variable to reacquire the
-        // mutex during unwinding. As long as the wrapper (mutex, etc) is
-        // bounded in when it gets released, this shouldn't hang forever.
-        struct CondvarReacquire<'self> {
-            sem: &'self Sem<~[WaitQueue]>,
-            order: ReacquireOrderLock<'self>,
-        }
-
-        #[unsafe_destructor]
-        impl<'self> Drop for CondvarReacquire<'self> {
-            fn drop(&self) {
-                // Needs to succeed, instead of itself dying.
-                do task::unkillable {
+            // If yield checks start getting inserted anywhere, we can be
+            // killed before or after enqueueing. Deciding whether to
+            // unkillably reacquire the lock needs to happen atomically
+            // wrt enqueuing.
+            do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
+                // Unconditionally "block". (Might not actually block if a
+                // signaller already sent -- I mean 'unconditionally' in contrast
+                // with acquire().)
+                do (|| {
+                    unsafe {
+                        do task::rekillable {
+                            let _ = comm::recv_one(WaitEnd.take_unwrap());
+                        }
+                    }
+                }).finally {
+                    // Reacquire the condvar. Note this is back in the unkillable
+                    // section; it needs to succeed, instead of itself dying.
                     match self.order {
                         Just(lock) => do lock.access {
                             self.sem.acquire();
@@ -373,8 +322,8 @@ impl Sem<~[WaitQueue]> {
     // The only other places that condvars get built are rwlock.write_cond()
     // and rwlock_write_mode.
     pub fn access_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
-        do self.access_waitqueue {
-            blk(&Condvar { sem: self, order: Nothing })
+        do self.access {
+            blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() })
         }
     }
 }
@@ -452,7 +401,7 @@ impl Mutex {
 
     /// Run a function with ownership of the mutex.
     pub fn lock<U>(&self, blk: &fn() -> U) -> U {
-        (&self.sem).access_waitqueue(blk)
+        (&self.sem).access(blk)
     }
 
     /// Run a function with ownership of the mutex and a handle to a condvar.
@@ -531,7 +480,6 @@ impl RWLock {
      * tasks may run concurrently with this one.
      */
     pub fn read<U>(&self, blk: &fn() -> U) -> U {
-        let mut release = None;
         unsafe {
             do task::unkillable {
                 do (&self.order_lock).access {
@@ -542,10 +490,24 @@ impl RWLock {
                         state.read_mode = true;
                     }
                 }
-                release = Some(RWLockReleaseRead(self));
+                do (|| {
+                    do task::rekillable { blk() }
+                }).finally {
+                    let state = &mut *self.state.get();
+                    assert!(state.read_mode);
+                    let old_count = state.read_count.fetch_sub(1, atomics::Release);
+                    assert!(old_count > 0);
+                    if old_count == 1 {
+                        state.read_mode = false;
+                        // Note: this release used to be outside of a locked access
+                        // to exclusive-protected state. If this code is ever
+                        // converted back to such (instead of using atomic ops),
+                        // this access MUST NOT go inside the exclusive access.
+                        (&self.access_lock).release();
+                    }
+                }
             }
         }
-        blk()
     }
 
     /**
@@ -556,7 +518,7 @@ impl RWLock {
         unsafe {
             do task::unkillable {
                 (&self.order_lock).acquire();
-                do (&self.access_lock).access_waitqueue {
+                do (&self.access_lock).access {
                     (&self.order_lock).release();
                     do task::rekillable {
                         blk()
@@ -606,7 +568,8 @@ impl RWLock {
                     (&self.order_lock).release();
                     do task::rekillable {
                         let opt_lock = Just(&self.order_lock);
-                        blk(&Condvar { order: opt_lock, ..*cond })
+                        blk(&Condvar { sem: cond.sem, order: opt_lock,
+                                       token: NonCopyable::new() })
                     }
                 }
             }
@@ -637,14 +600,43 @@ impl RWLock {
     pub fn write_downgrade<U>(&self, blk: &fn(v: RWLockWriteMode) -> U) -> U {
         // Implementation slightly different from the slicker 'write's above.
         // The exit path is conditional on whether the caller downgrades.
-        let mut _release = None;
         do task::unkillable {
             (&self.order_lock).acquire();
             (&self.access_lock).acquire();
             (&self.order_lock).release();
+            do (|| {
+                unsafe {
+                    do task::rekillable {
+                        blk(RWLockWriteMode { lock: self, token: NonCopyable::new() })
+                    }
+                }
+            }).finally {
+                let writer_or_last_reader;
+                // Check if we're releasing from read mode or from write mode.
+                let state = unsafe { &mut *self.state.get() };
+                if state.read_mode {
+                    // Releasing from read mode.
+                    let old_count = state.read_count.fetch_sub(1, atomics::Release);
+                    assert!(old_count > 0);
+                    // Check if other readers remain.
+                    if old_count == 1 {
+                        // Case 1: Writer downgraded & was the last reader
+                        writer_or_last_reader = true;
+                        state.read_mode = false;
+                    } else {
+                        // Case 2: Writer downgraded & was not the last reader
+                        writer_or_last_reader = false;
+                    }
+                } else {
+                    // Case 3: Writer did not downgrade
+                    writer_or_last_reader = true;
+                }
+                if writer_or_last_reader {
+                    // Nobody left inside; release the "reader cloud" lock.
+                    (&self.access_lock).release();
+                }
+            }
         }
-        _release = Some(RWLockReleaseDowngrade(self));
-        blk(RWLockWriteMode { lock: self })
     }
 
     /// To be called inside of the write_downgrade block.
@@ -673,105 +665,16 @@ impl RWLock {
                 }
             }
         }
-        RWLockReadMode { lock: token.lock }
-    }
-}
-
-// FIXME(#3588) should go inside of read()
-#[doc(hidden)]
-struct RWLockReleaseRead<'self> {
-    lock: &'self RWLock,
-}
-
-#[doc(hidden)]
-#[unsafe_destructor]
-impl<'self> Drop for RWLockReleaseRead<'self> {
-    fn drop(&self) {
-        unsafe {
-            do task::unkillable {
-                let state = &mut *self.lock.state.get();
-                assert!(state.read_mode);
-                let old_count = state.read_count.fetch_sub(1, atomics::Release);
-                assert!(old_count > 0);
-                if old_count == 1 {
-                    state.read_mode = false;
-                    // Note: this release used to be outside of a locked access
-                    // to exclusive-protected state. If this code is ever
-                    // converted back to such (instead of using atomic ops),
-                    // this access MUST NOT go inside the exclusive access.
-                    (&self.lock.access_lock).release();
-                }
-            }
-        }
-    }
-}
-
-#[doc(hidden)]
-fn RWLockReleaseRead<'r>(lock: &'r RWLock) -> RWLockReleaseRead<'r> {
-    RWLockReleaseRead {
-        lock: lock
-    }
-}
-
-// FIXME(#3588) should go inside of downgrade()
-#[doc(hidden)]
-#[unsafe_destructor]
-struct RWLockReleaseDowngrade<'self> {
-    lock: &'self RWLock,
-}
-
-#[doc(hidden)]
-#[unsafe_destructor]
-impl<'self> Drop for RWLockReleaseDowngrade<'self> {
-    fn drop(&self) {
-        unsafe {
-            do task::unkillable {
-                let writer_or_last_reader;
-                // Check if we're releasing from read mode or from write mode.
-                let state = &mut *self.lock.state.get();
-                if state.read_mode {
-                    // Releasing from read mode.
-                    let old_count = state.read_count.fetch_sub(1, atomics::Release);
-                    assert!(old_count > 0);
-                    // Check if other readers remain.
-                    if old_count == 1 {
-                        // Case 1: Writer downgraded & was the last reader
-                        writer_or_last_reader = true;
-                        state.read_mode = false;
-                    } else {
-                        // Case 2: Writer downgraded & was not the last reader
-                        writer_or_last_reader = false;
-                    }
-                } else {
-                    // Case 3: Writer did not downgrade
-                    writer_or_last_reader = true;
-                }
-                if writer_or_last_reader {
-                    // Nobody left inside; release the "reader cloud" lock.
-                    (&self.lock.access_lock).release();
-                }
-            }
-        }
-    }
-}
-
-#[doc(hidden)]
-fn RWLockReleaseDowngrade<'r>(lock: &'r RWLock)
-                           -> RWLockReleaseDowngrade<'r> {
-    RWLockReleaseDowngrade {
-        lock: lock
+        RWLockReadMode { lock: token.lock, token: NonCopyable::new() }
     }
 }
 
 /// The "write permission" token used for rwlock.write_downgrade().
-pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock }
-#[unsafe_destructor]
-impl<'self> Drop for RWLockWriteMode<'self> { fn drop(&self) {} }
+pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable }
 
 /// The "read permission" token used for rwlock.write_downgrade().
-pub struct RWLockReadMode<'self> { priv lock: &'self RWLock }
-#[unsafe_destructor]
-impl<'self> Drop for RWLockReadMode<'self> { fn drop(&self) {} }
+pub struct RWLockReadMode<'self> { priv lock: &'self RWLock,
+                                   priv token: NonCopyable }
 
 impl<'self> RWLockWriteMode<'self> {
     /// Access the pre-downgrade rwlock in write mode.
@@ -781,7 +684,8 @@ impl<'self> RWLockWriteMode<'self> {
         // Need to make the condvar use the order lock when reacquiring the
         // access lock. See comment in RWLock::write_cond for why.
         blk(&Condvar { sem:        &self.lock.access_lock,
-                       order: Just(&self.lock.order_lock), })
+                       order: Just(&self.lock.order_lock),
+                       token: NonCopyable::new() })
     }
 }
 
@@ -1059,6 +963,8 @@ mod tests {
     }
     #[test] #[ignore(cfg(windows))]
     fn test_mutex_killed_broadcast() {
+        use std::unstable::finally::Finally;
+
         let m = ~Mutex::new();
         let m2 = ~m.clone();
         let (p,c) = comm::stream();
@@ -1075,8 +981,13 @@ mod tests {
                     do mi.lock_cond |cond| {
                         let c = c.take();
                         c.send(()); // tell sibling to go ahead
-                        let _z = SendOnFailure(c);
-                        cond.wait(); // block forever
+                        do (|| {
+                            cond.wait(); // block forever
+                        }).finally {
+                            error!("task unwinding and sending");
+                            c.send(());
+                            error!("task unwinding and done sending");
+                        }
                     }
                 }
             }
@@ -1095,21 +1006,6 @@ mod tests {
             let woken = cond.broadcast();
             assert_eq!(woken, 0);
         }
-        struct SendOnFailure {
-            c: comm::Chan<()>,
-        }
-
-        impl Drop for SendOnFailure {
-            fn drop(&self) {
-                self.c.send(());
-            }
-        }
-
-        fn SendOnFailure(c: comm::Chan<()>) -> SendOnFailure {
-            SendOnFailure {
-                c: c
-            }
-        }
     }
     #[test]
     fn test_mutex_cond_signal_on_0() {
diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs
index acdf2cee841..a0731dc3494 100644
--- a/src/libstd/comm.rs
+++ b/src/libstd/comm.rs
@@ -19,6 +19,7 @@ use either::{Either, Left, Right};
 use kinds::Send;
 use option::{Option, Some};
 use unstable::sync::Exclusive;
+pub use rt::comm::SendDeferred;
 use rtcomm = rt::comm;
 use rt;
 
@@ -105,6 +106,21 @@ impl<T: Send> GenericSmartChan<T> for Chan<T> {
     }
 }
 
+impl<T: Send> SendDeferred<T> for Chan<T> {
+    fn send_deferred(&self, x: T) {
+        match self.inner {
+            Left(ref chan) => chan.send(x),
+            Right(ref chan) => chan.send_deferred(x)
+        }
+    }
+    fn try_send_deferred(&self, x: T) -> bool {
+        match self.inner {
+            Left(ref chan) => chan.try_send(x),
+            Right(ref chan) => chan.try_send_deferred(x)
+        }
+    }
+}
+
 impl<T: Send> GenericPort<T> for Port<T> {
     fn recv(&self) -> T {
         match self.inner {
@@ -250,6 +266,20 @@ impl<T: Send> ChanOne<T> {
             Right(p) => p.try_send(data)
         }
     }
+    pub fn send_deferred(self, data: T) {
+        let ChanOne { inner } = self;
+        match inner {
+            Left(p) => p.send(data),
+            Right(p) => p.send_deferred(data)
+        }
+    }
+    pub fn try_send_deferred(self, data: T) -> bool {
+        let ChanOne { inner } = self;
+        match inner {
+            Left(p) => p.try_send(data),
+            Right(p) => p.try_send_deferred(data)
+        }
+    }
 }
 
 pub fn recv_one<T: Send>(port: PortOne<T>) -> T {
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index 00e1aaa2193..a060059f5fc 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -25,6 +25,7 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
 use cell::Cell;
 use clone::Clone;
 use rt::{context, SchedulerContext};
+use tuple::ImmutableTuple;
 
 /// A combined refcount / BlockedTask-as-uint pointer.
 ///
@@ -86,12 +87,32 @@ impl<T> ChanOne<T> {
         }
     }
 
+    /// Send a message on the one-shot channel. If a receiver task is blocked
+    /// waiting for the message, will wake it up and reschedule to it.
     pub fn send(self, val: T) {
         self.try_send(val);
     }
 
+    /// As `send`, but also returns whether or not the receiver endpoint is still open.
     pub fn try_send(self, val: T) -> bool {
+        self.try_send_inner(val, true)
+    }
+
+    /// Send a message without immediately rescheduling to a blocked receiver.
+    /// This can be useful in contexts where rescheduling is forbidden, or to
+    /// optimize for when the sender expects to still have useful work to do.
+    pub fn send_deferred(self, val: T) {
+        self.try_send_deferred(val);
+    }
+
+    /// As `send_deferred` and `try_send` together.
+    pub fn try_send_deferred(self, val: T) -> bool {
+        self.try_send_inner(val, false)
+    }
 
+    // 'do_resched' configures whether the scheduler immediately switches to
+    // the receiving task, or leaves the sending task still running.
+    fn try_send_inner(self, val: T, do_resched: bool) -> bool {
         rtassert!(context() != SchedulerContext);
 
         let mut this = self;
@@ -110,6 +131,13 @@ impl<T> ChanOne<T> {
             // acquire barrier that keeps the subsequent access of the
             // ~Task pointer from being reordered.
             let oldstate = (*packet).state.swap(STATE_ONE, SeqCst);
+
+            // Suppress the synchronizing actions in the finalizer. We're
+            // done with the packet. NB: In case of do_resched, this *must*
+            // happen before waking up a blocked task (or be unkillable),
+            // because we might get a kill signal during the reschedule.
+            this.suppress_finalize = true;
+
             match oldstate {
                 STATE_BOTH => {
                     // Port is not waiting yet. Nothing to do
@@ -130,15 +158,20 @@ impl<T> ChanOne<T> {
                 task_as_state => {
                     // Port is blocked. Wake it up.
                     let recvr = BlockedTask::cast_from_uint(task_as_state);
-                    do recvr.wake().map_consume |woken_task| {
-                        Scheduler::run_task(woken_task);
-                    };
+                    if do_resched {
+                        do recvr.wake().map_consume |woken_task| {
+                            Scheduler::run_task(woken_task);
+                        };
+                    } else {
+                        let recvr = Cell::new(recvr);
+                        do Local::borrow::<Scheduler, ()> |sched| {
+                            sched.enqueue_blocked_task(recvr.take());
+                        }
+                    }
                 }
             }
         }
 
-        // Suppress the synchronizing actions in the finalizer. We're done with the packet.
-        this.suppress_finalize = true;
         return recvr_active;
     }
 }
@@ -152,6 +185,7 @@ impl<T> PortOne<T> {
         }
     }
 
+    /// Wait for a message on the one-shot port. Fails if the send end is closed.
     pub fn recv(self) -> T {
         match self.try_recv() {
             Some(val) => val,
@@ -161,6 +195,7 @@ impl<T> PortOne<T> {
         }
     }
 
+    /// As `recv`, but returns `None` if the send end is closed rather than failing.
     pub fn try_recv(self) -> Option<T> {
         let mut this = self;
 
@@ -382,6 +417,12 @@ impl<T> Drop for PortOne<T> {
     }
 }
 
+/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne.
+pub trait SendDeferred<T> {
+    fn send_deferred(&self, val: T);
+    fn try_send_deferred(&self, val: T) -> bool;
+}
+
 struct StreamPayload<T> {
     val: T,
     next: PortOne<StreamPayload<T>>
@@ -409,6 +450,15 @@ pub fn stream<T: Send>() -> (Port<T>, Chan<T>) {
     return (port, chan);
 }
 
+impl<T: Send> Chan<T> {
+    fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
+        let (next_pone, next_cone) = oneshot();
+        let cone = self.next.take();
+        self.next.put_back(next_cone);
+        cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched)
+    }
+}
+
 impl<T: Send> GenericChan<T> for Chan<T> {
     fn send(&self, val: T) {
         self.try_send(val);
@@ -417,10 +467,16 @@ impl<T: Send> GenericChan<T> for Chan<T> {
 
 impl<T: Send> GenericSmartChan<T> for Chan<T> {
     fn try_send(&self, val: T) -> bool {
-        let (next_pone, next_cone) = oneshot();
-        let cone = self.next.take();
-        self.next.put_back(next_cone);
-        cone.try_send(StreamPayload { val: val, next: next_pone })
+        self.try_send_inner(val, true)
+    }
+}
+
+impl<T: Send> SendDeferred<T> for Chan<T> {
+    fn send_deferred(&self, val: T) {
+        self.try_send_deferred(val);
+    }
+    fn try_send_deferred(&self, val: T) -> bool {
+        self.try_send_inner(val, false)
     }
 }
 
@@ -495,6 +551,17 @@ impl<T> SharedChan<T> {
     }
 }
 
+impl<T: Send> SharedChan<T> {
+    fn try_send_inner(&self, val: T, do_resched: bool) -> bool {
+        unsafe {
+            let (next_pone, next_cone) = oneshot();
+            let cone = (*self.next.get()).swap(~next_cone, SeqCst);
+            cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone },
+                                         do_resched)
+        }
+    }
+}
+
 impl<T: Send> GenericChan<T> for SharedChan<T> {
     fn send(&self, val: T) {
         self.try_send(val);
@@ -503,11 +570,16 @@ impl<T: Send> GenericChan<T> for SharedChan<T> {
 
 impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
     fn try_send(&self, val: T) -> bool {
-        unsafe {
-            let (next_pone, next_cone) = oneshot();
-            let cone = (*self.next.get()).swap(~next_cone, SeqCst);
-            cone.unwrap().try_send(StreamPayload { val: val, next: next_pone })
-        }
+        self.try_send_inner(val, true)
+    }
+}
+
+impl<T: Send> SendDeferred<T> for SharedChan<T> {
+    fn send_deferred(&self, val: T) {
+        self.try_send_deferred(val);
+    }
+    fn try_send_deferred(&self, val: T) -> bool {
+        self.try_send_inner(val, false)
     }
 }
 
@@ -584,31 +656,32 @@ pub fn megapipe<T: Send>() -> MegaPipe<T> {
 
 impl<T: Send> GenericChan<T> for MegaPipe<T> {
     fn send(&self, val: T) {
-        match *self {
-            (_, ref c) => c.send(val)
-        }
+        self.second_ref().send(val)
     }
 }
 
 impl<T: Send> GenericSmartChan<T> for MegaPipe<T> {
     fn try_send(&self, val: T) -> bool {
-        match *self {
-            (_, ref c) => c.try_send(val)
-        }
+        self.second_ref().try_send(val)
     }
 }
 
 impl<T: Send> GenericPort<T> for MegaPipe<T> {
     fn recv(&self) -> T {
-        match *self {
-            (ref p, _) => p.recv()
-        }
+        self.first_ref().recv()
     }
 
     fn try_recv(&self) -> Option<T> {
-        match *self {
-            (ref p, _) => p.try_recv()
-        }
+        self.first_ref().try_recv()
+    }
+}
+
+impl<T: Send> SendDeferred<T> for MegaPipe<T> {
+    fn send_deferred(&self, val: T) {
+        self.second_ref().send_deferred(val)
+    }
+    fn try_send_deferred(&self, val: T) -> bool {
+        self.second_ref().try_send_deferred(val)
     }
 }
 
@@ -1017,4 +1090,39 @@ mod test {
             }
         }
     }
+
+    #[test]
+    fn send_deferred() {
+        use unstable::sync::atomically;
+
+        // Tests no-rescheduling of send_deferred on all types of channels.
+        do run_in_newsched_task {
+            let (pone, cone) = oneshot();
+            let (pstream, cstream) = stream();
+            let (pshared, cshared) = stream();
+            let cshared = SharedChan::new(cshared);
+            let mp = megapipe();
+
+            let pone = Cell::new(pone);
+            do spawntask { pone.take().recv(); }
+            let pstream = Cell::new(pstream);
+            do spawntask { pstream.take().recv(); }
+            let pshared = Cell::new(pshared);
+            do spawntask { pshared.take().recv(); }
+            let p_mp = Cell::new(mp.clone());
+            do spawntask { p_mp.take().recv(); }
+
+            let cs = Cell::new((cone, cstream, cshared, mp));
+            unsafe {
+                do atomically {
+                    let (cone, cstream, cshared, mp) = cs.take();
+                    cone.send_deferred(());
+                    cstream.send_deferred(());
+                    cshared.send_deferred(());
+                    mp.send_deferred(());
+                }
+            }
+        }
+    }
+
 }
diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs
index 696f4a8c355..deec8dd37a6 100644
--- a/src/libstd/rt/kill.rs
+++ b/src/libstd/rt/kill.rs
@@ -530,13 +530,13 @@ impl Death {
 
     /// Fails if a kill signal was received.
     #[inline]
-    pub fn check_killed(&self) {
+    pub fn check_killed(&self, already_failing: bool) {
         match self.kill_handle {
             Some(ref kill_handle) =>
                 // The task may be both unkillable and killed if it does some
                 // synchronization during unwinding or cleanup (for example,
                 // sending on a notify port). In that case failing won't help.
-                if self.unkillable == 0 && kill_handle.killed() {
+                if self.unkillable == 0 && (!already_failing) && kill_handle.killed() {
                     fail!(KILLED_MSG);
                 },
             // This may happen during task death (see comments in collect_failure).
@@ -548,11 +548,12 @@ impl Death {
     /// All calls must be paired with a subsequent call to allow_kill.
     #[inline]
     pub fn inhibit_kill(&mut self, already_failing: bool) {
-        if self.unkillable == 0 {
+        self.unkillable += 1;
+        // May fail, hence must happen *after* incrementing the counter
+        if self.unkillable == 1 {
             rtassert!(self.kill_handle.is_some());
             self.kill_handle.get_mut_ref().inhibit_kill(already_failing);
         }
-        self.unkillable += 1;
     }
 
     /// Exit a possibly-nested unkillable section of code.
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index 5c9b142c052..dfe003253c2 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -540,6 +540,10 @@ impl Scheduler {
         // The current task is grabbed from TLS, not taken as an input.
         let current_task: ~Task = Local::take::<Task>();
 
+        // Check that the task is not in an atomically() section (e.g.,
+        // holding a pthread mutex, which could deadlock the scheduler).
+        current_task.death.assert_may_sleep();
+
         // These transmutes do something fishy with a closure.
         let f_fake_region = unsafe {
             transmute::<&fn(&mut Scheduler, ~Task),
@@ -600,7 +604,7 @@ impl Scheduler {
 
             // Must happen after running the cleanup job (of course).
             let task = Local::unsafe_borrow::<Task>();
-            (*task).death.check_killed();
+            (*task).death.check_killed((*task).unwinder.unwinding);
         }
     }
 
diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs
index 19acedb56dd..e08297a1425 100644
--- a/src/libstd/task/mod.rs
+++ b/src/libstd/task/mod.rs
@@ -655,6 +655,47 @@ pub unsafe fn rekillable<U>(f: &fn() -> U) -> U {
     }
 }
 
+#[test] #[ignore(cfg(windows))]
+fn test_kill_unkillable_task() {
+    use rt::test::*;
+
+    // Attempt to test that when a kill signal is received at the start of an
+    // unkillable section, 'unkillable' unwinds correctly. This is actually
+    // quite a difficult race to expose, as the kill has to happen on a second
+    // CPU, *after* the spawner is already switched-back-to (and passes the
+    // killed check at the start of its timeslice). As far as I know, it's not
+    // possible to make this race deterministic, or even more likely to happen.
+    do run_in_newsched_task {
+        do task::try {
+            do task::spawn {
+                fail!();
+            }
+            do task::unkillable { }
+        };
+    }
+}
+
+#[test] #[ignore(cfg(windows))]
+fn test_kill_rekillable_task() {
+    use rt::test::*;
+
+    // Tests that when a kill signal is received, 'rekillable' and
+    // 'unkillable' unwind correctly in conjunction with each other.
+    do run_in_newsched_task {
+        do task::try {
+            do task::unkillable {
+                unsafe {
+                    do task::rekillable {
+                        do task::spawn {
+                            fail!();
+                        }
+                    }
+                }
+            }
+        };
+    }
+}
+
 #[test] #[should_fail] #[ignore(cfg(windows))]
 fn test_cant_dup_task_builder() {
     let mut builder = task();
diff --git a/src/libstd/util.rs b/src/libstd/util.rs
index 8fcfa083cb6..b46876ad3fe 100644
--- a/src/libstd/util.rs
+++ b/src/libstd/util.rs
@@ -79,6 +79,12 @@ pub fn replace<T>(dest: &mut T, mut src: T) -> T {
 #[unsafe_no_drop_flag]
 pub struct NonCopyable;
 
+impl NonCopyable {
+    // FIXME(#8233) should not be necessary
+    /// Create a new noncopyable token.
+    pub fn new() -> NonCopyable { NonCopyable }
+}
+
 impl Drop for NonCopyable {
     fn drop(&self) { }
 }