Diffstat (limited to 'src/libstd/sync/mpsc/spsc_queue.rs')
| Mode | File | Lines changed |
|------|------|---------------|
| -rw-r--r-- | src/libstd/sync/mpsc/spsc_queue.rs | 161 |

1 file changed, 106 insertions, 55 deletions
```diff
diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs
index 1148bc66fba..3ce59270335 100644
--- a/src/libstd/sync/mpsc/spsc_queue.rs
+++ b/src/libstd/sync/mpsc/spsc_queue.rs
@@ -22,12 +22,15 @@ use core::cell::UnsafeCell;
 
 use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
 
+use super::cache_aligned::CacheAligned;
+
 // Node within the linked list queue of messages to send
 struct Node<T> {
     // FIXME: this could be an uninitialized T if we're careful enough, and
     // that would reduce memory usage (and be a bit faster).
     // is it worth it?
     value: Option<T>,           // nullable for re-use of nodes
+    cached: bool,               // This node goes into the node cache
     next: AtomicPtr<Node<T>>,   // next node in the queue
 }
 
@@ -35,37 +38,45 @@ struct Node<T> {
 /// but it can be safely shared in an Arc if it is guaranteed that there
 /// is only one popper and one pusher touching the queue at any one point in
 /// time.
-pub struct Queue<T> {
+pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> {
     // consumer fields
+    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
+
+    // producer fields
+    producer: CacheAligned<Producer<T, ProducerAddition>>,
+}
+
+struct Consumer<T, Addition> {
     tail: UnsafeCell<*mut Node<T>>, // where to pop from
     tail_prev: AtomicPtr<Node<T>>, // where to pop from
+    cache_bound: usize, // maximum cache size
+    cached_nodes: AtomicUsize, // number of nodes marked as cachable
+    addition: Addition,
+}
 
-    // producer fields
+struct Producer<T, Addition> {
     head: UnsafeCell<*mut Node<T>>,      // where to push to
     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
-
-    // Cache maintenance fields. Additions and subtractions are stored
-    // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: usize,
-    cache_additions: AtomicUsize,
-    cache_subtractions: AtomicUsize,
+    addition: Addition,
 }
 
-unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
 
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
 
 impl<T> Node<T> {
     fn new() -> *mut Node<T> {
         Box::into_raw(box Node {
             value: None,
+            cached: false,
             next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
         })
     }
 }
 
 impl<T> Queue<T> {
+    #[cfg(test)]
     /// Creates a new queue.
     ///
     /// This is unsafe as the type system doesn't enforce a single
@@ -84,18 +95,60 @@ impl<T> Queue<T> {
     ///               no bound. Otherwise, the cache will never grow larger than
     ///               `bound` (although the queue itself could be much larger.
     pub unsafe fn new(bound: usize) -> Queue<T> {
+        Self::with_additions(bound, (), ())
+    }
+}
+
+impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
+
+    /// Creates a new queue. With given additional elements in the producer and
+    /// consumer portions of the queue.
+    ///
+    /// Due to the performance implications of cache-contention,
+    /// we wish to keep fields used mainly by the producer on a separate cache
+    /// line than those used by the consumer.
+    /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
+    /// allocate one for small fields, so we allow users to insert additional
+    /// fields into the cache lines already allocated by this for the producer
+    /// and consumer.
+    ///
+    /// This is unsafe as the type system doesn't enforce a single
+    /// consumer-producer relationship. It also allows the consumer to `pop`
+    /// items while there is a `peek` active due to all methods having a
+    /// non-mutable receiver.
+    ///
+    /// # Arguments
+    ///
+    ///   * `bound` - This queue implementation is implemented with a linked
+    ///               list, and this means that a push is always a malloc. In
+    ///               order to amortize this cost, an internal cache of nodes is
+    ///               maintained to prevent a malloc from always being
+    ///               necessary. This bound is the limit on the size of the
+    ///               cache (if desired). If the value is 0, then the cache has
+    ///               no bound. Otherwise, the cache will never grow larger than
+    ///               `bound` (although the queue itself could be much larger.
+    pub unsafe fn with_additions(
+        bound: usize,
+        producer_addition: ProducerAddition,
+        consumer_addition: ConsumerAddition,
+    ) -> Self {
         let n1 = Node::new();
         let n2 = Node::new();
         (*n1).next.store(n2, Ordering::Relaxed);
         Queue {
-            tail: UnsafeCell::new(n2),
-            tail_prev: AtomicPtr::new(n1),
-            head: UnsafeCell::new(n2),
-            first: UnsafeCell::new(n1),
-            tail_copy: UnsafeCell::new(n1),
-            cache_bound: bound,
-            cache_additions: AtomicUsize::new(0),
-            cache_subtractions: AtomicUsize::new(0),
+            consumer: CacheAligned::new(Consumer {
+                tail: UnsafeCell::new(n2),
+                tail_prev: AtomicPtr::new(n1),
+                cache_bound: bound,
+                cached_nodes: AtomicUsize::new(0),
+                addition: consumer_addition
+            }),
+            producer: CacheAligned::new(Producer {
+                head: UnsafeCell::new(n2),
+                first: UnsafeCell::new(n1),
+                tail_copy: UnsafeCell::new(n1),
+                addition: producer_addition
+            }),
         }
     }
 
@@ -109,35 +162,25 @@ impl<T> Queue<T> {
             assert!((*n).value.is_none());
             (*n).value = Some(t);
             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
-            (**self.head.get()).next.store(n, Ordering::Release);
-            *self.head.get() = n;
+            (**self.producer.head.get()).next.store(n, Ordering::Release);
+            *(&self.producer.head).get() = n;
         }
     }
 
     unsafe fn alloc(&self) -> *mut Node<T> {
         // First try to see if we can consume the 'first' node for our uses.
-        // We try to avoid as many atomic instructions as possible here, so
-        // the addition to cache_subtractions is not atomic (plus we're the
-        // only one subtracting from the cache).
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If the above fails, then update our copy of the tail and try
         // again.
-        *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        *self.producer.0.tail_copy.get() =
+            self.consumer.tail_prev.load(Ordering::Acquire);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
         }
         // If all of that fails, then we have to allocate a new node
@@ -153,27 +196,27 @@ impl<T> Queue<T> {
             // sentinel from where we should start popping from. Hence, look at
             // tail's next field and see if we can use it. If we do a pop, then
             // the current tail node is a candidate for going into the cache.
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { return None }
             assert!((*next).value.is_some());
             let ret = (*next).value.take();
 
-            *self.tail.get() = next;
-            if self.cache_bound == 0 {
-                self.tail_prev.store(tail, Ordering::Release);
+            *self.consumer.0.tail.get() = next;
+            if self.consumer.cache_bound == 0 {
+                self.consumer.tail_prev.store(tail, Ordering::Release);
             } else {
-                // FIXME: this is dubious with overflow.
-                let additions = self.cache_additions.load(Ordering::Relaxed);
-                let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
-                let size = additions - subtractions;
-
-                if size < self.cache_bound {
-                    self.tail_prev.store(tail, Ordering::Release);
-                    self.cache_additions.store(additions + 1, Ordering::Relaxed);
+                let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
+                if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
+                    self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
+                    (*tail).cached = true;
+                }
+
+                if (*tail).cached {
+                    self.consumer.tail_prev.store(tail, Ordering::Release);
                 } else {
-                    (*self.tail_prev.load(Ordering::Relaxed))
-                          .next.store(next, Ordering::Relaxed);
+                    (*self.consumer.tail_prev.load(Ordering::Relaxed))
+                        .next.store(next, Ordering::Relaxed);
                     // We have successfully erased all references to 'tail', so
                     // now we can safely drop it.
                     let _: Box<Node<T>> = Box::from_raw(tail);
@@ -194,17 +237,25 @@ impl<T> Queue<T> {
         // This is essentially the same as above with all the popping bits
         // stripped out.
         unsafe {
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { None } else { (*next).value.as_mut() }
         }
     }
+
+    pub fn producer_addition(&self) -> &ProducerAddition {
+        &self.producer.addition
+    }
+
+    pub fn consumer_addition(&self) -> &ConsumerAddition {
+        &self.consumer.addition
+    }
 }
 
-impl<T> Drop for Queue<T> {
+impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
     fn drop(&mut self) {
         unsafe {
-            let mut cur = *self.first.get();
+            let mut cur = *self.producer.first.get();
             while !cur.is_null() {
                 let next = (*cur).next.load(Ordering::Relaxed);
                 let _n: Box<Node<T>> = Box::from_raw(cur);
```
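The patch routes both halves of the queue through `CacheAligned`, imported from `super::cache_aligned`, which is not part of this diff. Below is a minimal sketch (an assumption, not the real module) of what such a wrapper would need to provide for the code above to compile: cache-line alignment, `new`, `Deref`, and a public `.0` field, assuming 64-byte cache lines.

```rust
use std::ops::Deref;

/// Pads and aligns the wrapped value to a cache line.
/// Sketch only: 64 bytes is assumed here; the real `cache_aligned` module
/// referenced by the diff may choose the size differently.
#[repr(align(64))]
pub struct CacheAligned<T>(pub T);

impl<T> CacheAligned<T> {
    pub fn new(t: T) -> Self {
        CacheAligned(t)
    }
}

// Deref lets the queue write `self.consumer.tail` instead of `self.consumer.0.tail`.
impl<T> Deref for CacheAligned<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}

#[cfg(test)]
mod tests {
    use super::CacheAligned;

    #[test]
    fn halves_do_not_share_a_line() {
        // Two aligned wrappers placed back to back start on distinct 64-byte lines.
        assert_eq!(std::mem::align_of::<CacheAligned<u8>>(), 64);
        assert!(std::mem::size_of::<CacheAligned<u8>>() >= 64);
    }
}
```

Only `Deref` (no `DerefMut`) is needed because the queue mutates its state through `UnsafeCell` and atomics behind `&self`; the tuple-field form (`self.producer.0.first`) is simply the explicit spelling of the same access.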

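Per the added doc comment, the `ProducerAddition`/`ConsumerAddition` parameters exist so callers can park their own per-endpoint state in the cache lines the queue already pads, instead of allocating separately aligned fields. The sketch below shows only that shape: `Endpoints` and its sample fields are illustrative stand-ins, not the real `Queue`, which stays private to `std::sync::mpsc`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Assumed 64-byte cache lines, as in the sketch above.
#[repr(align(64))]
struct CacheAligned<T>(T);

// Stand-in for the generalized queue: one padded slot per endpoint.
struct Endpoints<P, C> {
    producer: CacheAligned<P>,
    consumer: CacheAligned<C>,
}

impl<P, C> Endpoints<P, C> {
    // Mirrors the shape of `Queue::with_additions`: one extra value per side.
    fn with_additions(producer: P, consumer: C) -> Self {
        Endpoints {
            producer: CacheAligned(producer),
            consumer: CacheAligned(consumer),
        }
    }

    // Mirrors the `producer_addition`/`consumer_addition` accessors from the diff.
    fn producer_addition(&self) -> &P { &self.producer.0 }
    fn consumer_addition(&self) -> &C { &self.consumer.0 }
}

fn main() {
    // For example: a sender-side wakeup word and a receiver-side steal counter,
    // roughly the kind of per-endpoint state the mpsc flavours keep.
    let q = Endpoints::with_additions(AtomicUsize::new(0), 0usize);
    q.producer_addition().store(1, Ordering::SeqCst);
    assert_eq!(*q.consumer_addition(), 0);
    // Each half starts on its own 64-byte boundary, so the sender's hot word
    // cannot false-share with the receiver's counter.
    assert_eq!(std::mem::align_of_val(&q.producer), 64);
}
```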