Diffstat (limited to 'src/libstd/sync/mpsc/spsc_queue.rs')
-rw-r--r--  src/libstd/sync/mpsc/spsc_queue.rs  47
1 file changed, 24 insertions, 23 deletions
diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs
index 0edb1c24e80..c51aa7619db 100644
--- a/src/libstd/sync/mpsc/spsc_queue.rs
+++ b/src/libstd/sync/mpsc/spsc_queue.rs
@@ -6,8 +6,8 @@
 
 // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
 
-use core::ptr;
 use core::cell::UnsafeCell;
+use core::ptr;
 
 use crate::boxed::Box;
 use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
@@ -19,16 +19,16 @@ struct Node<T> {
     // FIXME: this could be an uninitialized T if we're careful enough, and
     //      that would reduce memory usage (and be a bit faster).
     //      is it worth it?
-    value: Option<T>,           // nullable for re-use of nodes
-    cached: bool,               // This node goes into the node cache
-    next: AtomicPtr<Node<T>>,   // next node in the queue
+    value: Option<T>,         // nullable for re-use of nodes
+    cached: bool,             // This node goes into the node cache
+    next: AtomicPtr<Node<T>>, // next node in the queue
 }
 
 /// The single-producer single-consumer queue. This structure is not cloneable,
 /// but it can be safely shared in an Arc if it is guaranteed that there
 /// is only one popper and one pusher touching the queue at any one point in
 /// time.
-pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> {
+pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
     // consumer fields
     consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
 
@@ -38,9 +38,9 @@ pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> {
 
 struct Consumer<T, Addition> {
     tail: UnsafeCell<*mut Node<T>>, // where to pop from
-    tail_prev: AtomicPtr<Node<T>>, // where to pop from
-    cache_bound: usize, // maximum cache size
-    cached_nodes: AtomicUsize, // number of nodes marked as cachable
+    tail_prev: AtomicPtr<Node<T>>,  // where to pop from
+    cache_bound: usize,             // maximum cache size
+    cached_nodes: AtomicUsize,      // number of nodes marked as cachable
     addition: Addition,
 }
 
@@ -51,9 +51,9 @@ struct Producer<T, Addition> {
     addition: Addition,
 }
 
-unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {}
 
-unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {}
 
 impl<T> Node<T> {
     fn new() -> *mut Node<T> {
@@ -66,7 +66,6 @@ impl<T> Node<T> {
 }
 
 impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
-
     /// Creates a new queue. With given additional elements in the producer and
     /// consumer portions of the queue.
     ///
@@ -107,13 +106,13 @@ impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerA
                 tail_prev: AtomicPtr::new(n1),
                 cache_bound: bound,
                 cached_nodes: AtomicUsize::new(0),
-                addition: consumer_addition
+                addition: consumer_addition,
             }),
             producer: CacheAligned::new(Producer {
                 head: UnsafeCell::new(n2),
                 first: UnsafeCell::new(n1),
                 tail_copy: UnsafeCell::new(n1),
-                addition: producer_addition
+                addition: producer_addition,
             }),
         }
     }
@@ -142,8 +141,7 @@ impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerA
         }
         // If the above fails, then update our copy of the tail and try
         // again.
-        *self.producer.0.tail_copy.get() =
-            self.consumer.tail_prev.load(Ordering::Acquire);
+        *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire);
         if *self.producer.first.get() != *self.producer.tail_copy.get() {
             let ret = *self.producer.first.get();
             *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
@@ -164,7 +162,9 @@ impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerA
             // the current tail node is a candidate for going into the cache.
             let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
-            if next.is_null() { return None }
+            if next.is_null() {
+                return None;
+            }
             assert!((*next).value.is_some());
             let ret = (*next).value.take();
 
@@ -182,7 +182,8 @@ impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerA
                     self.consumer.tail_prev.store(tail, Ordering::Release);
                 } else {
                     (*self.consumer.tail_prev.load(Ordering::Relaxed))
-                        .next.store(next, Ordering::Relaxed);
+                        .next
+                        .store(next, Ordering::Relaxed);
                     // We have successfully erased all references to 'tail', so
                     // now we can safely drop it.
                     let _: Box<Node<T>> = Box::from_raw(tail);
@@ -234,9 +235,9 @@ impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition,
 #[cfg(all(test, not(target_os = "emscripten")))]
 mod tests {
     use super::Queue;
+    use crate::sync::mpsc::channel;
     use crate::sync::Arc;
     use crate::thread;
-    use crate::sync::mpsc::channel;
 
     #[test]
     fn smoke() {
@@ -265,15 +266,15 @@ mod tests {
             match queue.peek() {
                 Some(vec) => {
                     assert_eq!(&*vec, &[1]);
-                },
-                None => unreachable!()
+                }
+                None => unreachable!(),
             }
 
             match queue.pop() {
                 Some(vec) => {
                     assert_eq!(&*vec, &[1]);
-                },
-                None => unreachable!()
+                }
+                None => unreachable!(),
             }
         }
     }
@@ -316,7 +317,7 @@ mod tests {
 
             let (tx, rx) = channel();
             let q2 = q.clone();
-            let _t = thread::spawn(move|| {
+            let _t = thread::spawn(move || {
                 for _ in 0..100000 {
                     loop {
                         match q2.pop() {
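
The type's doc comment in the diff above states the usage contract: the queue is not cloneable, but it can be shared through an Arc as long as exactly one thread pushes and exactly one thread pops at any one time. Purely as an illustration of that contract, here is a minimal sketch written in the style of the test module shown in the diff. It assumes the unsafe constructor is `with_additions(bound, producer_addition, consumer_addition)` (suggested by the fields being initialized above) and that a `push` method exists alongside the `pop` and `peek` used in the tests; this module is internal to libstd, so the sketch is illustrative only and is not part of the commit being shown.

    // Illustrative sketch only, not part of the diff. Assumed names:
    //   - `Queue::with_additions(bound, producer_addition, consumer_addition)`
    //   - a safe `push` method alongside the `pop`/`peek` seen in the tests.
    use super::Queue;
    use crate::sync::Arc;
    use crate::thread;

    fn spsc_sketch() {
        // Safety contract from the doc comment: exactly one pusher thread and
        // exactly one popper thread may touch the queue at any one time.
        let q = Arc::new(unsafe { Queue::with_additions(128, (), ()) });

        // The single producer thread pushes 1000 items.
        let producer = q.clone();
        let t = thread::spawn(move || {
            for i in 0..1000 {
                producer.push(i);
            }
        });

        // The single consumer (this thread) spins on pop() until it has seen
        // every item; pop() returns None while the queue is momentarily empty.
        let mut received = 0;
        while received < 1000 {
            if q.pop().is_some() {
                received += 1;
            }
        }
        t.join().unwrap();
    }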