Diffstat (limited to 'src/libstd/sync')
-rw-r--r--  src/libstd/sync/atomic.rs          | 227
-rw-r--r--  src/libstd/sync/condvar.rs         |   6
-rw-r--r--  src/libstd/sync/future.rs          |   4
-rw-r--r--  src/libstd/sync/mod.rs             |   2
-rw-r--r--  src/libstd/sync/mpsc/mpsc_queue.rs |  12
-rw-r--r--  src/libstd/sync/mpsc/oneshot.rs    |  34
-rw-r--r--  src/libstd/sync/mpsc/shared.rs     |  95
-rw-r--r--  src/libstd/sync/mpsc/spsc_queue.rs |  41
-rw-r--r--  src/libstd/sync/mpsc/stream.rs     |  64
-rw-r--r--  src/libstd/sync/mpsc/sync.rs       |  13
-rw-r--r--  src/libstd/sync/once.rs            |  24
11 files changed, 147 insertions(+), 375 deletions(-)
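The change is mechanical throughout: `src/libstd/sync/atomic.rs` is deleted in favor of a `pub use core::atomic;` re-export in `src/libstd/sync/mod.rs`, and every call site spells orderings through the `Ordering` enum (`Ordering::SeqCst`) rather than a module-qualified or imported variant (`atomic::SeqCst`, bare `SeqCst`). As a minimal sketch of the caller-side style this commit migrates to (assuming the pre-1.0 std at this commit, with `uint`-era names and the `ATOMIC_UINT_INIT` initializer that condvar.rs imports below), here is the "global count" example from the deleted module docs restated against the re-exported API:

```
use std::sync::atomic::{AtomicUint, Ordering, ATOMIC_UINT_INIT};

// Static atomics still work for lazy global state; only the ordering
// argument is now written through the `Ordering` enum.
static GLOBAL_TASK_COUNT: AtomicUint = ATOMIC_UINT_INIT;

fn main() {
    let old_task_count = GLOBAL_TASK_COUNT.fetch_add(1, Ordering::SeqCst);
    println!("live tasks: {}", old_task_count + 1);
}
```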
diff --git a/src/libstd/sync/atomic.rs b/src/libstd/sync/atomic.rs
deleted file mode 100644
index 3652b45ce97..00000000000
--- a/src/libstd/sync/atomic.rs
+++ /dev/null
@@ -1,227 +0,0 @@
-// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! Atomic types
-//!
-//! Atomic types provide primitive shared-memory communication between
-//! threads, and are the building blocks of other concurrent
-//! types.
-//!
-//! This module defines atomic versions of a select number of primitive
-//! types, including `AtomicBool`, `AtomicInt`, `AtomicUint`, and `AtomicOption`.
-//! Atomic types present operations that, when used correctly, synchronize
-//! updates between threads.
-//!
-//! Each method takes an `Ordering` which represents the strength of
-//! the memory barrier for that operation. These orderings are the
-//! same as [C++11 atomic orderings][1].
-//!
-//! [1]: http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync
-//!
-//! Atomic variables are safe to share between threads (they implement `Sync`)
-//! but they do not themselves provide the mechanism for sharing. The most
-//! common way to share an atomic variable is to put it into an `Arc` (an
-//! atomically-reference-counted shared pointer).
-//!
-//! Most atomic types may be stored in static variables, initialized using
-//! the provided static initializers like `INIT_ATOMIC_BOOL`. Atomic statics
-//! are often used for lazy global initialization.
-//!
-//!
-//! # Examples
-//!
-//! A simple spinlock:
-//!
-//! ```
-//! use std::sync::Arc;
-//! use std::sync::atomic::{AtomicUint, SeqCst};
-//! use std::thread::Thread;
-//!
-//! fn main() {
-//!     let spinlock = Arc::new(AtomicUint::new(1));
-//!
-//!     let spinlock_clone = spinlock.clone();
-//!     Thread::spawn(move|| {
-//!         spinlock_clone.store(0, SeqCst);
-//!     }).detach();
-//!
-//!     // Wait for the other task to release the lock
-//!     while spinlock.load(SeqCst) != 0 {}
-//! }
-//! ```
-//!
-//! Transferring a heap object with `AtomicOption`:
-//!
-//! ```
-//! use std::sync::Arc;
-//! use std::sync::atomic::{AtomicOption, SeqCst};
-//! use std::thread::Thread;
-//!
-//! fn main() {
-//!     struct BigObject;
-//!
-//!     let shared_big_object = Arc::new(AtomicOption::empty());
-//!
-//!     let shared_big_object_clone = shared_big_object.clone();
-//!     Thread::spawn(move|| {
-//!         let unwrapped_big_object = shared_big_object_clone.take(SeqCst);
-//!         if unwrapped_big_object.is_some() {
-//!             println!("got a big object from another task");
-//!         } else {
-//!             println!("other task hasn't sent big object yet");
-//!         }
-//!     }).detach();
-//!
-//!     shared_big_object.swap(box BigObject, SeqCst);
-//! }
-//! ```
-//!
-//! Keep a global count of live tasks:
-//!
-//! ```
-//! use std::sync::atomic::{AtomicUint, SeqCst, ATOMIC_UINT_INIT};
-//!
-//! static GLOBAL_TASK_COUNT: AtomicUint = ATOMIC_UINT_INIT;
-//!
-//! let old_task_count = GLOBAL_TASK_COUNT.fetch_add(1, SeqCst);
-//! println!("live tasks: {}", old_task_count + 1);
-//! ```
-
-#![stable]
-
-use alloc::boxed::Box;
-use core::mem;
-use core::prelude::{Send, Drop, None, Option, Some};
-
-pub use core::atomic::{AtomicBool, AtomicInt, AtomicUint, AtomicPtr};
-pub use core::atomic::{INIT_ATOMIC_BOOL, INIT_ATOMIC_INT, INIT_ATOMIC_UINT};
-pub use core::atomic::{ATOMIC_BOOL_INIT, ATOMIC_INT_INIT, ATOMIC_UINT_INIT};
-pub use core::atomic::fence;
-pub use core::atomic::Ordering::{self, Relaxed, Release, Acquire, AcqRel, SeqCst};
-
-/// An atomic, nullable unique pointer
-///
-/// This can be used as the concurrency primitive for operations that transfer
-/// owned heap objects across tasks.
-#[unsafe_no_drop_flag]
-#[deprecated = "no longer used; will eventually be replaced by a higher-level\
-                concept like MVar"]
-pub struct AtomicOption<T> {
-    p: AtomicUint,
-}
-
-#[allow(deprecated)]
-impl<T: Send> AtomicOption<T> {
-    /// Create a new `AtomicOption`
-    pub fn new(p: Box<T>) -> AtomicOption<T> {
-        unsafe { AtomicOption { p: AtomicUint::new(mem::transmute(p)) } }
-    }
-
-    /// Create a new `AtomicOption` that doesn't contain a value
-    pub fn empty() -> AtomicOption<T> { AtomicOption { p: AtomicUint::new(0) } }
-
-    /// Store a value, returning the old value
-    #[inline]
-    pub fn swap(&self, val: Box<T>, order: Ordering) -> Option<Box<T>> {
-        let val = unsafe { mem::transmute(val) };
-
-        match self.p.swap(val, order) {
-            0 => None,
-            n => Some(unsafe { mem::transmute(n) }),
-        }
-    }
-
-    /// Remove the value, leaving the `AtomicOption` empty.
-    #[inline]
-    pub fn take(&self, order: Ordering) -> Option<Box<T>> {
-        unsafe { self.swap(mem::transmute(0u), order) }
-    }
-
-    /// Replace an empty value with a non-empty value.
-    ///
-    /// Succeeds if the option is `None` and returns `None` if so. If
-    /// the option was already `Some`, returns `Some` of the rejected
-    /// value.
-    #[inline]
-    pub fn fill(&self, val: Box<T>, order: Ordering) -> Option<Box<T>> {
-        unsafe {
-            let val = mem::transmute(val);
-            let expected = mem::transmute(0u);
-            let oldval = self.p.compare_and_swap(expected, val, order);
-            if oldval == expected {
-                None
-            } else {
-                Some(mem::transmute(val))
-            }
-        }
-    }
-
-    /// Returns `true` if the `AtomicOption` is empty.
-    ///
-    /// Be careful: The caller must have some external method of ensuring the
-    /// result does not get invalidated by another task after this returns.
-    #[inline]
-    pub fn is_empty(&self, order: Ordering) -> bool {
-        self.p.load(order) as uint == 0
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for AtomicOption<T> {
-    fn drop(&mut self) {
-        let _ = self.take(SeqCst);
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use prelude::v1::*;
-    use super::*;
-
-    #[test]
-    fn option_empty() {
-        let option: AtomicOption<()> = AtomicOption::empty();
-        assert!(option.is_empty(SeqCst));
-    }
-
-    #[test]
-    fn option_swap() {
-        let p = AtomicOption::new(box 1i);
-        let a = box 2i;
-
-        let b = p.swap(a, SeqCst);
-
-        assert!(b == Some(box 1));
-        assert!(p.take(SeqCst) == Some(box 2));
-    }
-
-    #[test]
-    fn option_take() {
-        let p = AtomicOption::new(box 1i);
-
-        assert!(p.take(SeqCst) == Some(box 1));
-        assert!(p.take(SeqCst) == None);
-
-        let p2 = box 2i;
-        p.swap(p2, SeqCst);
-
-        assert!(p.take(SeqCst) == Some(box 2));
-    }
-
-    #[test]
-    fn option_fill() {
-        let p = AtomicOption::new(box 1i);
-        assert!(p.fill(box 2i, SeqCst).is_some()); // should fail; shouldn't leak!
-        assert!(p.take(SeqCst) == Some(box 1));
-
-        assert!(p.fill(box 2i, SeqCst).is_none()); // shouldn't fail
-        assert!(p.take(SeqCst) == Some(box 2));
-    }
-}
diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs
index 28c36922ca6..7734f655ed2 100644
--- a/src/libstd/sync/condvar.rs
+++ b/src/libstd/sync/condvar.rs
@@ -10,7 +10,7 @@
 
 use prelude::v1::*;
 
-use sync::atomic::{self, AtomicUint};
+use sync::atomic::{AtomicUint, Ordering, ATOMIC_UINT_INIT};
 use sync::poison::{self, LockResult};
 use sys_common::condvar as sys;
 use sys_common::mutex as sys_mutex;
@@ -88,7 +88,7 @@ unsafe impl Sync for StaticCondvar {}
 #[unstable = "may be merged with Condvar in the future"]
 pub const CONDVAR_INIT: StaticCondvar = StaticCondvar {
     inner: sys::CONDVAR_INIT,
-    mutex: atomic::ATOMIC_UINT_INIT,
+    mutex: ATOMIC_UINT_INIT,
 };
 
 impl Condvar {
@@ -260,7 +260,7 @@ impl StaticCondvar {
     fn verify(&self, mutex: &sys_mutex::Mutex) {
         let addr = mutex as *const _ as uint;
-        match self.mutex.compare_and_swap(0, addr, atomic::SeqCst) {
+        match self.mutex.compare_and_swap(0, addr, Ordering::SeqCst) {
             // If we got out 0, then we have successfully bound the mutex to
             // this cvar.
             0 => {}
diff --git a/src/libstd/sync/future.rs b/src/libstd/sync/future.rs
index e5245251ea8..4c6adcc04f6 100644
--- a/src/libstd/sync/future.rs
+++ b/src/libstd/sync/future.rs
@@ -65,10 +65,6 @@ impl<A> Future<A> {
         }
     }
 
-    /// Deprecated, use into_inner() instead
-    #[deprecated = "renamed to into_inner()"]
-    pub fn unwrap(self) -> A { self.into_inner() }
-
     pub fn get_ref<'a>(&'a mut self) -> &'a A {
         /*!
         * Executes the future's closure and then returns a reference
diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs
index c09c3b45d3e..6ce278726e9 100644
--- a/src/libstd/sync/mod.rs
+++ b/src/libstd/sync/mod.rs
@@ -18,6 +18,7 @@
 #![experimental]
 
 pub use alloc::arc::{Arc, Weak};
+pub use core::atomic;
 
 pub use self::mutex::{Mutex, MutexGuard, StaticMutex};
 pub use self::mutex::MUTEX_INIT;
@@ -32,7 +33,6 @@ pub use self::poison::{PoisonError, TryLockError, TryLockResult, LockResult};
 pub use self::future::Future;
 pub use self::task_pool::TaskPool;
 
-pub mod atomic;
 pub mod mpsc;
 
 mod barrier;
diff --git a/src/libstd/sync/mpsc/mpsc_queue.rs b/src/libstd/sync/mpsc/mpsc_queue.rs
index 8945233dac9..8f85dc6e043 100644
--- a/src/libstd/sync/mpsc/mpsc_queue.rs
+++ b/src/libstd/sync/mpsc/mpsc_queue.rs
@@ -48,7 +48,7 @@ use alloc::boxed::Box;
 use core::mem;
 use core::cell::UnsafeCell;
 
-use sync::atomic::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
+use sync::atomic::{AtomicPtr, Ordering};
 
 /// A result of the `pop` function.
 pub enum PopResult<T> {
@@ -103,8 +103,8 @@ impl<T: Send> Queue<T> {
     pub fn push(&self, t: T) {
         unsafe {
             let n = Node::new(Some(t));
-            let prev = self.head.swap(n, AcqRel);
-            (*prev).next.store(n, Release);
+            let prev = self.head.swap(n, Ordering::AcqRel);
+            (*prev).next.store(n, Ordering::Release);
         }
     }
 
@@ -121,7 +121,7 @@ impl<T: Send> Queue<T> {
     pub fn pop(&self) -> PopResult<T> {
         unsafe {
             let tail = *self.tail.get();
-            let next = (*tail).next.load(Acquire);
+            let next = (*tail).next.load(Ordering::Acquire);
 
             if !next.is_null() {
                 *self.tail.get() = next;
@@ -132,7 +132,7 @@ impl<T: Send> Queue<T> {
                 return Data(ret);
             }
 
-            if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
+            if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
         }
     }
 }
@@ -143,7 +143,7 @@ impl<T: Send> Drop for Queue<T> {
         unsafe {
             let mut cur = *self.tail.get();
             while !cur.is_null() {
-                let next = (*cur).next.load(Relaxed);
+                let next = (*cur).next.load(Ordering::Relaxed);
                 let _: Box<Node<T>> = mem::transmute(cur);
                 cur = next;
             }
diff --git a/src/libstd/sync/mpsc/oneshot.rs b/src/libstd/sync/mpsc/oneshot.rs
index 2811f403c6c..5c2331d0f2e 100644
--- a/src/libstd/sync/mpsc/oneshot.rs
+++ b/src/libstd/sync/mpsc/oneshot.rs
@@ -42,7 +42,7 @@ use core::prelude::*;
 use sync::mpsc::Receiver;
 use sync::mpsc::blocking::{self, SignalToken};
 use core::mem;
-use sync::atomic;
+use sync::atomic::{AtomicUint, Ordering};
 
 // Various states you can find a port in.
 const EMPTY: uint = 0;          // initial state: no data, no blocked reciever
@@ -56,7 +56,7 @@ const DISCONNECTED: uint = 2;   // channel is disconnected OR upgraded
 
 pub struct Packet<T> {
     // Internal state of the chan/port pair (stores the blocked task as well)
-    state: atomic::AtomicUint,
+    state: AtomicUint,
     // One-shot data slot location
     data: Option<T>,
     // when used for the second time, a oneshot channel must be upgraded, and
@@ -93,7 +93,7 @@ impl<T: Send> Packet<T> {
         Packet {
             data: None,
             upgrade: NothingSent,
-            state: atomic::AtomicUint::new(EMPTY),
+            state: AtomicUint::new(EMPTY),
         }
     }
 
@@ -107,7 +107,7 @@ impl<T: Send> Packet<T> {
         self.data = Some(t);
         self.upgrade = SendUsed;
 
-        match self.state.swap(DATA, atomic::SeqCst) {
+        match self.state.swap(DATA, Ordering::SeqCst) {
             // Sent the data, no one was waiting
             EMPTY => Ok(()),
 
@@ -141,14 +141,14 @@ impl<T: Send> Packet<T> {
     pub fn recv(&mut self) -> Result<T, Failure<T>> {
         // Attempt to not block the task (it's a little expensive). If it looks
         // like we're not empty, then immediately go through to `try_recv`.
-        if self.state.load(atomic::SeqCst) == EMPTY {
+        if self.state.load(Ordering::SeqCst) == EMPTY {
             let (wait_token, signal_token) = blocking::tokens();
             let ptr = unsafe { signal_token.cast_to_uint() };
 
             // race with senders to enter the blocking state
-            if self.state.compare_and_swap(EMPTY, ptr, atomic::SeqCst) == EMPTY {
+            if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY {
                 wait_token.wait();
-                debug_assert!(self.state.load(atomic::SeqCst) != EMPTY);
+                debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
             } else {
                 // drop the signal token, since we never blocked
                 drop(unsafe { SignalToken::cast_from_uint(ptr) });
@@ -159,7 +159,7 @@ impl<T: Send> Packet<T> {
     }
 
     pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
-        match self.state.load(atomic::SeqCst) {
+        match self.state.load(Ordering::SeqCst) {
             EMPTY => Err(Empty),
 
             // We saw some data on the channel, but the channel can be used
@@ -169,7 +169,7 @@ impl<T: Send> Packet<T> {
             // the state changes under our feet we'd rather just see that state
             // change.
             DATA => {
-                self.state.compare_and_swap(DATA, EMPTY, atomic::SeqCst);
+                self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
                 match self.data.take() {
                     Some(data) => Ok(data),
                     None => unreachable!(),
@@ -209,7 +209,7 @@ impl<T: Send> Packet<T> {
         };
         self.upgrade = GoUp(up);
 
-        match self.state.swap(DISCONNECTED, atomic::SeqCst) {
+        match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
             // If the channel is empty or has data on it, then we're good to go.
             // Senders will check the data before the upgrade (in case we
             // plastered over the DATA state).
@@ -225,7 +225,7 @@ impl<T: Send> Packet<T> {
     }
 
     pub fn drop_chan(&mut self) {
-        match self.state.swap(DISCONNECTED, atomic::SeqCst) {
+        match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
             DATA | DISCONNECTED | EMPTY => {}
 
             // If someone's waiting, we gotta wake them up
@@ -236,7 +236,7 @@ impl<T: Send> Packet<T> {
     }
 
     pub fn drop_port(&mut self) {
-        match self.state.swap(DISCONNECTED, atomic::SeqCst) {
+        match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
             // An empty channel has nothing to do, and a remotely disconnected
             // channel also has nothing to do b/c we're about to run the drop
             // glue
@@ -259,7 +259,7 @@ impl<T: Send> Packet<T> {
     // If Ok, the value is whether this port has data, if Err, then the upgraded
    // port needs to be checked instead of this one.
     pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
-        match self.state.load(atomic::SeqCst) {
+        match self.state.load(Ordering::SeqCst) {
             EMPTY => Ok(false), // Welp, we tried
             DATA => Ok(true),   // we have some un-acquired data
             DISCONNECTED if self.data.is_some() => Ok(true), // we have data
@@ -284,7 +284,7 @@ impl<T: Send> Packet<T> {
     // because there is data, or fail because there is an upgrade pending.
     pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> {
         let ptr = unsafe { token.cast_to_uint() };
-        match self.state.compare_and_swap(EMPTY, ptr, atomic::SeqCst) {
+        match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
             EMPTY => SelSuccess,
             DATA => {
                 drop(unsafe { SignalToken::cast_from_uint(ptr) });
@@ -322,7 +322,7 @@ impl<T: Send> Packet<T> {
     //
     // The return value indicates whether there's data on this port.
     pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> {
-        let state = match self.state.load(atomic::SeqCst) {
+        let state = match self.state.load(Ordering::SeqCst) {
             // Each of these states means that no further activity will happen
             // with regard to abortion selection
             s @ EMPTY |
@@ -331,7 +331,7 @@ impl<T: Send> Packet<T> {
 
             // If we've got a blocked task, then use an atomic to gain ownership
             // of it (may fail)
-            ptr => self.state.compare_and_swap(ptr, EMPTY, atomic::SeqCst)
+            ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst)
         };
 
         // Now that we've got ownership of our state, figure out what to do
@@ -370,6 +370,6 @@ impl<T: Send> Packet<T> {
 #[unsafe_destructor]
 impl<T: Send> Drop for Packet<T> {
     fn drop(&mut self) {
-        assert_eq!(self.state.load(atomic::SeqCst), DISCONNECTED);
+        assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
     }
 }
diff --git a/src/libstd/sync/mpsc/shared.rs b/src/libstd/sync/mpsc/shared.rs
index cadac8e6272..4295d116aed 100644
--- a/src/libstd/sync/mpsc/shared.rs
+++ b/src/libstd/sync/mpsc/shared.rs
@@ -25,11 +25,12 @@ use core::prelude::*;
 use core::cmp;
 use core::int;
 
-use sync::{atomic, Mutex, MutexGuard};
-use sync::mpsc::mpsc_queue as mpsc;
+use sync::atomic::{AtomicUint, AtomicInt, AtomicBool, Ordering};
 use sync::mpsc::blocking::{self, SignalToken};
-use sync::mpsc::select::StartResult;
+use sync::mpsc::mpsc_queue as mpsc;
 use sync::mpsc::select::StartResult::*;
+use sync::mpsc::select::StartResult;
+use sync::{Mutex, MutexGuard};
 use thread::Thread;
 
 const DISCONNECTED: int = int::MIN;
@@ -41,17 +42,17 @@ const MAX_STEALS: int = 1 << 20;
 
 pub struct Packet<T> {
     queue: mpsc::Queue<T>,
-    cnt: atomic::AtomicInt, // How many items are on this channel
+    cnt: AtomicInt, // How many items are on this channel
     steals: int, // How many times has a port received without blocking?
-    to_wake: atomic::AtomicUint, // SignalToken for wake up
+    to_wake: AtomicUint, // SignalToken for wake up
 
     // The number of channels which are currently using this packet.
-    channels: atomic::AtomicInt,
+    channels: AtomicInt,
 
     // See the discussion in Port::drop and the channel send methods for what
     // these are used for
-    port_dropped: atomic::AtomicBool,
-    sender_drain: atomic::AtomicInt,
+    port_dropped: AtomicBool,
+    sender_drain: AtomicInt,
 
     // this lock protects various portions of this implementation during
     // select()
@@ -69,12 +70,12 @@ impl<T: Send> Packet<T> {
     pub fn new() -> Packet<T> {
         let p = Packet {
             queue: mpsc::Queue::new(),
-            cnt: atomic::AtomicInt::new(0),
+            cnt: AtomicInt::new(0),
             steals: 0,
-            to_wake: atomic::AtomicUint::new(0),
-            channels: atomic::AtomicInt::new(2),
-            port_dropped: atomic::AtomicBool::new(false),
-            sender_drain: atomic::AtomicInt::new(0),
+            to_wake: AtomicUint::new(0),
+            channels: AtomicInt::new(2),
+            port_dropped: AtomicBool::new(false),
+            sender_drain: AtomicInt::new(0),
             select_lock: Mutex::new(()),
         };
         return p;
@@ -98,10 +99,10 @@ impl<T: Send> Packet<T> {
                            token: Option<SignalToken>,
                            guard: MutexGuard<()>) {
         token.map(|token| {
-            assert_eq!(self.cnt.load(atomic::SeqCst), 0);
-            assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-            self.to_wake.store(unsafe { token.cast_to_uint() }, atomic::SeqCst);
-            self.cnt.store(-1, atomic::SeqCst);
+            assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
+            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+            self.to_wake.store(unsafe { token.cast_to_uint() }, Ordering::SeqCst);
+            self.cnt.store(-1, Ordering::SeqCst);
 
             // This store is a little sketchy. What's happening here is that
             // we're transferring a blocker from a oneshot or stream channel to
@@ -134,7 +135,7 @@ impl<T: Send> Packet<T> {
 
     pub fn send(&mut self, t: T) -> Result<(), T> {
         // See Port::drop for what's going on
-        if self.port_dropped.load(atomic::SeqCst) { return Err(t) }
+        if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
 
         // Note that the multiple sender case is a little trickier
         // semantically than the single sender case. The logic for
@@ -161,12 +162,12 @@ impl<T: Send> Packet<T> {
         // preflight check serves as the definitive "this will never be
         // received". Once we get beyond this check, we have permanently
         // entered the realm of "this may be received"
-        if self.cnt.load(atomic::SeqCst) < DISCONNECTED + FUDGE {
+        if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
             return Err(t)
         }
 
         self.queue.push(t);
-        match self.cnt.fetch_add(1, atomic::SeqCst) {
+        match self.cnt.fetch_add(1, Ordering::SeqCst) {
             -1 => {
                 self.take_to_wake().signal();
             }
@@ -183,9 +184,9 @@ impl<T: Send> Packet<T> {
             n if n < DISCONNECTED + FUDGE => {
                 // see the comment in 'try' for a shared channel for why this
                 // window of "not disconnected" is ok.
-                self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
 
-                if self.sender_drain.fetch_add(1, atomic::SeqCst) == 0 {
+                if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 {
                     loop {
                         // drain the queue, for info on the thread yield see the
                         // discussion in try_recv
@@ -198,7 +199,7 @@ impl<T: Send> Packet<T> {
                         }
                         // maybe we're done, if we're not the last ones
                         // here, then we need to go try again.
-                        if self.sender_drain.fetch_sub(1, atomic::SeqCst) == 1 {
+                        if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 {
                             break
                         }
                     }
@@ -239,15 +240,15 @@ impl<T: Send> Packet<T> {
     // Essentially the exact same thing as the stream decrement function.
     // Returns true if blocking should proceed.
     fn decrement(&mut self, token: SignalToken) -> StartResult {
-        assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
         let ptr = unsafe { token.cast_to_uint() };
-        self.to_wake.store(ptr, atomic::SeqCst);
+        self.to_wake.store(ptr, Ordering::SeqCst);
 
         let steals = self.steals;
         self.steals = 0;
 
-        match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) {
-            DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); }
+        match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
+            DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
             // If we factor in our steals and notice that the channel has no
             // data, we successfully sleep
             n => {
@@ -256,7 +257,7 @@ impl<T: Send> Packet<T> {
             }
         }
 
-        self.to_wake.store(0, atomic::SeqCst);
+        self.to_wake.store(0, Ordering::SeqCst);
         drop(unsafe { SignalToken::cast_from_uint(ptr) });
         Abort
     }
@@ -297,9 +298,9 @@ impl<T: Send> Packet<T> {
             // might decrement steals.
             Some(data) => {
                 if self.steals > MAX_STEALS {
-                    match self.cnt.swap(0, atomic::SeqCst) {
+                    match self.cnt.swap(0, Ordering::SeqCst) {
                         DISCONNECTED => {
-                            self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                            self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                         }
                         n => {
                             let m = cmp::min(n, self.steals);
@@ -316,7 +317,7 @@ impl<T: Send> Packet<T> {
 
             // See the discussion in the stream implementation for why we try
             // again.
             None => {
-                match self.cnt.load(atomic::SeqCst) {
+                match self.cnt.load(Ordering::SeqCst) {
                     n if n != DISCONNECTED => Err(Empty),
                     _ => {
                         match self.queue.pop() {
@@ -334,20 +335,20 @@ impl<T: Send> Packet<T> {
     // Prepares this shared packet for a channel clone, essentially just bumping
     // a refcount.
     pub fn clone_chan(&mut self) {
-        self.channels.fetch_add(1, atomic::SeqCst);
+        self.channels.fetch_add(1, Ordering::SeqCst);
     }
 
     // Decrement the reference count on a channel. This is called whenever a
     // Chan is dropped and may end up waking up a receiver. It's the receiver's
     // responsibility on the other end to figure out that we've disconnected.
     pub fn drop_chan(&mut self) {
-        match self.channels.fetch_sub(1, atomic::SeqCst) {
+        match self.channels.fetch_sub(1, Ordering::SeqCst) {
             1 => {}
             n if n > 1 => return,
             n => panic!("bad number of channels left {}", n),
         }
 
-        match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
+        match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
             -1 => { self.take_to_wake().signal(); }
             DISCONNECTED => {}
             n => { assert!(n >= 0); }
@@ -357,10 +358,10 @@ impl<T: Send> Packet<T> {
     // See the long discussion inside of stream.rs for why the queue is drained,
     // and why it is done in this fashion.
     pub fn drop_port(&mut self) {
-        self.port_dropped.store(true, atomic::SeqCst);
+        self.port_dropped.store(true, Ordering::SeqCst);
         let mut steals = self.steals;
         while {
-            let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, atomic::SeqCst);
+            let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, Ordering::SeqCst);
             cnt != DISCONNECTED && cnt != steals
         } {
             // See the discussion in 'try_recv' for why we yield
@@ -376,8 +377,8 @@ impl<T: Send> Packet<T> {
 
     // Consumes ownership of the 'to_wake' field.
     fn take_to_wake(&mut self) -> SignalToken {
-        let ptr = self.to_wake.load(atomic::SeqCst);
-        self.to_wake.store(0, atomic::SeqCst);
+        let ptr = self.to_wake.load(Ordering::SeqCst);
+        self.to_wake.store(0, Ordering::SeqCst);
         assert!(ptr != 0);
         unsafe { SignalToken::cast_from_uint(ptr) }
     }
@@ -392,15 +393,15 @@ impl<T: Send> Packet<T> {
     // This is different than the stream version because there's no need to peek
     // at the queue, we can just look at the local count.
     pub fn can_recv(&mut self) -> bool {
-        let cnt = self.cnt.load(atomic::SeqCst);
+        let cnt = self.cnt.load(Ordering::SeqCst);
         cnt == DISCONNECTED || cnt - self.steals > 0
     }
 
     // increment the count on the channel (used for selection)
     fn bump(&mut self, amt: int) -> int {
-        match self.cnt.fetch_add(amt, atomic::SeqCst) {
+        match self.cnt.fetch_add(amt, Ordering::SeqCst) {
             DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                 DISCONNECTED
             }
             n => n
@@ -444,13 +445,13 @@ impl<T: Send> Packet<T> {
         // the channel count and figure out what we should do to make it
         // positive.
         let steals = {
-            let cnt = self.cnt.load(atomic::SeqCst);
+            let cnt = self.cnt.load(Ordering::SeqCst);
             if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
         };
         let prev = self.bump(steals + 1);
 
         if prev == DISCONNECTED {
-            assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
             true
         } else {
             let cur = prev + steals + 1;
@@ -458,7 +459,7 @@ impl<T: Send> Packet<T> {
             if prev < 0 {
                 drop(self.take_to_wake());
             } else {
-                while self.to_wake.load(atomic::SeqCst) != 0 {
+                while self.to_wake.load(Ordering::SeqCst) != 0 {
                     Thread::yield_now();
                 }
             }
@@ -479,8 +480,8 @@ impl<T: Send> Drop for Packet<T> {
         // disconnection, but also a proper fence before the read of
         // `to_wake`, so this assert cannot be removed with also removing
         // the `to_wake` assert.
-        assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED);
-        assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-        assert_eq!(self.channels.load(atomic::SeqCst), 0);
+        assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
+        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+        assert_eq!(self.channels.load(Ordering::SeqCst), 0);
     }
 }
diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs
index 15624601157..e8d6e380be5 100644
--- a/src/libstd/sync/mpsc/spsc_queue.rs
+++ b/src/libstd/sync/mpsc/spsc_queue.rs
@@ -41,7 +41,7 @@ use alloc::boxed::Box;
 use core::mem;
 use core::cell::UnsafeCell;
 
-use sync::atomic::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
+use sync::atomic::{AtomicPtr, AtomicUint, Ordering};
 
 // Node within the linked list queue of messages to send
 struct Node<T> {
@@ -109,7 +109,7 @@ impl<T: Send> Queue<T> {
     pub unsafe fn new(bound: uint) -> Queue<T> {
         let n1 = Node::new();
         let n2 = Node::new();
-        (*n1).next.store(n2, Relaxed);
+        (*n1).next.store(n2, Ordering::Relaxed);
         Queue {
             tail: UnsafeCell::new(n2),
             tail_prev: AtomicPtr::new(n1),
@@ -131,8 +131,8 @@ impl<T: Send> Queue<T> {
             let n = self.alloc();
             assert!((*n).value.is_none());
             (*n).value = Some(t);
-            (*n).next.store(0 as *mut Node<T>, Relaxed);
-            (**self.head.get()).next.store(n, Release);
+            (*n).next.store(0 as *mut Node<T>, Ordering::Relaxed);
+            (**self.head.get()).next.store(n, Ordering::Release);
             *self.head.get() = n;
         }
     }
@@ -144,23 +144,23 @@ impl<T: Send> Queue<T> {
             // only one subtracting from the cache).
             if *self.first.get() != *self.tail_copy.get() {
                 if self.cache_bound > 0 {
-                    let b = self.cache_subtractions.load(Relaxed);
-                    self.cache_subtractions.store(b + 1, Relaxed);
+                    let b = self.cache_subtractions.load(Ordering::Relaxed);
+                    self.cache_subtractions.store(b + 1, Ordering::Relaxed);
                 }
                 let ret = *self.first.get();
-                *self.first.get() = (*ret).next.load(Relaxed);
+                *self.first.get() = (*ret).next.load(Ordering::Relaxed);
                 return ret;
             }
             // If the above fails, then update our copy of the tail and try
            // again.
-            *self.tail_copy.get() = self.tail_prev.load(Acquire);
+            *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
             if *self.first.get() != *self.tail_copy.get() {
                 if self.cache_bound > 0 {
-                    let b = self.cache_subtractions.load(Relaxed);
-                    self.cache_subtractions.store(b + 1, Relaxed);
+                    let b = self.cache_subtractions.load(Ordering::Relaxed);
+                    self.cache_subtractions.store(b + 1, Ordering::Relaxed);
                 }
                 let ret = *self.first.get();
-                *self.first.get() = (*ret).next.load(Relaxed);
+                *self.first.get() = (*ret).next.load(Ordering::Relaxed);
                 return ret;
             }
             // If all of that fails, then we have to allocate a new node
@@ -177,25 +177,26 @@ impl<T: Send> Queue<T> {
             // tail's next field and see if we can use it. If we do a pop, then
             // the current tail node is a candidate for going into the cache.
             let tail = *self.tail.get();
-            let next = (*tail).next.load(Acquire);
+            let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { return None }
             assert!((*next).value.is_some());
             let ret = (*next).value.take();
 
             *self.tail.get() = next;
             if self.cache_bound == 0 {
-                self.tail_prev.store(tail, Release);
+                self.tail_prev.store(tail, Ordering::Release);
             } else {
                 // FIXME: this is dubious with overflow.
-                let additions = self.cache_additions.load(Relaxed);
-                let subtractions = self.cache_subtractions.load(Relaxed);
+                let additions = self.cache_additions.load(Ordering::Relaxed);
+                let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
                 let size = additions - subtractions;
 
                 if size < self.cache_bound {
-                    self.tail_prev.store(tail, Release);
-                    self.cache_additions.store(additions + 1, Relaxed);
+                    self.tail_prev.store(tail, Ordering::Release);
+                    self.cache_additions.store(additions + 1, Ordering::Relaxed);
                 } else {
-                    (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
+                    (*self.tail_prev.load(Ordering::Relaxed))
+                        .next.store(next, Ordering::Relaxed);
                     // We have successfully erased all references to 'tail', so
                     // now we can safely drop it.
                     let _: Box<Node<T>> = mem::transmute(tail);
@@ -217,7 +218,7 @@ impl<T: Send> Queue<T> {
         // stripped out.
         unsafe {
             let tail = *self.tail.get();
-            let next = (*tail).next.load(Acquire);
+            let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { return None }
             return (*next).value.as_mut();
         }
@@ -230,7 +231,7 @@ impl<T: Send> Drop for Queue<T> {
         unsafe {
             let mut cur = *self.first.get();
             while !cur.is_null() {
-                let next = (*cur).next.load(Relaxed);
+                let next = (*cur).next.load(Ordering::Relaxed);
                 let _n: Box<Node<T>> = mem::transmute(cur);
                 cur = next;
             }
diff --git a/src/libstd/sync/mpsc/stream.rs b/src/libstd/sync/mpsc/stream.rs
index c526e6acb8f..bd1e74a3390 100644
--- a/src/libstd/sync/mpsc/stream.rs
+++ b/src/libstd/sync/mpsc/stream.rs
@@ -28,10 +28,10 @@ use core::cmp;
 use core::int;
 use thread::Thread;
 
+use sync::atomic::{AtomicInt, AtomicUint, Ordering, AtomicBool};
+use sync::mpsc::Receiver;
 use sync::mpsc::blocking::{self, SignalToken};
 use sync::mpsc::spsc_queue as spsc;
-use sync::mpsc::Receiver;
-use sync::atomic;
 
 const DISCONNECTED: int = int::MIN;
 #[cfg(test)]
@@ -42,11 +42,11 @@ const MAX_STEALS: int = 1 << 20;
 
 pub struct Packet<T> {
     queue: spsc::Queue<Message<T>>, // internal queue for all message
-    cnt: atomic::AtomicInt, // How many items are on this channel
+    cnt: AtomicInt, // How many items are on this channel
     steals: int, // How many times has a port received without blocking?
-    to_wake: atomic::AtomicUint, // SignalToken for the blocked thread to wake up
+    to_wake: AtomicUint, // SignalToken for the blocked thread to wake up
 
-    port_dropped: atomic::AtomicBool, // flag if the channel has been destroyed.
+    port_dropped: AtomicBool, // flag if the channel has been destroyed.
 }
 
 pub enum Failure<T> {
@@ -79,11 +79,11 @@ impl<T: Send> Packet<T> {
         Packet {
             queue: unsafe { spsc::Queue::new(128) },
-            cnt: atomic::AtomicInt::new(0),
+            cnt: AtomicInt::new(0),
             steals: 0,
-            to_wake: atomic::AtomicUint::new(0),
+            to_wake: AtomicUint::new(0),
 
-            port_dropped: atomic::AtomicBool::new(false),
+            port_dropped: AtomicBool::new(false),
         }
     }
 
@@ -91,7 +91,7 @@ impl<T: Send> Packet<T> {
         // If the other port has deterministically gone away, then definitely
         // must return the data back up the stack. Otherwise, the data is
        // considered as being sent.
-        if self.port_dropped.load(atomic::SeqCst) { return Err(t) }
+        if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
 
         match self.do_send(Data(t)) {
             UpSuccess | UpDisconnected => {},
@@ -103,14 +103,14 @@ impl<T: Send> Packet<T> {
     pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
         // If the port has gone away, then there's no need to proceed any
         // further.
-        if self.port_dropped.load(atomic::SeqCst) { return UpDisconnected }
+        if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected }
 
         self.do_send(GoUp(up))
     }
 
     fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
         self.queue.push(t);
-        match self.cnt.fetch_add(1, atomic::SeqCst) {
+        match self.cnt.fetch_add(1, Ordering::SeqCst) {
             // As described in the mod's doc comment, -1 == wakeup
             -1 => UpWoke(self.take_to_wake()),
             // As as described before, SPSC queues must be >= -2
@@ -124,7 +124,7 @@ impl<T: Send> Packet<T> {
             // will never remove this data. We can only have at most one item to
             // drain (the port drains the rest).
             DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                 let first = self.queue.pop();
                 let second = self.queue.pop();
                 assert!(second.is_none());
@@ -143,8 +143,8 @@ impl<T: Send> Packet<T> {
 
     // Consumes ownership of the 'to_wake' field.
     fn take_to_wake(&mut self) -> SignalToken {
-        let ptr = self.to_wake.load(atomic::SeqCst);
-        self.to_wake.store(0, atomic::SeqCst);
+        let ptr = self.to_wake.load(Ordering::SeqCst);
+        self.to_wake.store(0, Ordering::SeqCst);
         assert!(ptr != 0);
         unsafe { SignalToken::cast_from_uint(ptr) }
     }
@@ -153,15 +153,15 @@ impl<T: Send> Packet<T> {
     // back if it shouldn't sleep. Note that this is the location where we take
     // steals into account.
     fn decrement(&mut self, token: SignalToken) -> Result<(), SignalToken> {
-        assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
         let ptr = unsafe { token.cast_to_uint() };
-        self.to_wake.store(ptr, atomic::SeqCst);
+        self.to_wake.store(ptr, Ordering::SeqCst);
 
         let steals = self.steals;
         self.steals = 0;
 
-        match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) {
-            DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); }
+        match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
+            DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
             // If we factor in our steals and notice that the channel has no
             // data, we successfully sleep
             n => {
@@ -170,7 +170,7 @@ impl<T: Send> Packet<T> {
             }
         }
 
-        self.to_wake.store(0, atomic::SeqCst);
+        self.to_wake.store(0, Ordering::SeqCst);
         Err(unsafe { SignalToken::cast_from_uint(ptr) })
     }
 
@@ -217,9 +217,9 @@ impl<T: Send> Packet<T> {
             // adding back in whatever we couldn't factor into steals.
             Some(data) => {
                 if self.steals > MAX_STEALS {
-                    match self.cnt.swap(0, atomic::SeqCst) {
+                    match self.cnt.swap(0, Ordering::SeqCst) {
                         DISCONNECTED => {
-                            self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                            self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                         }
                         n => {
                             let m = cmp::min(n, self.steals);
@@ -237,7 +237,7 @@ impl<T: Send> Packet<T> {
             }
 
             None => {
-                match self.cnt.load(atomic::SeqCst) {
+                match self.cnt.load(Ordering::SeqCst) {
                     n if n != DISCONNECTED => Err(Empty),
 
                     // This is a little bit of a tricky case. We failed to pop
@@ -266,7 +266,7 @@ impl<T: Send> Packet<T> {
     pub fn drop_chan(&mut self) {
         // Dropping a channel is pretty simple, we just flag it as disconnected
         // and then wakeup a blocker if there is one.
-        match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
+        match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
             -1 => { self.take_to_wake().signal(); }
             DISCONNECTED => {}
             n => { assert!(n >= 0); }
@@ -293,7 +293,7 @@ impl<T: Send> Packet<T> {
         // sends are gated on this flag, so we're immediately guaranteed that
         // there are a bounded number of active sends that we'll have to deal
        // with.
-        self.port_dropped.store(true, atomic::SeqCst);
+        self.port_dropped.store(true, Ordering::SeqCst);
 
         // Now that we're guaranteed to deal with a bounded number of senders,
         // we need to drain the queue. This draining process happens atomically
@@ -306,7 +306,7 @@ impl<T: Send> Packet<T> {
         let mut steals = self.steals;
         while {
             let cnt = self.cnt.compare_and_swap(
-                steals, DISCONNECTED, atomic::SeqCst);
+                steals, DISCONNECTED, Ordering::SeqCst);
             cnt != DISCONNECTED && cnt != steals
         } {
             loop {
@@ -351,9 +351,9 @@ impl<T: Send> Packet<T> {
 
     // increment the count on the channel (used for selection)
     fn bump(&mut self, amt: int) -> int {
-        match self.cnt.fetch_add(amt, atomic::SeqCst) {
+        match self.cnt.fetch_add(amt, Ordering::SeqCst) {
             DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, atomic::SeqCst);
+                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                 DISCONNECTED
             }
             n => n
@@ -403,7 +403,7 @@ impl<T: Send> Packet<T> {
         // of time until the data is actually sent.
         if was_upgrade {
             assert_eq!(self.steals, 0);
-            assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
             return Ok(true)
         }
 
@@ -416,7 +416,7 @@ impl<T: Send> Packet<T> {
         // If we were previously disconnected, then we know for sure that there
         // is no task in to_wake, so just keep going
         let has_data = if prev == DISCONNECTED {
-            assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
             true // there is data, that data is that we're disconnected
         } else {
             let cur = prev + steals + 1;
@@ -439,7 +439,7 @@ impl<T: Send> Packet<T> {
             if prev < 0 {
                 drop(self.take_to_wake());
             } else {
-                while self.to_wake.load(atomic::SeqCst) != 0 {
+                while self.to_wake.load(Ordering::SeqCst) != 0 {
                     Thread::yield_now();
                 }
             }
@@ -478,7 +478,7 @@ impl<T: Send> Drop for Packet<T> {
         // disconnection, but also a proper fence before the read of
         // `to_wake`, so this assert cannot be removed with also removing
         // the `to_wake` assert.
-        assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED);
-        assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
+        assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
+        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
     }
 }
diff --git a/src/libstd/sync/mpsc/sync.rs b/src/libstd/sync/mpsc/sync.rs
index 0eee10898bc..6836888e67e 100644
--- a/src/libstd/sync/mpsc/sync.rs
+++ b/src/libstd/sync/mpsc/sync.rs
@@ -41,14 +41,15 @@ use self::Blocker::*;
 use vec::Vec;
 use core::mem;
 
-use sync::{atomic, Mutex, MutexGuard};
+use sync::atomic::{Ordering, AtomicUint};
 use sync::mpsc::blocking::{self, WaitToken, SignalToken};
 use sync::mpsc::select::StartResult::{self, Installed, Abort};
+use sync::{Mutex, MutexGuard};
 
 pub struct Packet<T> {
     /// Only field outside of the mutex. Just done for kicks, but mainly because
     /// the other shared channel already had the code implemented
-    channels: atomic::AtomicUint,
+    channels: AtomicUint,
 
     lock: Mutex<State<T>>,
 }
@@ -137,7 +138,7 @@ fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) {
 impl<T: Send> Packet<T> {
     pub fn new(cap: uint) -> Packet<T> {
         Packet {
-            channels: atomic::AtomicUint::new(1),
+            channels: AtomicUint::new(1),
             lock: Mutex::new(State {
                 disconnected: false,
                 blocker: NoneBlocked,
@@ -304,12 +305,12 @@ impl<T: Send> Packet<T> {
     // Prepares this shared packet for a channel clone, essentially just bumping
     // a refcount.
     pub fn clone_chan(&self) {
-        self.channels.fetch_add(1, atomic::SeqCst);
+        self.channels.fetch_add(1, Ordering::SeqCst);
     }
 
     pub fn drop_chan(&self) {
         // Only flag the channel as disconnected if we're the last channel
-        match self.channels.fetch_sub(1, atomic::SeqCst) {
+        match self.channels.fetch_sub(1, Ordering::SeqCst) {
             1 => {}
             _ => return
         }
@@ -412,7 +413,7 @@ impl<T: Send> Packet<T> {
 #[unsafe_destructor]
 impl<T: Send> Drop for Packet<T> {
     fn drop(&mut self) {
-        assert_eq!(self.channels.load(atomic::SeqCst), 0);
+        assert_eq!(self.channels.load(Ordering::SeqCst), 0);
         let mut guard = self.lock.lock().unwrap();
         assert!(guard.queue.dequeue().is_none());
         assert!(guard.canceled.is_none());
diff --git a/src/libstd/sync/once.rs b/src/libstd/sync/once.rs
index 9e9a17e482f..08e323c9cb4 100644
--- a/src/libstd/sync/once.rs
+++ b/src/libstd/sync/once.rs
@@ -17,7 +17,7 @@ use int;
 use kinds::Sync;
 use mem::drop;
 use ops::FnOnce;
-use sync::atomic;
+use sync::atomic::{AtomicInt, Ordering, ATOMIC_INT_INIT};
 use sync::{StaticMutex, MUTEX_INIT};
 
 /// A synchronization primitive which can be used to run a one-time global
@@ -39,8 +39,8 @@ use sync::{StaticMutex, MUTEX_INIT};
 #[stable]
 pub struct Once {
     mutex: StaticMutex,
-    cnt: atomic::AtomicInt,
-    lock_cnt: atomic::AtomicInt,
+    cnt: AtomicInt,
+    lock_cnt: AtomicInt,
 }
 
 unsafe impl Sync for Once {}
@@ -49,8 +49,8 @@ unsafe impl Sync for Once {}
 #[stable]
 pub const ONCE_INIT: Once = Once {
     mutex: MUTEX_INIT,
-    cnt: atomic::ATOMIC_INT_INIT,
-    lock_cnt: atomic::ATOMIC_INT_INIT,
+    cnt: ATOMIC_INT_INIT,
+    lock_cnt: ATOMIC_INT_INIT,
 };
 
 impl Once {
@@ -66,7 +66,7 @@ impl Once {
     #[stable]
     pub fn call_once<F>(&'static self, f: F) where F: FnOnce() {
         // Optimize common path: load is much cheaper than fetch_add.
-        if self.cnt.load(atomic::SeqCst) < 0 {
+        if self.cnt.load(Ordering::SeqCst) < 0 {
             return
         }
 
@@ -97,11 +97,11 @@ impl Once {
         // calling `call_once` will return immediately before the initialization
         // has completed.
 
-        let prev = self.cnt.fetch_add(1, atomic::SeqCst);
+        let prev = self.cnt.fetch_add(1, Ordering::SeqCst);
         if prev < 0 {
             // Make sure we never overflow, we'll never have int::MIN
             // simultaneous calls to `call_once` to make this value go back to 0
-            self.cnt.store(int::MIN, atomic::SeqCst);
+            self.cnt.store(int::MIN, Ordering::SeqCst);
             return
         }
 
@@ -109,15 +109,15 @@ impl Once {
         // If the count is negative, then someone else finished the job,
         // otherwise we run the job and record how many people will try to grab
        // this lock
         let guard = self.mutex.lock();
-        if self.cnt.load(atomic::SeqCst) > 0 {
+        if self.cnt.load(Ordering::SeqCst) > 0 {
             f();
-            let prev = self.cnt.swap(int::MIN, atomic::SeqCst);
-            self.lock_cnt.store(prev, atomic::SeqCst);
+            let prev = self.cnt.swap(int::MIN, Ordering::SeqCst);
+            self.lock_cnt.store(prev, Ordering::SeqCst);
         }
         drop(guard);
 
         // Last one out cleans up after everyone else, no leaks!
-        if self.lock_cnt.fetch_add(-1, atomic::SeqCst) == 1 {
+        if self.lock_cnt.fetch_add(-1, Ordering::SeqCst) == 1 {
             unsafe { self.mutex.destroy() }
         }
     }
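For completeness, here is the spinlock example from the deleted module docs, restated in the `Ordering::SeqCst` style the rest of this commit adopts. This is a sketch, not part of the diff above, and it assumes the same pre-1.0 APIs the example originally used (`Thread::spawn(..).detach()`, `AtomicUint`):

```
use std::sync::Arc;
use std::sync::atomic::{AtomicUint, Ordering};
use std::thread::Thread;

fn main() {
    let spinlock = Arc::new(AtomicUint::new(1));

    let spinlock_clone = spinlock.clone();
    Thread::spawn(move|| {
        // Release the "lock" by storing 0.
        spinlock_clone.store(0, Ordering::SeqCst);
    }).detach();

    // Wait for the other task to release the lock
    while spinlock.load(Ordering::SeqCst) != 0 {}
}
```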
