diff options
| author | Jubilee Young <workingjubilee@gmail.com> | 2024-07-14 17:59:37 -0700 | 
|---|---|---|
| committer | Jubilee Young <workingjubilee@gmail.com> | 2024-07-14 17:59:37 -0700 | 
| commit | e32460276cb146b665c0c18b50bf6ea7764c693b (patch) | |
| tree | 3198ff32adb28b6e2e6992b9ecdf5db2eef19e5b /library/std/src/sync/mpmc | |
| parent | 64fb2366dacfcd187ce437e637e237ce39d9073c (diff) | |
| download | rust-e32460276cb146b665c0c18b50bf6ea7764c693b.tar.gz rust-e32460276cb146b665c0c18b50bf6ea7764c693b.zip | |
std: Unsafe-wrap std::sync
Diffstat (limited to 'library/std/src/sync/mpmc')
| -rw-r--r-- | library/std/src/sync/mpmc/array.rs | 22 | ||||
| -rw-r--r-- | library/std/src/sync/mpmc/counter.rs | 4 | ||||
| -rw-r--r-- | library/std/src/sync/mpmc/list.rs | 38 | ||||
| -rw-r--r-- | library/std/src/sync/mpmc/zero.rs | 20 | 
4 files changed, 48 insertions, 36 deletions
| diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs index 492e21d9bdb..185319add74 100644 --- a/library/std/src/sync/mpmc/array.rs +++ b/library/std/src/sync/mpmc/array.rs @@ -200,11 +200,12 @@ impl<T> Channel<T> { return Err(msg); } - let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>); - // Write the message into the slot and update the stamp. - slot.msg.get().write(MaybeUninit::new(msg)); - slot.stamp.store(token.array.stamp, Ordering::Release); + unsafe { + let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>); + slot.msg.get().write(MaybeUninit::new(msg)); + slot.stamp.store(token.array.stamp, Ordering::Release); + } // Wake a sleeping receiver. self.receivers.notify(); @@ -291,11 +292,14 @@ impl<T> Channel<T> { return Err(()); } - let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>); - // Read the message from the slot and update the stamp. - let msg = slot.msg.get().read().assume_init(); - slot.stamp.store(token.array.stamp, Ordering::Release); + let msg = unsafe { + let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>); + + let msg = slot.msg.get().read().assume_init(); + slot.stamp.store(token.array.stamp, Ordering::Release); + msg + }; // Wake a sleeping sender. self.senders.notify(); @@ -471,7 +475,7 @@ impl<T> Channel<T> { false }; - self.discard_all_messages(tail); + unsafe { self.discard_all_messages(tail) }; disconnected } diff --git a/library/std/src/sync/mpmc/counter.rs b/library/std/src/sync/mpmc/counter.rs index a5a6bdc67f1..3478cf41dc9 100644 --- a/library/std/src/sync/mpmc/counter.rs +++ b/library/std/src/sync/mpmc/counter.rs @@ -63,7 +63,7 @@ impl<C> Sender<C> { disconnect(&self.counter().chan); if self.counter().destroy.swap(true, Ordering::AcqRel) { - drop(Box::from_raw(self.counter)); + drop(unsafe { Box::from_raw(self.counter) }); } } } @@ -116,7 +116,7 @@ impl<C> Receiver<C> { disconnect(&self.counter().chan); if self.counter().destroy.swap(true, Ordering::AcqRel) { - drop(Box::from_raw(self.counter)); + drop(unsafe { Box::from_raw(self.counter) }); } } } diff --git a/library/std/src/sync/mpmc/list.rs b/library/std/src/sync/mpmc/list.rs index 9e7148c716c..edac7a0cb18 100644 --- a/library/std/src/sync/mpmc/list.rs +++ b/library/std/src/sync/mpmc/list.rs @@ -91,7 +91,7 @@ impl<T> Block<T> { // It is not necessary to set the `DESTROY` bit in the last slot because that slot has // begun destruction of the block. for i in start..BLOCK_CAP - 1 { - let slot = (*this).slots.get_unchecked(i); + let slot = unsafe { (*this).slots.get_unchecked(i) }; // Mark the `DESTROY` bit if a thread is still using the slot. if slot.state.load(Ordering::Acquire) & READ == 0 @@ -103,7 +103,7 @@ impl<T> Block<T> { } // No thread is using the block, now it is safe to destroy it. - drop(Box::from_raw(this)); + drop(unsafe { Box::from_raw(this) }); } } @@ -265,9 +265,11 @@ impl<T> Channel<T> { // Write the message into the slot. let block = token.list.block as *mut Block<T>; let offset = token.list.offset; - let slot = (*block).slots.get_unchecked(offset); - slot.msg.get().write(MaybeUninit::new(msg)); - slot.state.fetch_or(WRITE, Ordering::Release); + unsafe { + let slot = (*block).slots.get_unchecked(offset); + slot.msg.get().write(MaybeUninit::new(msg)); + slot.state.fetch_or(WRITE, Ordering::Release); + } // Wake a sleeping receiver. self.receivers.notify(); @@ -369,19 +371,21 @@ impl<T> Channel<T> { // Read the message. let block = token.list.block as *mut Block<T>; let offset = token.list.offset; - let slot = (*block).slots.get_unchecked(offset); - slot.wait_write(); - let msg = slot.msg.get().read().assume_init(); - - // Destroy the block if we've reached the end, or if another thread wanted to destroy but - // couldn't because we were busy reading from the slot. - if offset + 1 == BLOCK_CAP { - Block::destroy(block, 0); - } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { - Block::destroy(block, offset + 1); - } + unsafe { + let slot = (*block).slots.get_unchecked(offset); + slot.wait_write(); + let msg = slot.msg.get().read().assume_init(); + + // Destroy the block if we've reached the end, or if another thread wanted to destroy but + // couldn't because we were busy reading from the slot. + if offset + 1 == BLOCK_CAP { + Block::destroy(block, 0); + } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { + Block::destroy(block, offset + 1); + } - Ok(msg) + Ok(msg) + } } /// Attempts to send a message into the channel. diff --git a/library/std/src/sync/mpmc/zero.rs b/library/std/src/sync/mpmc/zero.rs index 1b82713edc7..6d1c9d64e7a 100644 --- a/library/std/src/sync/mpmc/zero.rs +++ b/library/std/src/sync/mpmc/zero.rs @@ -103,9 +103,11 @@ impl<T> Channel<T> { return Err(msg); } - let packet = &*(token.zero.0 as *const Packet<T>); - packet.msg.get().write(Some(msg)); - packet.ready.store(true, Ordering::Release); + unsafe { + let packet = &*(token.zero.0 as *const Packet<T>); + packet.msg.get().write(Some(msg)); + packet.ready.store(true, Ordering::Release); + } Ok(()) } @@ -116,22 +118,24 @@ impl<T> Channel<T> { return Err(()); } - let packet = &*(token.zero.0 as *const Packet<T>); + let packet = unsafe { &*(token.zero.0 as *const Packet<T>) }; if packet.on_stack { // The message has been in the packet from the beginning, so there is no need to wait // for it. However, after reading the message, we need to set `ready` to `true` in // order to signal that the packet can be destroyed. - let msg = packet.msg.get().replace(None).unwrap(); + let msg = unsafe { packet.msg.get().replace(None) }.unwrap(); packet.ready.store(true, Ordering::Release); Ok(msg) } else { // Wait until the message becomes available, then read it and destroy the // heap-allocated packet. packet.wait_ready(); - let msg = packet.msg.get().replace(None).unwrap(); - drop(Box::from_raw(token.zero.0 as *mut Packet<T>)); - Ok(msg) + unsafe { + let msg = packet.msg.get().replace(None).unwrap(); + drop(Box::from_raw(token.zero.0 as *mut Packet<T>)); + Ok(msg) + } } } | 
