about summary refs log tree commit diff
path: root/library/std/src/sync/mpsc/shared.rs
diff options
context:
space:
mode:
Diffstat (limited to 'library/std/src/sync/mpsc/shared.rs')
-rw-r--r--library/std/src/sync/mpsc/shared.rs31
1 files changed, 16 insertions, 15 deletions
diff --git a/library/std/src/sync/mpsc/shared.rs b/library/std/src/sync/mpsc/shared.rs
index 56162655544..51917bd96bd 100644
--- a/library/std/src/sync/mpsc/shared.rs
+++ b/library/std/src/sync/mpsc/shared.rs
@@ -15,7 +15,7 @@ use core::intrinsics::abort;
 
 use crate::cell::UnsafeCell;
 use crate::ptr;
-use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
+use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
 use crate::sync::mpsc::blocking::{self, SignalToken};
 use crate::sync::mpsc::mpsc_queue as mpsc;
 use crate::sync::{Mutex, MutexGuard};
@@ -29,12 +29,13 @@ const MAX_REFCOUNT: usize = (isize::MAX) as usize;
 const MAX_STEALS: isize = 5;
 #[cfg(not(test))]
 const MAX_STEALS: isize = 1 << 20;
+const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
 
 pub struct Packet<T> {
     queue: mpsc::Queue<T>,
     cnt: AtomicIsize,          // How many items are on this channel
     steals: UnsafeCell<isize>, // How many times has a port received without blocking?
-    to_wake: AtomicUsize,      // SignalToken for wake up
+    to_wake: AtomicPtr<u8>,    // SignalToken for wake up
 
     // The number of channels which are currently using this packet.
     channels: AtomicUsize,
@@ -68,7 +69,7 @@ impl<T> Packet<T> {
             queue: mpsc::Queue::new(),
             cnt: AtomicIsize::new(0),
             steals: UnsafeCell::new(0),
-            to_wake: AtomicUsize::new(0),
+            to_wake: AtomicPtr::new(EMPTY),
             channels: AtomicUsize::new(2),
             port_dropped: AtomicBool::new(false),
             sender_drain: AtomicIsize::new(0),
@@ -93,8 +94,8 @@ impl<T> Packet<T> {
     pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) {
         if let Some(token) = token {
             assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
-            self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst);
+            assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
+            self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst);
             self.cnt.store(-1, Ordering::SeqCst);
 
             // This store is a little sketchy. What's happening here is that
@@ -250,10 +251,10 @@ impl<T> Packet<T> {
         unsafe {
             assert_eq!(
                 self.to_wake.load(Ordering::SeqCst),
-                0,
+                EMPTY,
                 "This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364"
             );
-            let ptr = token.cast_to_usize();
+            let ptr = token.to_raw();
             self.to_wake.store(ptr, Ordering::SeqCst);
 
             let steals = ptr::replace(self.steals.get(), 0);
@@ -272,8 +273,8 @@ impl<T> Packet<T> {
                 }
             }
 
-            self.to_wake.store(0, Ordering::SeqCst);
-            drop(SignalToken::cast_from_usize(ptr));
+            self.to_wake.store(EMPTY, Ordering::SeqCst);
+            drop(SignalToken::from_raw(ptr));
             Abort
         }
     }
@@ -415,9 +416,9 @@ impl<T> Packet<T> {
     // Consumes ownership of the 'to_wake' field.
     fn take_to_wake(&self) -> SignalToken {
         let ptr = self.to_wake.load(Ordering::SeqCst);
-        self.to_wake.store(0, Ordering::SeqCst);
-        assert!(ptr != 0);
-        unsafe { SignalToken::cast_from_usize(ptr) }
+        self.to_wake.store(EMPTY, Ordering::SeqCst);
+        assert!(ptr != EMPTY);
+        unsafe { SignalToken::from_raw(ptr) }
     }
 
     ////////////////////////////////////////////////////////////////////////////
@@ -462,7 +463,7 @@ impl<T> Packet<T> {
         let prev = self.bump(steals + 1);
 
         if prev == DISCONNECTED {
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+            assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
             true
         } else {
             let cur = prev + steals + 1;
@@ -470,7 +471,7 @@ impl<T> Packet<T> {
             if prev < 0 {
                 drop(self.take_to_wake());
             } else {
-                while self.to_wake.load(Ordering::SeqCst) != 0 {
+                while self.to_wake.load(Ordering::SeqCst) != EMPTY {
                     thread::yield_now();
                 }
             }
@@ -494,7 +495,7 @@ impl<T> Drop for Packet<T> {
         // `to_wake`, so this assert cannot be removed with also removing
         // the `to_wake` assert.
         assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
-        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+        assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
         assert_eq!(self.channels.load(Ordering::SeqCst), 0);
     }
 }