about summary refs log tree commit diff
path: root/library/std/src/sync/mpmc
diff options
context:
space:
mode:
authorJubilee Young <workingjubilee@gmail.com>2024-07-14 17:59:37 -0700
committerJubilee Young <workingjubilee@gmail.com>2024-07-14 17:59:37 -0700
commite32460276cb146b665c0c18b50bf6ea7764c693b (patch)
tree3198ff32adb28b6e2e6992b9ecdf5db2eef19e5b /library/std/src/sync/mpmc
parent64fb2366dacfcd187ce437e637e237ce39d9073c (diff)
downloadrust-e32460276cb146b665c0c18b50bf6ea7764c693b.tar.gz
rust-e32460276cb146b665c0c18b50bf6ea7764c693b.zip
std: Unsafe-wrap std::sync
Diffstat (limited to 'library/std/src/sync/mpmc')
-rw-r--r--library/std/src/sync/mpmc/array.rs22
-rw-r--r--library/std/src/sync/mpmc/counter.rs4
-rw-r--r--library/std/src/sync/mpmc/list.rs38
-rw-r--r--library/std/src/sync/mpmc/zero.rs20
4 files changed, 48 insertions, 36 deletions
diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs
index 492e21d9bdb..185319add74 100644
--- a/library/std/src/sync/mpmc/array.rs
+++ b/library/std/src/sync/mpmc/array.rs
@@ -200,11 +200,12 @@ impl<T> Channel<T> {
             return Err(msg);
         }
 
-        let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
-
         // Write the message into the slot and update the stamp.
-        slot.msg.get().write(MaybeUninit::new(msg));
-        slot.stamp.store(token.array.stamp, Ordering::Release);
+        unsafe {
+            let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
+            slot.msg.get().write(MaybeUninit::new(msg));
+            slot.stamp.store(token.array.stamp, Ordering::Release);
+        }
 
         // Wake a sleeping receiver.
         self.receivers.notify();
@@ -291,11 +292,14 @@ impl<T> Channel<T> {
             return Err(());
         }
 
-        let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
-
         // Read the message from the slot and update the stamp.
-        let msg = slot.msg.get().read().assume_init();
-        slot.stamp.store(token.array.stamp, Ordering::Release);
+        let msg = unsafe {
+            let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
+
+            let msg = slot.msg.get().read().assume_init();
+            slot.stamp.store(token.array.stamp, Ordering::Release);
+            msg
+        };
 
         // Wake a sleeping sender.
         self.senders.notify();
@@ -471,7 +475,7 @@ impl<T> Channel<T> {
             false
         };
 
-        self.discard_all_messages(tail);
+        unsafe { self.discard_all_messages(tail) };
         disconnected
     }
 
diff --git a/library/std/src/sync/mpmc/counter.rs b/library/std/src/sync/mpmc/counter.rs
index a5a6bdc67f1..3478cf41dc9 100644
--- a/library/std/src/sync/mpmc/counter.rs
+++ b/library/std/src/sync/mpmc/counter.rs
@@ -63,7 +63,7 @@ impl<C> Sender<C> {
             disconnect(&self.counter().chan);
 
             if self.counter().destroy.swap(true, Ordering::AcqRel) {
-                drop(Box::from_raw(self.counter));
+                drop(unsafe { Box::from_raw(self.counter) });
             }
         }
     }
@@ -116,7 +116,7 @@ impl<C> Receiver<C> {
             disconnect(&self.counter().chan);
 
             if self.counter().destroy.swap(true, Ordering::AcqRel) {
-                drop(Box::from_raw(self.counter));
+                drop(unsafe { Box::from_raw(self.counter) });
             }
         }
     }
diff --git a/library/std/src/sync/mpmc/list.rs b/library/std/src/sync/mpmc/list.rs
index 9e7148c716c..edac7a0cb18 100644
--- a/library/std/src/sync/mpmc/list.rs
+++ b/library/std/src/sync/mpmc/list.rs
@@ -91,7 +91,7 @@ impl<T> Block<T> {
         // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
         // begun destruction of the block.
         for i in start..BLOCK_CAP - 1 {
-            let slot = (*this).slots.get_unchecked(i);
+            let slot = unsafe { (*this).slots.get_unchecked(i) };
 
             // Mark the `DESTROY` bit if a thread is still using the slot.
             if slot.state.load(Ordering::Acquire) & READ == 0
@@ -103,7 +103,7 @@ impl<T> Block<T> {
         }
 
         // No thread is using the block, now it is safe to destroy it.
-        drop(Box::from_raw(this));
+        drop(unsafe { Box::from_raw(this) });
     }
 }
 
@@ -265,9 +265,11 @@ impl<T> Channel<T> {
         // Write the message into the slot.
         let block = token.list.block as *mut Block<T>;
         let offset = token.list.offset;
-        let slot = (*block).slots.get_unchecked(offset);
-        slot.msg.get().write(MaybeUninit::new(msg));
-        slot.state.fetch_or(WRITE, Ordering::Release);
+        unsafe {
+            let slot = (*block).slots.get_unchecked(offset);
+            slot.msg.get().write(MaybeUninit::new(msg));
+            slot.state.fetch_or(WRITE, Ordering::Release);
+        }
 
         // Wake a sleeping receiver.
         self.receivers.notify();
@@ -369,19 +371,21 @@ impl<T> Channel<T> {
         // Read the message.
         let block = token.list.block as *mut Block<T>;
         let offset = token.list.offset;
-        let slot = (*block).slots.get_unchecked(offset);
-        slot.wait_write();
-        let msg = slot.msg.get().read().assume_init();
-
-        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
-        // couldn't because we were busy reading from the slot.
-        if offset + 1 == BLOCK_CAP {
-            Block::destroy(block, 0);
-        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
-            Block::destroy(block, offset + 1);
-        }
+        unsafe {
+            let slot = (*block).slots.get_unchecked(offset);
+            slot.wait_write();
+            let msg = slot.msg.get().read().assume_init();
+
+            // Destroy the block if we've reached the end, or if another thread wanted to destroy but
+            // couldn't because we were busy reading from the slot.
+            if offset + 1 == BLOCK_CAP {
+                Block::destroy(block, 0);
+            } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
+                Block::destroy(block, offset + 1);
+            }
 
-        Ok(msg)
+            Ok(msg)
+        }
     }
 
     /// Attempts to send a message into the channel.
diff --git a/library/std/src/sync/mpmc/zero.rs b/library/std/src/sync/mpmc/zero.rs
index 1b82713edc7..6d1c9d64e7a 100644
--- a/library/std/src/sync/mpmc/zero.rs
+++ b/library/std/src/sync/mpmc/zero.rs
@@ -103,9 +103,11 @@ impl<T> Channel<T> {
             return Err(msg);
         }
 
-        let packet = &*(token.zero.0 as *const Packet<T>);
-        packet.msg.get().write(Some(msg));
-        packet.ready.store(true, Ordering::Release);
+        unsafe {
+            let packet = &*(token.zero.0 as *const Packet<T>);
+            packet.msg.get().write(Some(msg));
+            packet.ready.store(true, Ordering::Release);
+        }
         Ok(())
     }
 
@@ -116,22 +118,24 @@ impl<T> Channel<T> {
             return Err(());
         }
 
-        let packet = &*(token.zero.0 as *const Packet<T>);
+        let packet = unsafe { &*(token.zero.0 as *const Packet<T>) };
 
         if packet.on_stack {
             // The message has been in the packet from the beginning, so there is no need to wait
             // for it. However, after reading the message, we need to set `ready` to `true` in
             // order to signal that the packet can be destroyed.
-            let msg = packet.msg.get().replace(None).unwrap();
+            let msg = unsafe { packet.msg.get().replace(None) }.unwrap();
             packet.ready.store(true, Ordering::Release);
             Ok(msg)
         } else {
             // Wait until the message becomes available, then read it and destroy the
             // heap-allocated packet.
             packet.wait_ready();
-            let msg = packet.msg.get().replace(None).unwrap();
-            drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
-            Ok(msg)
+            unsafe {
+                let msg = packet.msg.get().replace(None).unwrap();
+                drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
+                Ok(msg)
+            }
         }
     }