diff options
Diffstat (limited to 'src/libstd/sync/mpsc/spsc_queue.rs')
| -rw-r--r-- | src/libstd/sync/mpsc/spsc_queue.rs | 47 |
1 files changed, 24 insertions, 23 deletions
diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs index 0edb1c24e80..c51aa7619db 100644 --- a/src/libstd/sync/mpsc/spsc_queue.rs +++ b/src/libstd/sync/mpsc/spsc_queue.rs @@ -6,8 +6,8 @@ // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue -use core::ptr; use core::cell::UnsafeCell; +use core::ptr; use crate::boxed::Box; use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; @@ -19,16 +19,16 @@ struct Node<T> { // FIXME: this could be an uninitialized T if we're careful enough, and // that would reduce memory usage (and be a bit faster). // is it worth it? - value: Option<T>, // nullable for re-use of nodes - cached: bool, // This node goes into the node cache - next: AtomicPtr<Node<T>>, // next node in the queue + value: Option<T>, // nullable for re-use of nodes + cached: bool, // This node goes into the node cache + next: AtomicPtr<Node<T>>, // next node in the queue } /// The single-producer single-consumer queue. This structure is not cloneable, /// but it can be safely shared in an Arc if it is guaranteed that there /// is only one popper and one pusher touching the queue at any one point in /// time. -pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> { +pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> { // consumer fields consumer: CacheAligned<Consumer<T, ConsumerAddition>>, @@ -38,9 +38,9 @@ pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> { struct Consumer<T, Addition> { tail: UnsafeCell<*mut Node<T>>, // where to pop from - tail_prev: AtomicPtr<Node<T>>, // where to pop from - cache_bound: usize, // maximum cache size - cached_nodes: AtomicUsize, // number of nodes marked as cachable + tail_prev: AtomicPtr<Node<T>>, // where to pop from + cache_bound: usize, // maximum cache size + cached_nodes: AtomicUsize, // number of nodes marked as cachable addition: Addition, } @@ -51,9 +51,9 @@ struct Producer<T, Addition> { addition: Addition, } -unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { } +unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {} -unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { } +unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {} impl<T> Node<T> { fn new() -> *mut Node<T> { @@ -66,7 +66,6 @@ impl<T> Node<T> { } impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> { - /// Creates a new queue. With given additional elements in the producer and /// consumer portions of the queue. /// @@ -107,13 +106,13 @@ impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerA tail_prev: AtomicPtr::new(n1), cache_bound: bound, cached_nodes: AtomicUsize::new(0), - addition: consumer_addition + addition: consumer_addition, }), producer: CacheAligned::new(Producer { head: UnsafeCell::new(n2), first: UnsafeCell::new(n1), tail_copy: UnsafeCell::new(n1), - addition: producer_addition + addition: producer_addition, }), } } @@ -142,8 +141,7 @@ impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerA } // If the above fails, then update our copy of the tail and try // again. - *self.producer.0.tail_copy.get() = - self.consumer.tail_prev.load(Ordering::Acquire); + *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire); if *self.producer.first.get() != *self.producer.tail_copy.get() { let ret = *self.producer.first.get(); *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); @@ -164,7 +162,9 @@ impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerA // the current tail node is a candidate for going into the cache. let tail = *self.consumer.tail.get(); let next = (*tail).next.load(Ordering::Acquire); - if next.is_null() { return None } + if next.is_null() { + return None; + } assert!((*next).value.is_some()); let ret = (*next).value.take(); @@ -182,7 +182,8 @@ impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerA self.consumer.tail_prev.store(tail, Ordering::Release); } else { (*self.consumer.tail_prev.load(Ordering::Relaxed)) - .next.store(next, Ordering::Relaxed); + .next + .store(next, Ordering::Relaxed); // We have successfully erased all references to 'tail', so // now we can safely drop it. let _: Box<Node<T>> = Box::from_raw(tail); @@ -234,9 +235,9 @@ impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, #[cfg(all(test, not(target_os = "emscripten")))] mod tests { use super::Queue; + use crate::sync::mpsc::channel; use crate::sync::Arc; use crate::thread; - use crate::sync::mpsc::channel; #[test] fn smoke() { @@ -265,15 +266,15 @@ mod tests { match queue.peek() { Some(vec) => { assert_eq!(&*vec, &[1]); - }, - None => unreachable!() + } + None => unreachable!(), } match queue.pop() { Some(vec) => { assert_eq!(&*vec, &[1]); - }, - None => unreachable!() + } + None => unreachable!(), } } } @@ -316,7 +317,7 @@ mod tests { let (tx, rx) = channel(); let q2 = q.clone(); - let _t = thread::spawn(move|| { + let _t = thread::spawn(move || { for _ in 0..100000 { loop { match q2.pop() { |
