about summary refs log tree commit diff
path: root/src/libstd/sync
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2016-12-19 12:17:24 +0000
committerbors <bors@rust-lang.org>2016-12-19 12:17:24 +0000
commit10271ea24fbd7b28a42df8eb02a8dcf6d6132d71 (patch)
tree4e6317a08c4c8cf2b0cbef21dd235222ccce77c1 /src/libstd/sync
parente70415bd716cdbfaa7d7e849cb1d3b09254a7dcb (diff)
parent05be48b18b896c16b36cf3f68c14c87b79081f94 (diff)
downloadrust-10271ea24fbd7b28a42df8eb02a8dcf6d6132d71.tar.gz
rust-10271ea24fbd7b28a42df8eb02a8dcf6d6132d71.zip
Auto merge of #38466 - sanxiyn:rollup, r=sanxiyn
Rollup of 9 pull requests

- Successful merges: #38334, #38397, #38413, #38421, #38422, #38433, #38438, #38445, #38459
- Failed merges:
Diffstat (limited to 'src/libstd/sync')
-rw-r--r--src/libstd/sync/mpsc/mod.rs192
-rw-r--r--src/libstd/sync/mpsc/oneshot.rs293
-rw-r--r--src/libstd/sync/mpsc/shared.rs106
-rw-r--r--src/libstd/sync/mpsc/stream.rs65
4 files changed, 328 insertions, 328 deletions
diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs
index 63745388eb6..8bcf008649f 100644
--- a/src/libstd/sync/mpsc/mod.rs
+++ b/src/libstd/sync/mpsc/mod.rs
@@ -348,7 +348,7 @@ impl<T> !Sync for Sender<T> { }
 /// owned by one thread, but it can be cloned to send to other threads.
 #[stable(feature = "rust1", since = "1.0.0")]
 pub struct SyncSender<T> {
-    inner: Arc<UnsafeCell<sync::Packet<T>>>,
+    inner: Arc<sync::Packet<T>>,
 }
 
 #[stable(feature = "rust1", since = "1.0.0")]
@@ -426,10 +426,10 @@ pub enum TrySendError<T> {
 }
 
 enum Flavor<T> {
-    Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>),
-    Stream(Arc<UnsafeCell<stream::Packet<T>>>),
-    Shared(Arc<UnsafeCell<shared::Packet<T>>>),
-    Sync(Arc<UnsafeCell<sync::Packet<T>>>),
+    Oneshot(Arc<oneshot::Packet<T>>),
+    Stream(Arc<stream::Packet<T>>),
+    Shared(Arc<shared::Packet<T>>),
+    Sync(Arc<sync::Packet<T>>),
 }
 
 #[doc(hidden)]
@@ -487,7 +487,7 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
 /// ```
 #[stable(feature = "rust1", since = "1.0.0")]
 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
-    let a = Arc::new(UnsafeCell::new(oneshot::Packet::new()));
+    let a = Arc::new(oneshot::Packet::new());
     (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
 }
 
@@ -532,7 +532,7 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
 /// ```
 #[stable(feature = "rust1", since = "1.0.0")]
 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
-    let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound)));
+    let a = Arc::new(sync::Packet::new(bound));
     (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
 }
 
@@ -578,38 +578,30 @@ impl<T> Sender<T> {
     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
         let (new_inner, ret) = match *unsafe { self.inner() } {
             Flavor::Oneshot(ref p) => {
-                unsafe {
-                    let p = p.get();
-                    if !(*p).sent() {
-                        return (*p).send(t).map_err(SendError);
-                    } else {
-                        let a =
-                            Arc::new(UnsafeCell::new(stream::Packet::new()));
-                        let rx = Receiver::new(Flavor::Stream(a.clone()));
-                        match (*p).upgrade(rx) {
-                            oneshot::UpSuccess => {
-                                let ret = (*a.get()).send(t);
-                                (a, ret)
-                            }
-                            oneshot::UpDisconnected => (a, Err(t)),
-                            oneshot::UpWoke(token) => {
-                                // This send cannot panic because the thread is
-                                // asleep (we're looking at it), so the receiver
-                                // can't go away.
-                                (*a.get()).send(t).ok().unwrap();
-                                token.signal();
-                                (a, Ok(()))
-                            }
+                if !p.sent() {
+                    return p.send(t).map_err(SendError);
+                } else {
+                    let a = Arc::new(stream::Packet::new());
+                    let rx = Receiver::new(Flavor::Stream(a.clone()));
+                    match p.upgrade(rx) {
+                        oneshot::UpSuccess => {
+                            let ret = a.send(t);
+                            (a, ret)
+                        }
+                        oneshot::UpDisconnected => (a, Err(t)),
+                        oneshot::UpWoke(token) => {
+                            // This send cannot panic because the thread is
+                            // asleep (we're looking at it), so the receiver
+                            // can't go away.
+                            a.send(t).ok().unwrap();
+                            token.signal();
+                            (a, Ok(()))
                         }
                     }
                 }
             }
-            Flavor::Stream(ref p) => return unsafe {
-                (*p.get()).send(t).map_err(SendError)
-            },
-            Flavor::Shared(ref p) => return unsafe {
-                (*p.get()).send(t).map_err(SendError)
-            },
+            Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
+            Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
             Flavor::Sync(..) => unreachable!(),
         };
 
@@ -624,41 +616,43 @@ impl<T> Sender<T> {
 #[stable(feature = "rust1", since = "1.0.0")]
 impl<T> Clone for Sender<T> {
     fn clone(&self) -> Sender<T> {
-        let (packet, sleeper, guard) = match *unsafe { self.inner() } {
+        let packet = match *unsafe { self.inner() } {
             Flavor::Oneshot(ref p) => {
-                let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
-                unsafe {
-                    let guard = (*a.get()).postinit_lock();
+                let a = Arc::new(shared::Packet::new());
+                {
+                    let guard = a.postinit_lock();
                     let rx = Receiver::new(Flavor::Shared(a.clone()));
-                    match (*p.get()).upgrade(rx) {
+                    let sleeper = match p.upgrade(rx) {
                         oneshot::UpSuccess |
-                        oneshot::UpDisconnected => (a, None, guard),
-                        oneshot::UpWoke(task) => (a, Some(task), guard)
-                    }
+                        oneshot::UpDisconnected => None,
+                        oneshot::UpWoke(task) => Some(task),
+                    };
+                    a.inherit_blocker(sleeper, guard);
                 }
+                a
             }
             Flavor::Stream(ref p) => {
-                let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
-                unsafe {
-                    let guard = (*a.get()).postinit_lock();
+                let a = Arc::new(shared::Packet::new());
+                {
+                    let guard = a.postinit_lock();
                     let rx = Receiver::new(Flavor::Shared(a.clone()));
-                    match (*p.get()).upgrade(rx) {
+                    let sleeper = match p.upgrade(rx) {
                         stream::UpSuccess |
-                        stream::UpDisconnected => (a, None, guard),
-                        stream::UpWoke(task) => (a, Some(task), guard),
-                    }
+                        stream::UpDisconnected => None,
+                        stream::UpWoke(task) => Some(task),
+                    };
+                    a.inherit_blocker(sleeper, guard);
                 }
+                a
             }
             Flavor::Shared(ref p) => {
-                unsafe { (*p.get()).clone_chan(); }
+                p.clone_chan();
                 return Sender::new(Flavor::Shared(p.clone()));
             }
             Flavor::Sync(..) => unreachable!(),
         };
 
         unsafe {
-            (*packet.get()).inherit_blocker(sleeper, guard);
-
             let tmp = Sender::new(Flavor::Shared(packet.clone()));
             mem::swap(self.inner_mut(), tmp.inner_mut());
         }
@@ -669,10 +663,10 @@ impl<T> Clone for Sender<T> {
 #[stable(feature = "rust1", since = "1.0.0")]
 impl<T> Drop for Sender<T> {
     fn drop(&mut self) {
-        match *unsafe { self.inner_mut() } {
-            Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
-            Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
-            Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
+        match *unsafe { self.inner() } {
+            Flavor::Oneshot(ref p) => p.drop_chan(),
+            Flavor::Stream(ref p) => p.drop_chan(),
+            Flavor::Shared(ref p) => p.drop_chan(),
             Flavor::Sync(..) => unreachable!(),
         }
     }
@@ -690,7 +684,7 @@ impl<T> fmt::Debug for Sender<T> {
 ////////////////////////////////////////////////////////////////////////////////
 
 impl<T> SyncSender<T> {
-    fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> {
+    fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
         SyncSender { inner: inner }
     }
 
@@ -710,7 +704,7 @@ impl<T> SyncSender<T> {
     /// information.
     #[stable(feature = "rust1", since = "1.0.0")]
     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
-        unsafe { (*self.inner.get()).send(t).map_err(SendError) }
+        self.inner.send(t).map_err(SendError)
     }
 
     /// Attempts to send a value on this channel without blocking.
@@ -724,14 +718,14 @@ impl<T> SyncSender<T> {
     /// receiver has received the data or not if this function is successful.
     #[stable(feature = "rust1", since = "1.0.0")]
     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
-        unsafe { (*self.inner.get()).try_send(t) }
+        self.inner.try_send(t)
     }
 }
 
 #[stable(feature = "rust1", since = "1.0.0")]
 impl<T> Clone for SyncSender<T> {
     fn clone(&self) -> SyncSender<T> {
-        unsafe { (*self.inner.get()).clone_chan(); }
+        self.inner.clone_chan();
         SyncSender::new(self.inner.clone())
     }
 }
@@ -739,7 +733,7 @@ impl<T> Clone for SyncSender<T> {
 #[stable(feature = "rust1", since = "1.0.0")]
 impl<T> Drop for SyncSender<T> {
     fn drop(&mut self) {
-        unsafe { (*self.inner.get()).drop_chan(); }
+        self.inner.drop_chan();
     }
 }
 
@@ -772,7 +766,7 @@ impl<T> Receiver<T> {
         loop {
             let new_port = match *unsafe { self.inner() } {
                 Flavor::Oneshot(ref p) => {
-                    match unsafe { (*p.get()).try_recv() } {
+                    match p.try_recv() {
                         Ok(t) => return Ok(t),
                         Err(oneshot::Empty) => return Err(TryRecvError::Empty),
                         Err(oneshot::Disconnected) => {
@@ -782,7 +776,7 @@ impl<T> Receiver<T> {
                     }
                 }
                 Flavor::Stream(ref p) => {
-                    match unsafe { (*p.get()).try_recv() } {
+                    match p.try_recv() {
                         Ok(t) => return Ok(t),
                         Err(stream::Empty) => return Err(TryRecvError::Empty),
                         Err(stream::Disconnected) => {
@@ -792,7 +786,7 @@ impl<T> Receiver<T> {
                     }
                 }
                 Flavor::Shared(ref p) => {
-                    match unsafe { (*p.get()).try_recv() } {
+                    match p.try_recv() {
                         Ok(t) => return Ok(t),
                         Err(shared::Empty) => return Err(TryRecvError::Empty),
                         Err(shared::Disconnected) => {
@@ -801,7 +795,7 @@ impl<T> Receiver<T> {
                     }
                 }
                 Flavor::Sync(ref p) => {
-                    match unsafe { (*p.get()).try_recv() } {
+                    match p.try_recv() {
                         Ok(t) => return Ok(t),
                         Err(sync::Empty) => return Err(TryRecvError::Empty),
                         Err(sync::Disconnected) => {
@@ -875,7 +869,7 @@ impl<T> Receiver<T> {
         loop {
             let new_port = match *unsafe { self.inner() } {
                 Flavor::Oneshot(ref p) => {
-                    match unsafe { (*p.get()).recv(None) } {
+                    match p.recv(None) {
                         Ok(t) => return Ok(t),
                         Err(oneshot::Disconnected) => return Err(RecvError),
                         Err(oneshot::Upgraded(rx)) => rx,
@@ -883,7 +877,7 @@ impl<T> Receiver<T> {
                     }
                 }
                 Flavor::Stream(ref p) => {
-                    match unsafe { (*p.get()).recv(None) } {
+                    match p.recv(None) {
                         Ok(t) => return Ok(t),
                         Err(stream::Disconnected) => return Err(RecvError),
                         Err(stream::Upgraded(rx)) => rx,
@@ -891,15 +885,13 @@ impl<T> Receiver<T> {
                     }
                 }
                 Flavor::Shared(ref p) => {
-                    match unsafe { (*p.get()).recv(None) } {
+                    match p.recv(None) {
                         Ok(t) => return Ok(t),
                         Err(shared::Disconnected) => return Err(RecvError),
                         Err(shared::Empty) => unreachable!(),
                     }
                 }
-                Flavor::Sync(ref p) => return unsafe {
-                    (*p.get()).recv(None).map_err(|_| RecvError)
-                }
+                Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
             };
             unsafe {
                 mem::swap(self.inner_mut(), new_port.inner_mut());
@@ -952,7 +944,7 @@ impl<T> Receiver<T> {
         loop {
             let port_or_empty = match *unsafe { self.inner() } {
                 Flavor::Oneshot(ref p) => {
-                    match unsafe { (*p.get()).recv(Some(deadline)) } {
+                    match p.recv(Some(deadline)) {
                         Ok(t) => return Ok(t),
                         Err(oneshot::Disconnected) => return Err(Disconnected),
                         Err(oneshot::Upgraded(rx)) => Some(rx),
@@ -960,7 +952,7 @@ impl<T> Receiver<T> {
                     }
                 }
                 Flavor::Stream(ref p) => {
-                    match unsafe { (*p.get()).recv(Some(deadline)) } {
+                    match p.recv(Some(deadline)) {
                         Ok(t) => return Ok(t),
                         Err(stream::Disconnected) => return Err(Disconnected),
                         Err(stream::Upgraded(rx)) => Some(rx),
@@ -968,14 +960,14 @@ impl<T> Receiver<T> {
                     }
                 }
                 Flavor::Shared(ref p) => {
-                    match unsafe { (*p.get()).recv(Some(deadline)) } {
+                    match p.recv(Some(deadline)) {
                         Ok(t) => return Ok(t),
                         Err(shared::Disconnected) => return Err(Disconnected),
                         Err(shared::Empty) => None,
                     }
                 }
                 Flavor::Sync(ref p) => {
-                    match unsafe { (*p.get()).recv(Some(deadline)) } {
+                    match p.recv(Some(deadline)) {
                         Ok(t) => return Ok(t),
                         Err(sync::Disconnected) => return Err(Disconnected),
                         Err(sync::Empty) => None,
@@ -1020,23 +1012,19 @@ impl<T> select::Packet for Receiver<T> {
         loop {
             let new_port = match *unsafe { self.inner() } {
                 Flavor::Oneshot(ref p) => {
-                    match unsafe { (*p.get()).can_recv() } {
+                    match p.can_recv() {
                         Ok(ret) => return ret,
                         Err(upgrade) => upgrade,
                     }
                 }
                 Flavor::Stream(ref p) => {
-                    match unsafe { (*p.get()).can_recv() } {
+                    match p.can_recv() {
                         Ok(ret) => return ret,
                         Err(upgrade) => upgrade,
                     }
                 }
-                Flavor::Shared(ref p) => {
-                    return unsafe { (*p.get()).can_recv() };
-                }
-                Flavor::Sync(ref p) => {
-                    return unsafe { (*p.get()).can_recv() };
-                }
+                Flavor::Shared(ref p) => return p.can_recv(),
+                Flavor::Sync(ref p) => return p.can_recv(),
             };
             unsafe {
                 mem::swap(self.inner_mut(),
@@ -1049,25 +1037,21 @@ impl<T> select::Packet for Receiver<T> {
         loop {
             let (t, new_port) = match *unsafe { self.inner() } {
                 Flavor::Oneshot(ref p) => {
-                    match unsafe { (*p.get()).start_selection(token) } {
+                    match p.start_selection(token) {
                         oneshot::SelSuccess => return Installed,
                         oneshot::SelCanceled => return Abort,
                         oneshot::SelUpgraded(t, rx) => (t, rx),
                     }
                 }
                 Flavor::Stream(ref p) => {
-                    match unsafe { (*p.get()).start_selection(token) } {
+                    match p.start_selection(token) {
                         stream::SelSuccess => return Installed,
                         stream::SelCanceled => return Abort,
                         stream::SelUpgraded(t, rx) => (t, rx),
                     }
                 }
-                Flavor::Shared(ref p) => {
-                    return unsafe { (*p.get()).start_selection(token) };
-                }
-                Flavor::Sync(ref p) => {
-                    return unsafe { (*p.get()).start_selection(token) };
-                }
+                Flavor::Shared(ref p) => return p.start_selection(token),
+                Flavor::Sync(ref p) => return p.start_selection(token),
             };
             token = t;
             unsafe {
@@ -1080,16 +1064,10 @@ impl<T> select::Packet for Receiver<T> {
         let mut was_upgrade = false;
         loop {
             let result = match *unsafe { self.inner() } {
-                Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
-                Flavor::Stream(ref p) => unsafe {
-                    (*p.get()).abort_selection(was_upgrade)
-                },
-                Flavor::Shared(ref p) => return unsafe {
-                    (*p.get()).abort_selection(was_upgrade)
-                },
-                Flavor::Sync(ref p) => return unsafe {
-                    (*p.get()).abort_selection()
-                },
+                Flavor::Oneshot(ref p) => p.abort_selection(),
+                Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
+                Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
+                Flavor::Sync(ref p) => return p.abort_selection(),
             };
             let new_port = match result { Ok(b) => return b, Err(p) => p };
             was_upgrade = true;
@@ -1142,11 +1120,11 @@ impl <T> IntoIterator for Receiver<T> {
 #[stable(feature = "rust1", since = "1.0.0")]
 impl<T> Drop for Receiver<T> {
     fn drop(&mut self) {
-        match *unsafe { self.inner_mut() } {
-            Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
-            Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
-            Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
-            Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
+        match *unsafe { self.inner() } {
+            Flavor::Oneshot(ref p) => p.drop_port(),
+            Flavor::Stream(ref p) => p.drop_port(),
+            Flavor::Shared(ref p) => p.drop_port(),
+            Flavor::Sync(ref p) => p.drop_port(),
         }
     }
 }
diff --git a/src/libstd/sync/mpsc/oneshot.rs b/src/libstd/sync/mpsc/oneshot.rs
index 767e9f96ac8..b8e50c9297b 100644
--- a/src/libstd/sync/mpsc/oneshot.rs
+++ b/src/libstd/sync/mpsc/oneshot.rs
@@ -39,7 +39,8 @@ use self::MyUpgrade::*;
 
 use sync::mpsc::Receiver;
 use sync::mpsc::blocking::{self, SignalToken};
-use core::mem;
+use cell::UnsafeCell;
+use ptr;
 use sync::atomic::{AtomicUsize, Ordering};
 use time::Instant;
 
@@ -57,10 +58,10 @@ pub struct Packet<T> {
     // Internal state of the chan/port pair (stores the blocked thread as well)
     state: AtomicUsize,
     // One-shot data slot location
-    data: Option<T>,
+    data: UnsafeCell<Option<T>>,
     // when used for the second time, a oneshot channel must be upgraded, and
     // this contains the slot for the upgrade
-    upgrade: MyUpgrade<T>,
+    upgrade: UnsafeCell<MyUpgrade<T>>,
 }
 
 pub enum Failure<T> {
@@ -90,42 +91,44 @@ enum MyUpgrade<T> {
 impl<T> Packet<T> {
     pub fn new() -> Packet<T> {
         Packet {
-            data: None,
-            upgrade: NothingSent,
+            data: UnsafeCell::new(None),
+            upgrade: UnsafeCell::new(NothingSent),
             state: AtomicUsize::new(EMPTY),
         }
     }
 
-    pub fn send(&mut self, t: T) -> Result<(), T> {
-        // Sanity check
-        match self.upgrade {
-            NothingSent => {}
-            _ => panic!("sending on a oneshot that's already sent on "),
-        }
-        assert!(self.data.is_none());
-        self.data = Some(t);
-        self.upgrade = SendUsed;
-
-        match self.state.swap(DATA, Ordering::SeqCst) {
-            // Sent the data, no one was waiting
-            EMPTY => Ok(()),
-
-            // Couldn't send the data, the port hung up first. Return the data
-            // back up the stack.
-            DISCONNECTED => {
-                self.state.swap(DISCONNECTED, Ordering::SeqCst);
-                self.upgrade = NothingSent;
-                Err(self.data.take().unwrap())
+    pub fn send(&self, t: T) -> Result<(), T> {
+        unsafe {
+            // Sanity check
+            match *self.upgrade.get() {
+                NothingSent => {}
+                _ => panic!("sending on a oneshot that's already sent on "),
             }
+            assert!((*self.data.get()).is_none());
+            ptr::write(self.data.get(), Some(t));
+            ptr::write(self.upgrade.get(), SendUsed);
+
+            match self.state.swap(DATA, Ordering::SeqCst) {
+                // Sent the data, no one was waiting
+                EMPTY => Ok(()),
+
+                // Couldn't send the data, the port hung up first. Return the data
+                // back up the stack.
+                DISCONNECTED => {
+                    self.state.swap(DISCONNECTED, Ordering::SeqCst);
+                    ptr::write(self.upgrade.get(), NothingSent);
+                    Err((&mut *self.data.get()).take().unwrap())
+                }
 
-            // Not possible, these are one-use channels
-            DATA => unreachable!(),
+                // Not possible, these are one-use channels
+                DATA => unreachable!(),
 
-            // There is a thread waiting on the other end. We leave the 'DATA'
-            // state inside so it'll pick it up on the other end.
-            ptr => unsafe {
-                SignalToken::cast_from_usize(ptr).signal();
-                Ok(())
+                // There is a thread waiting on the other end. We leave the 'DATA'
+                // state inside so it'll pick it up on the other end.
+                ptr => {
+                    SignalToken::cast_from_usize(ptr).signal();
+                    Ok(())
+                }
             }
         }
     }
@@ -133,13 +136,15 @@ impl<T> Packet<T> {
     // Just tests whether this channel has been sent on or not, this is only
     // safe to use from the sender.
     pub fn sent(&self) -> bool {
-        match self.upgrade {
-            NothingSent => false,
-            _ => true,
+        unsafe {
+            match *self.upgrade.get() {
+                NothingSent => false,
+                _ => true,
+            }
         }
     }
 
-    pub fn recv(&mut self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
+    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
         // Attempt to not block the thread (it's a little expensive). If it looks
         // like we're not empty, then immediately go through to `try_recv`.
         if self.state.load(Ordering::SeqCst) == EMPTY {
@@ -167,73 +172,77 @@ impl<T> Packet<T> {
         self.try_recv()
     }
 
-    pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
-        match self.state.load(Ordering::SeqCst) {
-            EMPTY => Err(Empty),
-
-            // We saw some data on the channel, but the channel can be used
-            // again to send us an upgrade. As a result, we need to re-insert
-            // into the channel that there's no data available (otherwise we'll
-            // just see DATA next time). This is done as a cmpxchg because if
-            // the state changes under our feet we'd rather just see that state
-            // change.
-            DATA => {
-                self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
-                match self.data.take() {
-                    Some(data) => Ok(data),
-                    None => unreachable!(),
+    pub fn try_recv(&self) -> Result<T, Failure<T>> {
+        unsafe {
+            match self.state.load(Ordering::SeqCst) {
+                EMPTY => Err(Empty),
+
+                // We saw some data on the channel, but the channel can be used
+                // again to send us an upgrade. As a result, we need to re-insert
+                // into the channel that there's no data available (otherwise we'll
+                // just see DATA next time). This is done as a cmpxchg because if
+                // the state changes under our feet we'd rather just see that state
+                // change.
+                DATA => {
+                    self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
+                    match (&mut *self.data.get()).take() {
+                        Some(data) => Ok(data),
+                        None => unreachable!(),
+                    }
                 }
-            }
 
-            // There's no guarantee that we receive before an upgrade happens,
-            // and an upgrade flags the channel as disconnected, so when we see
-            // this we first need to check if there's data available and *then*
-            // we go through and process the upgrade.
-            DISCONNECTED => {
-                match self.data.take() {
-                    Some(data) => Ok(data),
-                    None => {
-                        match mem::replace(&mut self.upgrade, SendUsed) {
-                            SendUsed | NothingSent => Err(Disconnected),
-                            GoUp(upgrade) => Err(Upgraded(upgrade))
+                // There's no guarantee that we receive before an upgrade happens,
+                // and an upgrade flags the channel as disconnected, so when we see
+                // this we first need to check if there's data available and *then*
+                // we go through and process the upgrade.
+                DISCONNECTED => {
+                    match (&mut *self.data.get()).take() {
+                        Some(data) => Ok(data),
+                        None => {
+                            match ptr::replace(self.upgrade.get(), SendUsed) {
+                                SendUsed | NothingSent => Err(Disconnected),
+                                GoUp(upgrade) => Err(Upgraded(upgrade))
+                            }
                         }
                     }
                 }
-            }
 
-            // We are the sole receiver; there cannot be a blocking
-            // receiver already.
-            _ => unreachable!()
+                // We are the sole receiver; there cannot be a blocking
+                // receiver already.
+                _ => unreachable!()
+            }
         }
     }
 
     // Returns whether the upgrade was completed. If the upgrade wasn't
     // completed, then the port couldn't get sent to the other half (it will
     // never receive it).
-    pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
-        let prev = match self.upgrade {
-            NothingSent => NothingSent,
-            SendUsed => SendUsed,
-            _ => panic!("upgrading again"),
-        };
-        self.upgrade = GoUp(up);
-
-        match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
-            // If the channel is empty or has data on it, then we're good to go.
-            // Senders will check the data before the upgrade (in case we
-            // plastered over the DATA state).
-            DATA | EMPTY => UpSuccess,
-
-            // If the other end is already disconnected, then we failed the
-            // upgrade. Be sure to trash the port we were given.
-            DISCONNECTED => { self.upgrade = prev; UpDisconnected }
-
-            // If someone's waiting, we gotta wake them up
-            ptr => UpWoke(unsafe { SignalToken::cast_from_usize(ptr) })
+    pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
+        unsafe {
+            let prev = match *self.upgrade.get() {
+                NothingSent => NothingSent,
+                SendUsed => SendUsed,
+                _ => panic!("upgrading again"),
+            };
+            ptr::write(self.upgrade.get(), GoUp(up));
+
+            match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
+                // If the channel is empty or has data on it, then we're good to go.
+                // Senders will check the data before the upgrade (in case we
+                // plastered over the DATA state).
+                DATA | EMPTY => UpSuccess,
+
+                // If the other end is already disconnected, then we failed the
+                // upgrade. Be sure to trash the port we were given.
+                DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected }
+
+                // If someone's waiting, we gotta wake them up
+                ptr => UpWoke(SignalToken::cast_from_usize(ptr))
+            }
         }
     }
 
-    pub fn drop_chan(&mut self) {
+    pub fn drop_chan(&self) {
         match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
             DATA | DISCONNECTED | EMPTY => {}
 
@@ -244,7 +253,7 @@ impl<T> Packet<T> {
         }
     }
 
-    pub fn drop_port(&mut self) {
+    pub fn drop_port(&self) {
         match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
             // An empty channel has nothing to do, and a remotely disconnected
             // channel also has nothing to do b/c we're about to run the drop
@@ -254,7 +263,7 @@ impl<T> Packet<T> {
             // There's data on the channel, so make sure we destroy it promptly.
             // This is why not using an arc is a little difficult (need the box
             // to stay valid while we take the data).
-            DATA => { self.data.take().unwrap(); }
+            DATA => unsafe { (&mut *self.data.get()).take().unwrap(); },
 
             // We're the only ones that can block on this port
             _ => unreachable!()
@@ -267,62 +276,66 @@ impl<T> Packet<T> {
 
     // If Ok, the value is whether this port has data, if Err, then the upgraded
     // port needs to be checked instead of this one.
-    pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
-        match self.state.load(Ordering::SeqCst) {
-            EMPTY => Ok(false), // Welp, we tried
-            DATA => Ok(true),   // we have some un-acquired data
-            DISCONNECTED if self.data.is_some() => Ok(true), // we have data
-            DISCONNECTED => {
-                match mem::replace(&mut self.upgrade, SendUsed) {
-                    // The other end sent us an upgrade, so we need to
-                    // propagate upwards whether the upgrade can receive
-                    // data
-                    GoUp(upgrade) => Err(upgrade),
-
-                    // If the other end disconnected without sending an
-                    // upgrade, then we have data to receive (the channel is
-                    // disconnected).
-                    up => { self.upgrade = up; Ok(true) }
+    pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
+        unsafe {
+            match self.state.load(Ordering::SeqCst) {
+                EMPTY => Ok(false), // Welp, we tried
+                DATA => Ok(true),   // we have some un-acquired data
+                DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data
+                DISCONNECTED => {
+                    match ptr::replace(self.upgrade.get(), SendUsed) {
+                        // The other end sent us an upgrade, so we need to
+                        // propagate upwards whether the upgrade can receive
+                        // data
+                        GoUp(upgrade) => Err(upgrade),
+
+                        // If the other end disconnected without sending an
+                        // upgrade, then we have data to receive (the channel is
+                        // disconnected).
+                        up => { ptr::write(self.upgrade.get(), up); Ok(true) }
+                    }
                 }
+                _ => unreachable!(), // we're the "one blocker"
             }
-            _ => unreachable!(), // we're the "one blocker"
         }
     }
 
     // Attempts to start selection on this port. This can either succeed, fail
     // because there is data, or fail because there is an upgrade pending.
-    pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
-        let ptr = unsafe { token.cast_to_usize() };
-        match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
-            EMPTY => SelSuccess,
-            DATA => {
-                drop(unsafe { SignalToken::cast_from_usize(ptr) });
-                SelCanceled
-            }
-            DISCONNECTED if self.data.is_some() => {
-                drop(unsafe { SignalToken::cast_from_usize(ptr) });
-                SelCanceled
-            }
-            DISCONNECTED => {
-                match mem::replace(&mut self.upgrade, SendUsed) {
-                    // The other end sent us an upgrade, so we need to
-                    // propagate upwards whether the upgrade can receive
-                    // data
-                    GoUp(upgrade) => {
-                        SelUpgraded(unsafe { SignalToken::cast_from_usize(ptr) }, upgrade)
-                    }
+    pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
+        unsafe {
+            let ptr = token.cast_to_usize();
+            match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
+                EMPTY => SelSuccess,
+                DATA => {
+                    drop(SignalToken::cast_from_usize(ptr));
+                    SelCanceled
+                }
+                DISCONNECTED if (*self.data.get()).is_some() => {
+                    drop(SignalToken::cast_from_usize(ptr));
+                    SelCanceled
+                }
+                DISCONNECTED => {
+                    match ptr::replace(self.upgrade.get(), SendUsed) {
+                        // The other end sent us an upgrade, so we need to
+                        // propagate upwards whether the upgrade can receive
+                        // data
+                        GoUp(upgrade) => {
+                            SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade)
+                        }
 
-                    // If the other end disconnected without sending an
-                    // upgrade, then we have data to receive (the channel is
-                    // disconnected).
-                    up => {
-                        self.upgrade = up;
-                        drop(unsafe { SignalToken::cast_from_usize(ptr) });
-                        SelCanceled
+                        // If the other end disconnected without sending an
+                        // upgrade, then we have data to receive (the channel is
+                        // disconnected).
+                        up => {
+                            ptr::write(self.upgrade.get(), up);
+                            drop(SignalToken::cast_from_usize(ptr));
+                            SelCanceled
+                        }
                     }
                 }
+                _ => unreachable!(), // we're the "one blocker"
             }
-            _ => unreachable!(), // we're the "one blocker"
         }
     }
 
@@ -330,7 +343,7 @@ impl<T> Packet<T> {
     // blocked thread will no longer be visible to any other threads.
     //
     // The return value indicates whether there's data on this port.
-    pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> {
+    pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
         let state = match self.state.load(Ordering::SeqCst) {
             // Each of these states means that no further activity will happen
             // with regard to abortion selection
@@ -356,16 +369,16 @@ impl<T> Packet<T> {
             //
             // We then need to check to see if there was an upgrade requested,
             // and if so, the upgraded port needs to have its selection aborted.
-            DISCONNECTED => {
-                if self.data.is_some() {
+            DISCONNECTED => unsafe {
+                if (*self.data.get()).is_some() {
                     Ok(true)
                 } else {
-                    match mem::replace(&mut self.upgrade, SendUsed) {
+                    match ptr::replace(self.upgrade.get(), SendUsed) {
                         GoUp(port) => Err(port),
                         _ => Ok(true),
                     }
                 }
-            }
+            },
 
             // We woke ourselves up from select.
             ptr => unsafe {
diff --git a/src/libstd/sync/mpsc/shared.rs b/src/libstd/sync/mpsc/shared.rs
index 2a9618251ff..f9e02904164 100644
--- a/src/libstd/sync/mpsc/shared.rs
+++ b/src/libstd/sync/mpsc/shared.rs
@@ -24,6 +24,8 @@ use core::cmp;
 use core::intrinsics::abort;
 use core::isize;
 
+use cell::UnsafeCell;
+use ptr;
 use sync::atomic::{AtomicUsize, AtomicIsize, AtomicBool, Ordering};
 use sync::mpsc::blocking::{self, SignalToken};
 use sync::mpsc::mpsc_queue as mpsc;
@@ -44,7 +46,7 @@ const MAX_STEALS: isize = 1 << 20;
 pub struct Packet<T> {
     queue: mpsc::Queue<T>,
     cnt: AtomicIsize, // How many items are on this channel
-    steals: isize, // How many times has a port received without blocking?
+    steals: UnsafeCell<isize>, // How many times has a port received without blocking?
     to_wake: AtomicUsize, // SignalToken for wake up
 
     // The number of channels which are currently using this packet.
@@ -72,7 +74,7 @@ impl<T> Packet<T> {
         Packet {
             queue: mpsc::Queue::new(),
             cnt: AtomicIsize::new(0),
-            steals: 0,
+            steals: UnsafeCell::new(0),
             to_wake: AtomicUsize::new(0),
             channels: AtomicUsize::new(2),
             port_dropped: AtomicBool::new(false),
@@ -95,7 +97,7 @@ impl<T> Packet<T> {
     // threads in select().
     //
     // This can only be called at channel-creation time
-    pub fn inherit_blocker(&mut self,
+    pub fn inherit_blocker(&self,
                            token: Option<SignalToken>,
                            guard: MutexGuard<()>) {
         token.map(|token| {
@@ -122,7 +124,7 @@ impl<T> Packet<T> {
             // To offset this bad increment, we initially set the steal count to
             // -1. You'll find some special code in abort_selection() as well to
             // ensure that this -1 steal count doesn't escape too far.
-            self.steals = -1;
+            unsafe { *self.steals.get() = -1; }
         });
 
         // When the shared packet is constructed, we grabbed this lock. The
@@ -133,7 +135,7 @@ impl<T> Packet<T> {
         drop(guard);
     }
 
-    pub fn send(&mut self, t: T) -> Result<(), T> {
+    pub fn send(&self, t: T) -> Result<(), T> {
         // See Port::drop for what's going on
         if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
 
@@ -218,7 +220,7 @@ impl<T> Packet<T> {
         Ok(())
     }
 
-    pub fn recv(&mut self, deadline: Option<Instant>) -> Result<T, Failure> {
+    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
         // This code is essentially the exact same as that found in the stream
         // case (see stream.rs)
         match self.try_recv() {
@@ -239,37 +241,38 @@ impl<T> Packet<T> {
         }
 
         match self.try_recv() {
-            data @ Ok(..) => { self.steals -= 1; data }
+            data @ Ok(..) => unsafe { *self.steals.get() -= 1; data },
             data => data,
         }
     }
 
     // Essentially the exact same thing as the stream decrement function.
     // Returns true if blocking should proceed.
-    fn decrement(&mut self, token: SignalToken) -> StartResult {
-        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
-        let ptr = unsafe { token.cast_to_usize() };
-        self.to_wake.store(ptr, Ordering::SeqCst);
-
-        let steals = self.steals;
-        self.steals = 0;
-
-        match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
-            DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
-            // If we factor in our steals and notice that the channel has no
-            // data, we successfully sleep
-            n => {
-                assert!(n >= 0);
-                if n - steals <= 0 { return Installed }
+    fn decrement(&self, token: SignalToken) -> StartResult {
+        unsafe {
+            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+            let ptr = token.cast_to_usize();
+            self.to_wake.store(ptr, Ordering::SeqCst);
+
+            let steals = ptr::replace(self.steals.get(), 0);
+
+            match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
+                DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
+                // If we factor in our steals and notice that the channel has no
+                // data, we successfully sleep
+                n => {
+                    assert!(n >= 0);
+                    if n - steals <= 0 { return Installed }
+                }
             }
-        }
 
-        self.to_wake.store(0, Ordering::SeqCst);
-        drop(unsafe { SignalToken::cast_from_usize(ptr) });
-        Abort
+            self.to_wake.store(0, Ordering::SeqCst);
+            drop(SignalToken::cast_from_usize(ptr));
+            Abort
+        }
     }
 
-    pub fn try_recv(&mut self) -> Result<T, Failure> {
+    pub fn try_recv(&self) -> Result<T, Failure> {
         let ret = match self.queue.pop() {
             mpsc::Data(t) => Some(t),
             mpsc::Empty => None,
@@ -303,23 +306,23 @@ impl<T> Packet<T> {
         match ret {
             // See the discussion in the stream implementation for why we
             // might decrement steals.
-            Some(data) => {
-                if self.steals > MAX_STEALS {
+            Some(data) => unsafe {
+                if *self.steals.get() > MAX_STEALS {
                     match self.cnt.swap(0, Ordering::SeqCst) {
                         DISCONNECTED => {
                             self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                         }
                         n => {
-                            let m = cmp::min(n, self.steals);
-                            self.steals -= m;
+                            let m = cmp::min(n, *self.steals.get());
+                            *self.steals.get() -= m;
                             self.bump(n - m);
                         }
                     }
-                    assert!(self.steals >= 0);
+                    assert!(*self.steals.get() >= 0);
                 }
-                self.steals += 1;
+                *self.steals.get() += 1;
                 Ok(data)
-            }
+            },
 
             // See the discussion in the stream implementation for why we try
             // again.
@@ -341,7 +344,7 @@ impl<T> Packet<T> {
 
     // Prepares this shared packet for a channel clone, essentially just bumping
     // a refcount.
-    pub fn clone_chan(&mut self) {
+    pub fn clone_chan(&self) {
         let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
 
         // See comments on Arc::clone() on why we do this (for `mem::forget`).
@@ -355,7 +358,7 @@ impl<T> Packet<T> {
     // Decrement the reference count on a channel. This is called whenever a
     // Chan is dropped and may end up waking up a receiver. It's the receiver's
     // responsibility on the other end to figure out that we've disconnected.
-    pub fn drop_chan(&mut self) {
+    pub fn drop_chan(&self) {
         match self.channels.fetch_sub(1, Ordering::SeqCst) {
             1 => {}
             n if n > 1 => return,
@@ -371,9 +374,9 @@ impl<T> Packet<T> {
 
     // See the long discussion inside of stream.rs for why the queue is drained,
     // and why it is done in this fashion.
-    pub fn drop_port(&mut self) {
+    pub fn drop_port(&self) {
         self.port_dropped.store(true, Ordering::SeqCst);
-        let mut steals = self.steals;
+        let mut steals = unsafe { *self.steals.get() };
         while {
             let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, Ordering::SeqCst);
             cnt != DISCONNECTED && cnt != steals
@@ -390,7 +393,7 @@ impl<T> Packet<T> {
     }
 
     // Consumes ownership of the 'to_wake' field.
-    fn take_to_wake(&mut self) -> SignalToken {
+    fn take_to_wake(&self) -> SignalToken {
         let ptr = self.to_wake.load(Ordering::SeqCst);
         self.to_wake.store(0, Ordering::SeqCst);
         assert!(ptr != 0);
@@ -406,13 +409,13 @@ impl<T> Packet<T> {
     //
     // This is different than the stream version because there's no need to peek
     // at the queue, we can just look at the local count.
-    pub fn can_recv(&mut self) -> bool {
+    pub fn can_recv(&self) -> bool {
         let cnt = self.cnt.load(Ordering::SeqCst);
-        cnt == DISCONNECTED || cnt - self.steals > 0
+        cnt == DISCONNECTED || cnt - unsafe { *self.steals.get() } > 0
     }
 
     // increment the count on the channel (used for selection)
-    fn bump(&mut self, amt: isize) -> isize {
+    fn bump(&self, amt: isize) -> isize {
         match self.cnt.fetch_add(amt, Ordering::SeqCst) {
             DISCONNECTED => {
                 self.cnt.store(DISCONNECTED, Ordering::SeqCst);
@@ -427,7 +430,7 @@ impl<T> Packet<T> {
     //
     // The code here is the same as in stream.rs, except that it doesn't need to
     // peek at the channel to see if an upgrade is pending.
-    pub fn start_selection(&mut self, token: SignalToken) -> StartResult {
+    pub fn start_selection(&self, token: SignalToken) -> StartResult {
         match self.decrement(token) {
             Installed => Installed,
             Abort => {
@@ -443,7 +446,7 @@ impl<T> Packet<T> {
     //
     // This is similar to the stream implementation (hence fewer comments), but
     // uses a different value for the "steals" variable.
-    pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
+    pub fn abort_selection(&self, _was_upgrade: bool) -> bool {
         // Before we do anything else, we bounce on this lock. The reason for
         // doing this is to ensure that any upgrade-in-progress is gone and
         // done with. Without this bounce, we can race with inherit_blocker
@@ -477,12 +480,15 @@ impl<T> Packet<T> {
                     thread::yield_now();
                 }
             }
-            // if the number of steals is -1, it was the pre-emptive -1 steal
-            // count from when we inherited a blocker. This is fine because
-            // we're just going to overwrite it with a real value.
-            assert!(self.steals == 0 || self.steals == -1);
-            self.steals = steals;
-            prev >= 0
+            unsafe {
+                // if the number of steals is -1, it was the pre-emptive -1 steal
+                // count from when we inherited a blocker. This is fine because
+                // we're just going to overwrite it with a real value.
+                let old = self.steals.get();
+                assert!(*old == 0 || *old == -1);
+                *old = steals;
+                prev >= 0
+            }
         }
     }
 }
diff --git a/src/libstd/sync/mpsc/stream.rs b/src/libstd/sync/mpsc/stream.rs
index 61c8316467d..47cd8977fda 100644
--- a/src/libstd/sync/mpsc/stream.rs
+++ b/src/libstd/sync/mpsc/stream.rs
@@ -22,8 +22,10 @@ pub use self::UpgradeResult::*;
 pub use self::SelectionResult::*;
 use self::Message::*;
 
+use cell::UnsafeCell;
 use core::cmp;
 use core::isize;
+use ptr;
 use thread;
 use time::Instant;
 
@@ -42,7 +44,7 @@ pub struct Packet<T> {
     queue: spsc::Queue<Message<T>>, // internal queue for all message
 
     cnt: AtomicIsize, // How many items are on this channel
-    steals: isize, // How many times has a port received without blocking?
+    steals: UnsafeCell<isize>, // How many times has a port received without blocking?
     to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
 
     port_dropped: AtomicBool, // flag if the channel has been destroyed.
@@ -79,14 +81,14 @@ impl<T> Packet<T> {
             queue: unsafe { spsc::Queue::new(128) },
 
             cnt: AtomicIsize::new(0),
-            steals: 0,
+            steals: UnsafeCell::new(0),
             to_wake: AtomicUsize::new(0),
 
             port_dropped: AtomicBool::new(false),
         }
     }
 
-    pub fn send(&mut self, t: T) -> Result<(), T> {
+    pub fn send(&self, t: T) -> Result<(), T> {
         // If the other port has deterministically gone away, then definitely
         // must return the data back up the stack. Otherwise, the data is
         // considered as being sent.
@@ -99,7 +101,7 @@ impl<T> Packet<T> {
         Ok(())
     }
 
-    pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
+    pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
         // If the port has gone away, then there's no need to proceed any
         // further.
         if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected }
@@ -107,7 +109,7 @@ impl<T> Packet<T> {
         self.do_send(GoUp(up))
     }
 
-    fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
+    fn do_send(&self, t: Message<T>) -> UpgradeResult {
         self.queue.push(t);
         match self.cnt.fetch_add(1, Ordering::SeqCst) {
             // As described in the mod's doc comment, -1 == wakeup
@@ -141,7 +143,7 @@ impl<T> Packet<T> {
     }
 
     // Consumes ownership of the 'to_wake' field.
-    fn take_to_wake(&mut self) -> SignalToken {
+    fn take_to_wake(&self) -> SignalToken {
         let ptr = self.to_wake.load(Ordering::SeqCst);
         self.to_wake.store(0, Ordering::SeqCst);
         assert!(ptr != 0);
@@ -151,13 +153,12 @@ impl<T> Packet<T> {
     // Decrements the count on the channel for a sleeper, returning the sleeper
     // back if it shouldn't sleep. Note that this is the location where we take
     // steals into account.
-    fn decrement(&mut self, token: SignalToken) -> Result<(), SignalToken> {
+    fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
         assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
         let ptr = unsafe { token.cast_to_usize() };
         self.to_wake.store(ptr, Ordering::SeqCst);
 
-        let steals = self.steals;
-        self.steals = 0;
+        let steals = unsafe { ptr::replace(self.steals.get(), 0) };
 
         match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
             DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
@@ -173,7 +174,7 @@ impl<T> Packet<T> {
         Err(unsafe { SignalToken::cast_from_usize(ptr) })
     }
 
-    pub fn recv(&mut self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
+    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
         // Optimistic preflight check (scheduling is expensive).
         match self.try_recv() {
             Err(Empty) => {}
@@ -199,16 +200,16 @@ impl<T> Packet<T> {
             // a steal, so offset the decrement here (we already have our
             // "steal" factored into the channel count above).
             data @ Ok(..) |
-            data @ Err(Upgraded(..)) => {
-                self.steals -= 1;
+            data @ Err(Upgraded(..)) => unsafe {
+                *self.steals.get() -= 1;
                 data
-            }
+            },
 
             data => data,
         }
     }
 
-    pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
+    pub fn try_recv(&self) -> Result<T, Failure<T>> {
         match self.queue.pop() {
             // If we stole some data, record to that effect (this will be
             // factored into cnt later on).
@@ -221,26 +222,26 @@ impl<T> Packet<T> {
             // a pretty slow operation, of swapping 0 into cnt, taking steals
             // down as much as possible (without going negative), and then
             // adding back in whatever we couldn't factor into steals.
-            Some(data) => {
-                if self.steals > MAX_STEALS {
+            Some(data) => unsafe {
+                if *self.steals.get() > MAX_STEALS {
                     match self.cnt.swap(0, Ordering::SeqCst) {
                         DISCONNECTED => {
                             self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                         }
                         n => {
-                            let m = cmp::min(n, self.steals);
-                            self.steals -= m;
+                            let m = cmp::min(n, *self.steals.get());
+                            *self.steals.get() -= m;
                             self.bump(n - m);
                         }
                     }
-                    assert!(self.steals >= 0);
+                    assert!(*self.steals.get() >= 0);
                 }
-                self.steals += 1;
+                *self.steals.get() += 1;
                 match data {
                     Data(t) => Ok(t),
                     GoUp(up) => Err(Upgraded(up)),
                 }
-            }
+            },
 
             None => {
                 match self.cnt.load(Ordering::SeqCst) {
@@ -269,7 +270,7 @@ impl<T> Packet<T> {
         }
     }
 
-    pub fn drop_chan(&mut self) {
+    pub fn drop_chan(&self) {
         // Dropping a channel is pretty simple, we just flag it as disconnected
         // and then wakeup a blocker if there is one.
         match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
@@ -279,7 +280,7 @@ impl<T> Packet<T> {
         }
     }
 
-    pub fn drop_port(&mut self) {
+    pub fn drop_port(&self) {
         // Dropping a port seems like a fairly trivial thing. In theory all we
         // need to do is flag that we're disconnected and then everything else
         // can take over (we don't have anyone to wake up).
@@ -309,7 +310,7 @@ impl<T> Packet<T> {
         // continue to fail while active senders send data while we're dropping
         // data, but eventually we're guaranteed to break out of this loop
         // (because there is a bounded number of senders).
-        let mut steals = self.steals;
+        let mut steals = unsafe { *self.steals.get() };
         while {
             let cnt = self.cnt.compare_and_swap(
                             steals, DISCONNECTED, Ordering::SeqCst);
@@ -332,7 +333,7 @@ impl<T> Packet<T> {
     // Tests to see whether this port can receive without blocking. If Ok is
     // returned, then that's the answer. If Err is returned, then the returned
     // port needs to be queried instead (an upgrade happened)
-    pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
+    pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
         // We peek at the queue to see if there's anything on it, and we use
         // this return value to determine if we should pop from the queue and
         // upgrade this channel immediately. If it looks like we've got an
@@ -351,7 +352,7 @@ impl<T> Packet<T> {
     }
 
     // increment the count on the channel (used for selection)
-    fn bump(&mut self, amt: isize) -> isize {
+    fn bump(&self, amt: isize) -> isize {
         match self.cnt.fetch_add(amt, Ordering::SeqCst) {
             DISCONNECTED => {
                 self.cnt.store(DISCONNECTED, Ordering::SeqCst);
@@ -363,7 +364,7 @@ impl<T> Packet<T> {
 
     // Attempts to start selecting on this port. Like a oneshot, this can fail
     // immediately because of an upgrade.
-    pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
+    pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
         match self.decrement(token) {
             Ok(()) => SelSuccess,
             Err(token) => {
@@ -387,7 +388,7 @@ impl<T> Packet<T> {
     }
 
     // Removes a previous thread from being blocked in this port
-    pub fn abort_selection(&mut self,
+    pub fn abort_selection(&self,
                            was_upgrade: bool) -> Result<bool, Receiver<T>> {
         // If we're aborting selection after upgrading from a oneshot, then
         // we're guarantee that no one is waiting. The only way that we could
@@ -403,7 +404,7 @@ impl<T> Packet<T> {
         // this end. This is fine because we know it's a small bounded windows
         // of time until the data is actually sent.
         if was_upgrade {
-            assert_eq!(self.steals, 0);
+            assert_eq!(unsafe { *self.steals.get() }, 0);
             assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
             return Ok(true)
         }
@@ -444,8 +445,10 @@ impl<T> Packet<T> {
                     thread::yield_now();
                 }
             }
-            assert_eq!(self.steals, 0);
-            self.steals = steals;
+            unsafe {
+                assert_eq!(*self.steals.get(), 0);
+                *self.steals.get() = steals;
+            }
 
             // if we were previously positive, then there's surely data to
             // receive