diff options
Diffstat (limited to 'library/std/src/sync/mpsc/shared.rs')
| -rw-r--r-- | library/std/src/sync/mpsc/shared.rs | 31 |
1 files changed, 16 insertions, 15 deletions
diff --git a/library/std/src/sync/mpsc/shared.rs b/library/std/src/sync/mpsc/shared.rs index 56162655544..51917bd96bd 100644 --- a/library/std/src/sync/mpsc/shared.rs +++ b/library/std/src/sync/mpsc/shared.rs @@ -15,7 +15,7 @@ use core::intrinsics::abort; use crate::cell::UnsafeCell; use crate::ptr; -use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; +use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, AtomicUsize, Ordering}; use crate::sync::mpsc::blocking::{self, SignalToken}; use crate::sync::mpsc::mpsc_queue as mpsc; use crate::sync::{Mutex, MutexGuard}; @@ -29,12 +29,13 @@ const MAX_REFCOUNT: usize = (isize::MAX) as usize; const MAX_STEALS: isize = 5; #[cfg(not(test))] const MAX_STEALS: isize = 1 << 20; +const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver pub struct Packet<T> { queue: mpsc::Queue<T>, cnt: AtomicIsize, // How many items are on this channel steals: UnsafeCell<isize>, // How many times has a port received without blocking? - to_wake: AtomicUsize, // SignalToken for wake up + to_wake: AtomicPtr<u8>, // SignalToken for wake up // The number of channels which are currently using this packet. channels: AtomicUsize, @@ -68,7 +69,7 @@ impl<T> Packet<T> { queue: mpsc::Queue::new(), cnt: AtomicIsize::new(0), steals: UnsafeCell::new(0), - to_wake: AtomicUsize::new(0), + to_wake: AtomicPtr::new(EMPTY), channels: AtomicUsize::new(2), port_dropped: AtomicBool::new(false), sender_drain: AtomicIsize::new(0), @@ -93,8 +94,8 @@ impl<T> Packet<T> { pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) { if let Some(token) = token { assert_eq!(self.cnt.load(Ordering::SeqCst), 0); - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); - self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst); + assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY); + self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst); self.cnt.store(-1, Ordering::SeqCst); // This store is a little sketchy. What's happening here is that @@ -250,10 +251,10 @@ impl<T> Packet<T> { unsafe { assert_eq!( self.to_wake.load(Ordering::SeqCst), - 0, + EMPTY, "This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364" ); - let ptr = token.cast_to_usize(); + let ptr = token.to_raw(); self.to_wake.store(ptr, Ordering::SeqCst); let steals = ptr::replace(self.steals.get(), 0); @@ -272,8 +273,8 @@ impl<T> Packet<T> { } } - self.to_wake.store(0, Ordering::SeqCst); - drop(SignalToken::cast_from_usize(ptr)); + self.to_wake.store(EMPTY, Ordering::SeqCst); + drop(SignalToken::from_raw(ptr)); Abort } } @@ -415,9 +416,9 @@ impl<T> Packet<T> { // Consumes ownership of the 'to_wake' field. fn take_to_wake(&self) -> SignalToken { let ptr = self.to_wake.load(Ordering::SeqCst); - self.to_wake.store(0, Ordering::SeqCst); - assert!(ptr != 0); - unsafe { SignalToken::cast_from_usize(ptr) } + self.to_wake.store(EMPTY, Ordering::SeqCst); + assert!(ptr != EMPTY); + unsafe { SignalToken::from_raw(ptr) } } //////////////////////////////////////////////////////////////////////////// @@ -462,7 +463,7 @@ impl<T> Packet<T> { let prev = self.bump(steals + 1); if prev == DISCONNECTED { - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY); true } else { let cur = prev + steals + 1; @@ -470,7 +471,7 @@ impl<T> Packet<T> { if prev < 0 { drop(self.take_to_wake()); } else { - while self.to_wake.load(Ordering::SeqCst) != 0 { + while self.to_wake.load(Ordering::SeqCst) != EMPTY { thread::yield_now(); } } @@ -494,7 +495,7 @@ impl<T> Drop for Packet<T> { // `to_wake`, so this assert cannot be removed with also removing // the `to_wake` assert. assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED); - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY); assert_eq!(self.channels.load(Ordering::SeqCst), 0); } } |
