about summary refs log tree commit diff
path: root/src/libstd/sync/mpsc/spsc_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/sync/mpsc/spsc_queue.rs')
-rw-r--r--src/libstd/sync/mpsc/spsc_queue.rs161
1 files changed, 106 insertions, 55 deletions
diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs
index 1148bc66fba..3ce59270335 100644
--- a/src/libstd/sync/mpsc/spsc_queue.rs
+++ b/src/libstd/sync/mpsc/spsc_queue.rs
@@ -22,12 +22,15 @@ use core::cell::UnsafeCell;
 
 use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
 
+use super::cache_aligned::CacheAligned;
+
 // Node within the linked list queue of messages to send
 struct Node<T> {
     // FIXME: this could be an uninitialized T if we're careful enough, and
     //      that would reduce memory usage (and be a bit faster).
     //      is it worth it?
     value: Option<T>,           // nullable for re-use of nodes
+    cached: bool,               // This node goes into the node cache
     next: AtomicPtr<Node<T>>,   // next node in the queue
 }
 
@@ -35,37 +38,45 @@ struct Node<T> {
 /// but it can be safely shared in an Arc if it is guaranteed that there
 /// is only one popper and one pusher touching the queue at any one point in
 /// time.
-pub struct Queue<T> {
+pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> {
     // consumer fields
+    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
+
+    // producer fields
+    producer: CacheAligned<Producer<T, ProducerAddition>>,
+}
+
+struct Consumer<T, Addition> {
     tail: UnsafeCell<*mut Node<T>>, // where to pop from
     tail_prev: AtomicPtr<Node<T>>, // where to pop from
+    cache_bound: usize, // maximum cache size
+    cached_nodes: AtomicUsize, // number of nodes marked as cachable
+    addition: Addition,
+}
 
-    // producer fields
+struct Producer<T, Addition> {
     head: UnsafeCell<*mut Node<T>>,      // where to push to
     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
-
-    // Cache maintenance fields. Additions and subtractions are stored
-    // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: usize,
-    cache_additions: AtomicUsize,
-    cache_subtractions: AtomicUsize,
+    addition: Addition,
 }
 
-unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
 
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
 
 impl<T> Node<T> {
     fn new() -> *mut Node<T> {
         Box::into_raw(box Node {
             value: None,
+            cached: false,
             next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
         })
     }
 }
 
 impl<T> Queue<T> {
+    #[cfg(test)]
     /// Creates a new queue.
     ///
     /// This is unsafe as the type system doesn't enforce a single
@@ -84,18 +95,60 @@ impl<T> Queue<T> {
     ///               no bound. Otherwise, the cache will never grow larger than
     ///               `bound` (although the queue itself could be much larger.
     pub unsafe fn new(bound: usize) -> Queue<T> {
+        Self::with_additions(bound, (), ())
+    }
+}
+
+impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
+
+    /// Creates a new queue. With given additional elements in the producer and
+    /// consumer portions of the queue.
+    ///
+    /// Due to the performance implications of cache-contention,
+    /// we wish to keep fields used mainly by the producer on a separate cache
+    /// line than those used by the consumer.
+    /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
+    /// allocate one for small fields, so we allow users to insert additional
+    /// fields into the cache lines already allocated by this for the producer
+    /// and consumer.
+    ///
+    /// This is unsafe as the type system doesn't enforce a single
+    /// consumer-producer relationship. It also allows the consumer to `pop`
+    /// items while there is a `peek` active due to all methods having a
+    /// non-mutable receiver.
+    ///
+    /// # Arguments
+    ///
+    ///   * `bound` - This queue implementation is implemented with a linked
+    ///               list, and this means that a push is always a malloc. In
+    ///               order to amortize this cost, an internal cache of nodes is
+    ///               maintained to prevent a malloc from always being
+    ///               necessary. This bound is the limit on the size of the
+    ///               cache (if desired). If the value is 0, then the cache has
+    ///               no bound. Otherwise, the cache will never grow larger than
+    ///               `bound` (although the queue itself could be much larger.
+    pub unsafe fn with_additions(
+        bound: usize,
+        producer_addition: ProducerAddition,
+        consumer_addition: ConsumerAddition,
+    ) -> Self {
         let n1 = Node::new();
         let n2 = Node::new();
         (*n1).next.store(n2, Ordering::Relaxed);
         Queue {
-            tail: UnsafeCell::new(n2),
-            tail_prev: AtomicPtr::new(n1),
-            head: UnsafeCell::new(n2),
-            first: UnsafeCell::new(n1),
-            tail_copy: UnsafeCell::new(n1),
-            cache_bound: bound,
-            cache_additions: AtomicUsize::new(0),
-            cache_subtractions: AtomicUsize::new(0),
+            consumer: CacheAligned::new(Consumer {
+                tail: UnsafeCell::new(n2),
+                tail_prev: AtomicPtr::new(n1),
+                cache_bound: bound,
+                cached_nodes: AtomicUsize::new(0),
+                addition: consumer_addition
+            }),
+            producer: CacheAligned::new(Producer {
+                head: UnsafeCell::new(n2),
+                first: UnsafeCell::new(n1),
+                tail_copy: UnsafeCell::new(n1),
+                addition: producer_addition
+            }),
         }
     }
 
@@ -109,35 +162,25 @@ impl<T> Queue<T> {
             assert!((*n).value.is_none());
             (*n).value = Some(t);
             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
-            (**self.head.get()).next.store(n, Ordering::Release);
-            *self.head.get() = n;
+            (**self.producer.head.get()).next.store(n, Ordering::Release);
+            *(&self.producer.head).get() = n;
         }
     }
 
     unsafe fn alloc(&self) -> *mut Node<T> {
         // First try to see if we can consume the 'first' node for our uses.
-        // We try to avoid as many atomic instructions as possible here, so
-        // the addition to cache_subtractions is not atomic (plus we're the
-        // only one subtracting from the cache).
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If the above fails, then update our copy of the tail and try
         // again.
-        *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        *self.producer.0.tail_copy.get() =
+            self.consumer.tail_prev.load(Ordering::Acquire);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If all of that fails, then we have to allocate a new node
@@ -153,27 +196,27 @@ impl<T> Queue<T> {
             // sentinel from where we should start popping from. Hence, look at
             // tail's next field and see if we can use it. If we do a pop, then
             // the current tail node is a candidate for going into the cache.
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { return None }
             assert!((*next).value.is_some());
             let ret = (*next).value.take();
 
-            *self.tail.get() = next;
-            if self.cache_bound == 0 {
-                self.tail_prev.store(tail, Ordering::Release);
+            *self.consumer.0.tail.get() = next;
+            if self.consumer.cache_bound == 0 {
+                self.consumer.tail_prev.store(tail, Ordering::Release);
             } else {
-                // FIXME: this is dubious with overflow.
-                let additions = self.cache_additions.load(Ordering::Relaxed);
-                let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
-                let size = additions - subtractions;
-
-                if size < self.cache_bound {
-                    self.tail_prev.store(tail, Ordering::Release);
-                    self.cache_additions.store(additions + 1, Ordering::Relaxed);
+                let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
+                if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
+                    self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
+                    (*tail).cached = true;
+                }
+
+                if (*tail).cached {
+                    self.consumer.tail_prev.store(tail, Ordering::Release);
                 } else {
-                    (*self.tail_prev.load(Ordering::Relaxed))
-                          .next.store(next, Ordering::Relaxed);
+                    (*self.consumer.tail_prev.load(Ordering::Relaxed))
+                        .next.store(next, Ordering::Relaxed);
                     // We have successfully erased all references to 'tail', so
                     // now we can safely drop it.
                     let _: Box<Node<T>> = Box::from_raw(tail);
@@ -194,17 +237,25 @@ impl<T> Queue<T> {
         // This is essentially the same as above with all the popping bits
         // stripped out.
         unsafe {
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { None } else { (*next).value.as_mut() }
         }
     }
+
+    pub fn producer_addition(&self) -> &ProducerAddition {
+        &self.producer.addition
+    }
+
+    pub fn consumer_addition(&self) -> &ConsumerAddition {
+        &self.consumer.addition
+    }
 }
 
-impl<T> Drop for Queue<T> {
+impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
     fn drop(&mut self) {
         unsafe {
-            let mut cur = *self.first.get();
+            let mut cur = *self.producer.first.get();
             while !cur.is_null() {
                 let next = (*cur).next.load(Ordering::Relaxed);
                 let _n: Box<Node<T>> = Box::from_raw(cur);