diff options
| author | Andrew Paseltiner <apaseltiner@gmail.com> | 2016-12-16 18:10:09 -0500 |
|---|---|---|
| committer | Andrew Paseltiner <apaseltiner@gmail.com> | 2016-12-16 19:52:12 -0500 |
| commit | 26d4308c6acbbde642be4621f5e3487b24ed8bd1 (patch) | |
| tree | 78fd560603a6ffdc3360d5c75c5e148f44dedaac /src/libstd/sync/mpsc/mod.rs | |
| parent | d250169cb5a96481a3e7c8f9fe05de49f60e5ae5 (diff) | |
| download | rust-26d4308c6acbbde642be4621f5e3487b24ed8bd1.tar.gz rust-26d4308c6acbbde642be4621f5e3487b24ed8bd1.zip | |
Replace invalid use of `&mut` with `UnsafeCell` in `std::sync::mpsc`
Closes #36934
Diffstat (limited to 'src/libstd/sync/mpsc/mod.rs')
| -rw-r--r-- | src/libstd/sync/mpsc/mod.rs | 192 |
1 files changed, 85 insertions, 107 deletions
diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 9f51d3e87f3..0c3f31455cc 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -348,7 +348,7 @@ impl<T> !Sync for Sender<T> { } /// owned by one thread, but it can be cloned to send to other threads. #[stable(feature = "rust1", since = "1.0.0")] pub struct SyncSender<T> { - inner: Arc<UnsafeCell<sync::Packet<T>>>, + inner: Arc<sync::Packet<T>>, } #[stable(feature = "rust1", since = "1.0.0")] @@ -426,10 +426,10 @@ pub enum TrySendError<T> { } enum Flavor<T> { - Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>), - Stream(Arc<UnsafeCell<stream::Packet<T>>>), - Shared(Arc<UnsafeCell<shared::Packet<T>>>), - Sync(Arc<UnsafeCell<sync::Packet<T>>>), + Oneshot(Arc<oneshot::Packet<T>>), + Stream(Arc<stream::Packet<T>>), + Shared(Arc<shared::Packet<T>>), + Sync(Arc<sync::Packet<T>>), } #[doc(hidden)] @@ -487,7 +487,7 @@ impl<T> UnsafeFlavor<T> for Receiver<T> { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn channel<T>() -> (Sender<T>, Receiver<T>) { - let a = Arc::new(UnsafeCell::new(oneshot::Packet::new())); + let a = Arc::new(oneshot::Packet::new()); (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a))) } @@ -532,7 +532,7 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { - let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound))); + let a = Arc::new(sync::Packet::new(bound)); (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a))) } @@ -578,38 +578,30 @@ impl<T> Sender<T> { pub fn send(&self, t: T) -> Result<(), SendError<T>> { let (new_inner, ret) = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - unsafe { - let p = p.get(); - if !(*p).sent() { - return (*p).send(t).map_err(SendError); - } else { - let a = - Arc::new(UnsafeCell::new(stream::Packet::new())); - let rx = Receiver::new(Flavor::Stream(a.clone())); - match (*p).upgrade(rx) { - oneshot::UpSuccess => { - let ret = (*a.get()).send(t); - (a, ret) - } - oneshot::UpDisconnected => (a, Err(t)), - oneshot::UpWoke(token) => { - // This send cannot panic because the thread is - // asleep (we're looking at it), so the receiver - // can't go away. - (*a.get()).send(t).ok().unwrap(); - token.signal(); - (a, Ok(())) - } + if !p.sent() { + return p.send(t).map_err(SendError); + } else { + let a = Arc::new(stream::Packet::new()); + let rx = Receiver::new(Flavor::Stream(a.clone())); + match p.upgrade(rx) { + oneshot::UpSuccess => { + let ret = a.send(t); + (a, ret) + } + oneshot::UpDisconnected => (a, Err(t)), + oneshot::UpWoke(token) => { + // This send cannot panic because the thread is + // asleep (we're looking at it), so the receiver + // can't go away. + a.send(t).ok().unwrap(); + token.signal(); + (a, Ok(())) } } } } - Flavor::Stream(ref p) => return unsafe { - (*p.get()).send(t).map_err(SendError) - }, - Flavor::Shared(ref p) => return unsafe { - (*p.get()).send(t).map_err(SendError) - }, + Flavor::Stream(ref p) => return p.send(t).map_err(SendError), + Flavor::Shared(ref p) => return p.send(t).map_err(SendError), Flavor::Sync(..) => unreachable!(), }; @@ -624,41 +616,43 @@ impl<T> Sender<T> { #[stable(feature = "rust1", since = "1.0.0")] impl<T> Clone for Sender<T> { fn clone(&self) -> Sender<T> { - let (packet, sleeper, guard) = match *unsafe { self.inner() } { + let packet = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - let a = Arc::new(UnsafeCell::new(shared::Packet::new())); - unsafe { - let guard = (*a.get()).postinit_lock(); + let a = Arc::new(shared::Packet::new()); + { + let guard = a.postinit_lock(); let rx = Receiver::new(Flavor::Shared(a.clone())); - match (*p.get()).upgrade(rx) { + let sleeper = match p.upgrade(rx) { oneshot::UpSuccess | - oneshot::UpDisconnected => (a, None, guard), - oneshot::UpWoke(task) => (a, Some(task), guard) - } + oneshot::UpDisconnected => None, + oneshot::UpWoke(task) => Some(task), + }; + a.inherit_blocker(sleeper, guard); } + a } Flavor::Stream(ref p) => { - let a = Arc::new(UnsafeCell::new(shared::Packet::new())); - unsafe { - let guard = (*a.get()).postinit_lock(); + let a = Arc::new(shared::Packet::new()); + { + let guard = a.postinit_lock(); let rx = Receiver::new(Flavor::Shared(a.clone())); - match (*p.get()).upgrade(rx) { + let sleeper = match p.upgrade(rx) { stream::UpSuccess | - stream::UpDisconnected => (a, None, guard), - stream::UpWoke(task) => (a, Some(task), guard), - } + stream::UpDisconnected => None, + stream::UpWoke(task) => Some(task), + }; + a.inherit_blocker(sleeper, guard); } + a } Flavor::Shared(ref p) => { - unsafe { (*p.get()).clone_chan(); } + p.clone_chan(); return Sender::new(Flavor::Shared(p.clone())); } Flavor::Sync(..) => unreachable!(), }; unsafe { - (*packet.get()).inherit_blocker(sleeper, guard); - let tmp = Sender::new(Flavor::Shared(packet.clone())); mem::swap(self.inner_mut(), tmp.inner_mut()); } @@ -669,10 +663,10 @@ impl<T> Clone for Sender<T> { #[stable(feature = "rust1", since = "1.0.0")] impl<T> Drop for Sender<T> { fn drop(&mut self) { - match *unsafe { self.inner_mut() } { - Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); }, - Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); }, - Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => p.drop_chan(), + Flavor::Stream(ref p) => p.drop_chan(), + Flavor::Shared(ref p) => p.drop_chan(), Flavor::Sync(..) => unreachable!(), } } @@ -690,7 +684,7 @@ impl<T> fmt::Debug for Sender<T> { //////////////////////////////////////////////////////////////////////////////// impl<T> SyncSender<T> { - fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> { + fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> { SyncSender { inner: inner } } @@ -710,7 +704,7 @@ impl<T> SyncSender<T> { /// information. #[stable(feature = "rust1", since = "1.0.0")] pub fn send(&self, t: T) -> Result<(), SendError<T>> { - unsafe { (*self.inner.get()).send(t).map_err(SendError) } + self.inner.send(t).map_err(SendError) } /// Attempts to send a value on this channel without blocking. @@ -724,14 +718,14 @@ impl<T> SyncSender<T> { /// receiver has received the data or not if this function is successful. #[stable(feature = "rust1", since = "1.0.0")] pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { - unsafe { (*self.inner.get()).try_send(t) } + self.inner.try_send(t) } } #[stable(feature = "rust1", since = "1.0.0")] impl<T> Clone for SyncSender<T> { fn clone(&self) -> SyncSender<T> { - unsafe { (*self.inner.get()).clone_chan(); } + self.inner.clone_chan(); SyncSender::new(self.inner.clone()) } } @@ -739,7 +733,7 @@ impl<T> Clone for SyncSender<T> { #[stable(feature = "rust1", since = "1.0.0")] impl<T> Drop for SyncSender<T> { fn drop(&mut self) { - unsafe { (*self.inner.get()).drop_chan(); } + self.inner.drop_chan(); } } @@ -772,7 +766,7 @@ impl<T> Receiver<T> { loop { let new_port = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).try_recv() } { + match p.try_recv() { Ok(t) => return Ok(t), Err(oneshot::Empty) => return Err(TryRecvError::Empty), Err(oneshot::Disconnected) => { @@ -782,7 +776,7 @@ impl<T> Receiver<T> { } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).try_recv() } { + match p.try_recv() { Ok(t) => return Ok(t), Err(stream::Empty) => return Err(TryRecvError::Empty), Err(stream::Disconnected) => { @@ -792,7 +786,7 @@ impl<T> Receiver<T> { } } Flavor::Shared(ref p) => { - match unsafe { (*p.get()).try_recv() } { + match p.try_recv() { Ok(t) => return Ok(t), Err(shared::Empty) => return Err(TryRecvError::Empty), Err(shared::Disconnected) => { @@ -801,7 +795,7 @@ impl<T> Receiver<T> { } } Flavor::Sync(ref p) => { - match unsafe { (*p.get()).try_recv() } { + match p.try_recv() { Ok(t) => return Ok(t), Err(sync::Empty) => return Err(TryRecvError::Empty), Err(sync::Disconnected) => { @@ -875,7 +869,7 @@ impl<T> Receiver<T> { loop { let new_port = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).recv(None) } { + match p.recv(None) { Ok(t) => return Ok(t), Err(oneshot::Disconnected) => return Err(RecvError), Err(oneshot::Upgraded(rx)) => rx, @@ -883,7 +877,7 @@ impl<T> Receiver<T> { } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).recv(None) } { + match p.recv(None) { Ok(t) => return Ok(t), Err(stream::Disconnected) => return Err(RecvError), Err(stream::Upgraded(rx)) => rx, @@ -891,15 +885,13 @@ impl<T> Receiver<T> { } } Flavor::Shared(ref p) => { - match unsafe { (*p.get()).recv(None) } { + match p.recv(None) { Ok(t) => return Ok(t), Err(shared::Disconnected) => return Err(RecvError), Err(shared::Empty) => unreachable!(), } } - Flavor::Sync(ref p) => return unsafe { - (*p.get()).recv(None).map_err(|_| RecvError) - } + Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError), }; unsafe { mem::swap(self.inner_mut(), new_port.inner_mut()); @@ -952,7 +944,7 @@ impl<T> Receiver<T> { loop { let port_or_empty = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).recv(Some(deadline)) } { + match p.recv(Some(deadline)) { Ok(t) => return Ok(t), Err(oneshot::Disconnected) => return Err(Disconnected), Err(oneshot::Upgraded(rx)) => Some(rx), @@ -960,7 +952,7 @@ impl<T> Receiver<T> { } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).recv(Some(deadline)) } { + match p.recv(Some(deadline)) { Ok(t) => return Ok(t), Err(stream::Disconnected) => return Err(Disconnected), Err(stream::Upgraded(rx)) => Some(rx), @@ -968,14 +960,14 @@ impl<T> Receiver<T> { } } Flavor::Shared(ref p) => { - match unsafe { (*p.get()).recv(Some(deadline)) } { + match p.recv(Some(deadline)) { Ok(t) => return Ok(t), Err(shared::Disconnected) => return Err(Disconnected), Err(shared::Empty) => None, } } Flavor::Sync(ref p) => { - match unsafe { (*p.get()).recv(Some(deadline)) } { + match p.recv(Some(deadline)) { Ok(t) => return Ok(t), Err(sync::Disconnected) => return Err(Disconnected), Err(sync::Empty) => None, @@ -1020,23 +1012,19 @@ impl<T> select::Packet for Receiver<T> { loop { let new_port = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).can_recv() } { + match p.can_recv() { Ok(ret) => return ret, Err(upgrade) => upgrade, } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).can_recv() } { + match p.can_recv() { Ok(ret) => return ret, Err(upgrade) => upgrade, } } - Flavor::Shared(ref p) => { - return unsafe { (*p.get()).can_recv() }; - } - Flavor::Sync(ref p) => { - return unsafe { (*p.get()).can_recv() }; - } + Flavor::Shared(ref p) => return p.can_recv(), + Flavor::Sync(ref p) => return p.can_recv(), }; unsafe { mem::swap(self.inner_mut(), @@ -1049,25 +1037,21 @@ impl<T> select::Packet for Receiver<T> { loop { let (t, new_port) = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - match unsafe { (*p.get()).start_selection(token) } { + match p.start_selection(token) { oneshot::SelSuccess => return Installed, oneshot::SelCanceled => return Abort, oneshot::SelUpgraded(t, rx) => (t, rx), } } Flavor::Stream(ref p) => { - match unsafe { (*p.get()).start_selection(token) } { + match p.start_selection(token) { stream::SelSuccess => return Installed, stream::SelCanceled => return Abort, stream::SelUpgraded(t, rx) => (t, rx), } } - Flavor::Shared(ref p) => { - return unsafe { (*p.get()).start_selection(token) }; - } - Flavor::Sync(ref p) => { - return unsafe { (*p.get()).start_selection(token) }; - } + Flavor::Shared(ref p) => return p.start_selection(token), + Flavor::Sync(ref p) => return p.start_selection(token), }; token = t; unsafe { @@ -1080,16 +1064,10 @@ impl<T> select::Packet for Receiver<T> { let mut was_upgrade = false; loop { let result = match *unsafe { self.inner() } { - Flavor::Oneshot(ref p) => unsafe { (*p.get()).abort_selection() }, - Flavor::Stream(ref p) => unsafe { - (*p.get()).abort_selection(was_upgrade) - }, - Flavor::Shared(ref p) => return unsafe { - (*p.get()).abort_selection(was_upgrade) - }, - Flavor::Sync(ref p) => return unsafe { - (*p.get()).abort_selection() - }, + Flavor::Oneshot(ref p) => p.abort_selection(), + Flavor::Stream(ref p) => p.abort_selection(was_upgrade), + Flavor::Shared(ref p) => return p.abort_selection(was_upgrade), + Flavor::Sync(ref p) => return p.abort_selection(), }; let new_port = match result { Ok(b) => return b, Err(p) => p }; was_upgrade = true; @@ -1142,11 +1120,11 @@ impl <T> IntoIterator for Receiver<T> { #[stable(feature = "rust1", since = "1.0.0")] impl<T> Drop for Receiver<T> { fn drop(&mut self) { - match *unsafe { self.inner_mut() } { - Flavor::Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Flavor::Stream(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Flavor::Shared(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Flavor::Sync(ref mut p) => unsafe { (*p.get()).drop_port(); }, + match *unsafe { self.inner() } { + Flavor::Oneshot(ref p) => p.drop_port(), + Flavor::Stream(ref p) => p.drop_port(), + Flavor::Shared(ref p) => p.drop_port(), + Flavor::Sync(ref p) => p.drop_port(), } } } |
