author     Joshua Lockerman <j@Js-MacBook-Air.home>  2017-09-29 15:58:11 -0400
committer  Joshua Lockerman <j@Js-MacBook-Air.home>  2017-10-01 12:15:35 -0400
commit     68341a91eecb830c57779cd2423733a7395258ab (patch)
tree       851726d9a606ab4ffd49b719f741dd856e09dfd1
parent     0e6f4cf51cd3b799fb057956f8e733d16605d09b (diff)
Improve performance of spsc_queue and stream.

This commit makes two main changes:

1. It switches the spsc_queue node-caching strategy from keeping a shared
   count of the nodes in the cache to keeping a consumer-only count of the
   nodes eligible to be cached.
2. It separates the consumer and producer fields of spsc_queue and stream
   into a producer cache line and a consumer cache line; a minimal sketch of
   the cache-line idea follows below.
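
The layout change works by over-aligning the producer and consumer halves so
that each lands on its own cache line, and writes from one thread no longer
invalidate the line holding the other thread's hot fields (false sharing). A
minimal, self-contained sketch of that idea, assuming a 64-byte line size; the
names here are illustrative and this is not the std-internal code:

    use std::sync::atomic::{AtomicUsize, Ordering};

    // Wrapping a field in a 64-byte-aligned struct pushes it onto its own
    // cache line, so the producer's writes never dirty the consumer's line.
    #[repr(align(64))]
    struct CacheLine<T>(T);

    struct Counters {
        produced: CacheLine<AtomicUsize>, // written only by the producer
        consumed: CacheLine<AtomicUsize>, // written only by the consumer
    }

    fn main() {
        let c = Counters {
            produced: CacheLine(AtomicUsize::new(0)),
            consumed: CacheLine(AtomicUsize::new(0)),
        };
        c.produced.0.fetch_add(1, Ordering::Relaxed);
        c.consumed.0.fetch_add(1, Ordering::Relaxed);
        // Each half starts on its own 64-byte boundary.
        assert_eq!(std::mem::align_of::<CacheLine<AtomicUsize>>(), 64);
        assert_eq!(std::mem::size_of::<Counters>(), 128);
    }

At the time of this commit, #[repr(align(64))] was still feature-gated, which
is why the patch adds the attr_literals and repr_align gates to
src/libstd/lib.rs.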
-rw-r--r--  src/libstd/lib.rs                        2
-rw-r--r--  src/libstd/sync/mpsc/cache_aligned.rs   37
-rw-r--r--  src/libstd/sync/mpsc/mod.rs              2
-rw-r--r--  src/libstd/sync/mpsc/spsc_queue.rs     161
-rw-r--r--  src/libstd/sync/mpsc/stream.rs         105
5 files changed, 208 insertions(+), 99 deletions(-)
diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs
index 9fc7e2c01aa..83cc9ce582e 100644
--- a/src/libstd/lib.rs
+++ b/src/libstd/lib.rs
@@ -244,6 +244,7 @@
 #![feature(allow_internal_unstable)]
 #![feature(align_offset)]
 #![feature(asm)]
+#![feature(attr_literals)]
 #![feature(box_syntax)]
 #![feature(cfg_target_has_atomic)]
 #![feature(cfg_target_thread_local)]
@@ -290,6 +291,7 @@
 #![feature(prelude_import)]
 #![feature(rand)]
 #![feature(raw)]
+#![feature(repr_align)]
 #![feature(repr_simd)]
 #![feature(rustc_attrs)]
 #![cfg_attr(not(stage0), feature(rustc_const_unstable))]
diff --git a/src/libstd/sync/mpsc/cache_aligned.rs b/src/libstd/sync/mpsc/cache_aligned.rs
new file mode 100644
index 00000000000..5af01262573
--- /dev/null
+++ b/src/libstd/sync/mpsc/cache_aligned.rs
@@ -0,0 +1,37 @@
+// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use ops::{Deref, DerefMut};
+
+#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[repr(align(64))]
+pub(super) struct Aligner;
+
+#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub(super) struct CacheAligned<T>(pub T, pub Aligner);
+
+impl<T> Deref for CacheAligned<T> {
+     type Target = T;
+     fn deref(&self) -> &Self::Target {
+         &self.0
+     }
+}
+
+impl<T> DerefMut for CacheAligned<T> {
+     fn deref_mut(&mut self) -> &mut Self::Target {
+         &mut self.0
+     }
+}
+
+impl<T> CacheAligned<T> {
+    pub(super) fn new(t: T) -> Self {
+        CacheAligned(t, Aligner)
+    }
+}
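
The Aligner/CacheAligned pattern above can be checked outside libstd (the std
type is pub(super) and not reachable from user code): a zero-sized
#[repr(align(64))] marker raises the alignment of the whole tuple struct
without storing any data, and Deref/DerefMut make the wrapper transparent. A
standalone re-creation, as a sketch rather than the actual module:

    use std::mem::{align_of, size_of};
    use std::ops::{Deref, DerefMut};

    #[repr(align(64))]
    struct Aligner; // zero-sized: contributes alignment, not data

    struct CacheAligned<T>(T, Aligner);

    impl<T> Deref for CacheAligned<T> {
        type Target = T;
        fn deref(&self) -> &T { &self.0 }
    }

    impl<T> DerefMut for CacheAligned<T> {
        fn deref_mut(&mut self) -> &mut T { &mut self.0 }
    }

    fn main() {
        // The wrapper is aligned to a 64-byte boundary and padded out to a
        // full line, even around a one-byte payload.
        assert_eq!(align_of::<CacheAligned<u8>>(), 64);
        assert_eq!(size_of::<CacheAligned<u8>>(), 64);

        let mut x = CacheAligned(5u8, Aligner);
        *x += 1;             // DerefMut forwards to the wrapped value
        assert_eq!(*x, 6u8); // Deref forwards to the wrapped value
    }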
diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs
index dcd4c8dfdf5..0bfbcd2d2cd 100644
--- a/src/libstd/sync/mpsc/mod.rs
+++ b/src/libstd/sync/mpsc/mod.rs
@@ -297,6 +297,8 @@ mod sync;
 mod mpsc_queue;
 mod spsc_queue;
 
+mod cache_aligned;
+
 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
 /// This half can only be owned by one thread.
 ///
diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs
index 1148bc66fba..3ce59270335 100644
--- a/src/libstd/sync/mpsc/spsc_queue.rs
+++ b/src/libstd/sync/mpsc/spsc_queue.rs
@@ -22,12 +22,15 @@ use core::cell::UnsafeCell;
 
 use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
 
+use super::cache_aligned::CacheAligned;
+
 // Node within the linked list queue of messages to send
 struct Node<T> {
     // FIXME: this could be an uninitialized T if we're careful enough, and
     //      that would reduce memory usage (and be a bit faster).
     //      is it worth it?
     value: Option<T>,           // nullable for re-use of nodes
+    cached: bool,               // This node goes into the node cache
     next: AtomicPtr<Node<T>>,   // next node in the queue
 }
 
@@ -35,37 +38,45 @@ struct Node<T> {
 /// but it can be safely shared in an Arc if it is guaranteed that there
 /// is only one popper and one pusher touching the queue at any one point in
 /// time.
-pub struct Queue<T> {
+pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> {
     // consumer fields
+    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
+
+    // producer fields
+    producer: CacheAligned<Producer<T, ProducerAddition>>,
+}
+
+struct Consumer<T, Addition> {
     tail: UnsafeCell<*mut Node<T>>, // where to pop from
     tail_prev: AtomicPtr<Node<T>>, // where to pop from
+    cache_bound: usize, // maximum cache size
+    cached_nodes: AtomicUsize, // number of nodes marked as cachable
+    addition: Addition,
+}
 
-    // producer fields
+struct Producer<T, Addition> {
     head: UnsafeCell<*mut Node<T>>,      // where to push to
     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
-
-    // Cache maintenance fields. Additions and subtractions are stored
-    // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: usize,
-    cache_additions: AtomicUsize,
-    cache_subtractions: AtomicUsize,
+    addition: Addition,
 }
 
-unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
 
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
 
 impl<T> Node<T> {
     fn new() -> *mut Node<T> {
         Box::into_raw(box Node {
             value: None,
+            cached: false,
             next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
         })
     }
 }
 
 impl<T> Queue<T> {
+    #[cfg(test)]
     /// Creates a new queue.
     ///
     /// This is unsafe as the type system doesn't enforce a single
@@ -84,18 +95,60 @@ impl<T> Queue<T> {
     ///               no bound. Otherwise, the cache will never grow larger than
 ///               `bound` (although the queue itself could be much larger).
     pub unsafe fn new(bound: usize) -> Queue<T> {
+        Self::with_additions(bound, (), ())
+    }
+}
+
+impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
+
+    /// Creates a new queue with the given additional elements in the producer
+    /// and consumer portions of the queue.
+    ///
+    /// Due to the performance implications of cache contention,
+    /// we wish to keep the fields used mainly by the producer on a separate
+    /// cache line from those used by the consumer.
+    /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
+    /// allocate one for small fields, so we allow users to insert additional
+    /// fields into the cache lines already allocated by this queue for the
+    /// producer and consumer.
+    ///
+    /// This is unsafe as the type system doesn't enforce a single
+    /// consumer-producer relationship. It also allows the consumer to `pop`
+    /// items while there is a `peek` active due to all methods having a
+    /// non-mutable receiver.
+    ///
+    /// # Arguments
+    ///
+    ///   * `bound` - This queue is implemented with a linked
+    ///               list, and this means that a push is always a malloc. In
+    ///               order to amortize this cost, an internal cache of nodes is
+    ///               maintained to prevent a malloc from always being
+    ///               necessary. This bound is the limit on the size of the
+    ///               cache (if desired). If the value is 0, then the cache has
+    ///               no bound. Otherwise, the cache will never grow larger than
+    ///               `bound` (although the queue itself could be much larger).
+    pub unsafe fn with_additions(
+        bound: usize,
+        producer_addition: ProducerAddition,
+        consumer_addition: ConsumerAddition,
+    ) -> Self {
         let n1 = Node::new();
         let n2 = Node::new();
         (*n1).next.store(n2, Ordering::Relaxed);
         Queue {
-            tail: UnsafeCell::new(n2),
-            tail_prev: AtomicPtr::new(n1),
-            head: UnsafeCell::new(n2),
-            first: UnsafeCell::new(n1),
-            tail_copy: UnsafeCell::new(n1),
-            cache_bound: bound,
-            cache_additions: AtomicUsize::new(0),
-            cache_subtractions: AtomicUsize::new(0),
+            consumer: CacheAligned::new(Consumer {
+                tail: UnsafeCell::new(n2),
+                tail_prev: AtomicPtr::new(n1),
+                cache_bound: bound,
+                cached_nodes: AtomicUsize::new(0),
+                addition: consumer_addition
+            }),
+            producer: CacheAligned::new(Producer {
+                head: UnsafeCell::new(n2),
+                first: UnsafeCell::new(n1),
+                tail_copy: UnsafeCell::new(n1),
+                addition: producer_addition
+            }),
         }
     }
 
@@ -109,35 +162,25 @@ impl<T> Queue<T> {
             assert!((*n).value.is_none());
             (*n).value = Some(t);
             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
-            (**self.head.get()).next.store(n, Ordering::Release);
-            *self.head.get() = n;
+            (**self.producer.head.get()).next.store(n, Ordering::Release);
+            *(&self.producer.head).get() = n;
         }
     }
 
     unsafe fn alloc(&self) -> *mut Node<T> {
         // First try to see if we can consume the 'first' node for our uses.
-        // We try to avoid as many atomic instructions as possible here, so
-        // the addition to cache_subtractions is not atomic (plus we're the
-        // only one subtracting from the cache).
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If the above fails, then update our copy of the tail and try
         // again.
-        *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        *self.producer.0.tail_copy.get() =
+            self.consumer.tail_prev.load(Ordering::Acquire);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If all of that fails, then we have to allocate a new node
@@ -153,27 +196,27 @@ impl<T> Queue<T> {
             // sentinel from where we should start popping from. Hence, look at
             // tail's next field and see if we can use it. If we do a pop, then
             // the current tail node is a candidate for going into the cache.
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { return None }
             assert!((*next).value.is_some());
             let ret = (*next).value.take();
 
-            *self.tail.get() = next;
-            if self.cache_bound == 0 {
-                self.tail_prev.store(tail, Ordering::Release);
+            *self.consumer.0.tail.get() = next;
+            if self.consumer.cache_bound == 0 {
+                self.consumer.tail_prev.store(tail, Ordering::Release);
             } else {
-                // FIXME: this is dubious with overflow.
-                let additions = self.cache_additions.load(Ordering::Relaxed);
-                let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
-                let size = additions - subtractions;
-
-                if size < self.cache_bound {
-                    self.tail_prev.store(tail, Ordering::Release);
-                    self.cache_additions.store(additions + 1, Ordering::Relaxed);
+                let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
+                if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
+                    self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
+                    (*tail).cached = true;
+                }
+
+                if (*tail).cached {
+                    self.consumer.tail_prev.store(tail, Ordering::Release);
                 } else {
-                    (*self.tail_prev.load(Ordering::Relaxed))
-                          .next.store(next, Ordering::Relaxed);
+                    (*self.consumer.tail_prev.load(Ordering::Relaxed))
+                        .next.store(next, Ordering::Relaxed);
                     // We have successfully erased all references to 'tail', so
                     // now we can safely drop it.
                     let _: Box<Node<T>> = Box::from_raw(tail);
@@ -194,17 +237,25 @@ impl<T> Queue<T> {
         // This is essentially the same as above with all the popping bits
         // stripped out.
         unsafe {
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { None } else { (*next).value.as_mut() }
         }
     }
+
+    pub fn producer_addition(&self) -> &ProducerAddition {
+        &self.producer.addition
+    }
+
+    pub fn consumer_addition(&self) -> &ConsumerAddition {
+        &self.consumer.addition
+    }
 }
 
-impl<T> Drop for Queue<T> {
+impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
     fn drop(&mut self) {
         unsafe {
-            let mut cur = *self.first.get();
+            let mut cur = *self.producer.first.get();
             while !cur.is_null() {
                 let next = (*cur).next.load(Ordering::Relaxed);
                 let _n: Box<Node<T>> = Box::from_raw(cur);
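
Summarizing the new bookkeeping apart from the unsafe pointer code: the
consumer alone decides at pop time whether the node it just vacated may
re-enter the producer's free list, and a node only ever counts against the
bound once, so no shared add/subtract counters are needed. A single-threaded
sketch of that decision, assuming the counter is meant to advance whenever a
node is newly marked; the names are illustrative, not the std internals:

    struct Node {
        cached: bool, // has this node ever been admitted to the cache?
    }

    struct Consumer {
        cache_bound: usize,  // 0 means the cache is unbounded
        cached_nodes: usize, // nodes this consumer has marked cachable so far
    }

    impl Consumer {
        /// Returns true if the just-popped node should be recycled into the
        /// node cache rather than freed.
        fn keep_for_cache(&mut self, node: &mut Node) -> bool {
            if self.cache_bound == 0 {
                return true; // unbounded: always recycle
            }
            if self.cached_nodes < self.cache_bound && !node.cached {
                // A node is counted once, when first admitted; it then stays
                // cachable forever, so nothing ever needs to be decremented.
                self.cached_nodes += 1;
                node.cached = true;
            }
            node.cached
        }
    }

    fn main() {
        let mut c = Consumer { cache_bound: 2, cached_nodes: 0 };
        let mut nodes = [
            Node { cached: false },
            Node { cached: false },
            Node { cached: false },
        ];
        let kept: Vec<bool> =
            nodes.iter_mut().map(|n| c.keep_for_cache(n)).collect();
        // Only `cache_bound` distinct nodes are ever admitted to the cache.
        assert_eq!(kept, vec![true, true, false]);
    }

In the patch itself, a node that stays cachable is published back through
tail_prev for alloc() to reuse, while any other node is unlinked and freed
immediately in the else branch of pop().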
diff --git a/src/libstd/sync/mpsc/stream.rs b/src/libstd/sync/mpsc/stream.rs
index 47cd8977fda..d1515eba68c 100644
--- a/src/libstd/sync/mpsc/stream.rs
+++ b/src/libstd/sync/mpsc/stream.rs
@@ -41,15 +41,22 @@ const MAX_STEALS: isize = 5;
 const MAX_STEALS: isize = 1 << 20;
 
 pub struct Packet<T> {
-    queue: spsc::Queue<Message<T>>, // internal queue for all message
+    // internal queue for all messages
+    queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
+}
 
+struct ProducerAddition {
     cnt: AtomicIsize, // How many items are on this channel
-    steals: UnsafeCell<isize>, // How many times has a port received without blocking?
     to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
 
     port_dropped: AtomicBool, // flag if the channel has been destroyed.
 }
 
+struct ConsumerAddition {
+    steals: UnsafeCell<isize>,  // How many times has a port received without blocking?
+}
+
+
 pub enum Failure<T> {
     Empty,
     Disconnected,
@@ -78,13 +85,18 @@ enum Message<T> {
 impl<T> Packet<T> {
     pub fn new() -> Packet<T> {
         Packet {
-            queue: unsafe { spsc::Queue::new(128) },
-
-            cnt: AtomicIsize::new(0),
-            steals: UnsafeCell::new(0),
-            to_wake: AtomicUsize::new(0),
-
-            port_dropped: AtomicBool::new(false),
+            queue: unsafe { spsc::Queue::with_additions(
+                128,
+                ProducerAddition {
+                    cnt: AtomicIsize::new(0),
+                    to_wake: AtomicUsize::new(0),
+
+                    port_dropped: AtomicBool::new(false),
+                },
+                ConsumerAddition {
+                    steals: UnsafeCell::new(0),
+                }
+            )},
         }
     }
 
@@ -92,7 +104,7 @@ impl<T> Packet<T> {
         // If the other port has deterministically gone away, then definitely
         // must return the data back up the stack. Otherwise, the data is
         // considered as being sent.
-        if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
+        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { return Err(t) }
 
         match self.do_send(Data(t)) {
             UpSuccess | UpDisconnected => {},
@@ -104,14 +116,16 @@ impl<T> Packet<T> {
     pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
         // If the port has gone away, then there's no need to proceed any
         // further.
-        if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected }
+        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
+            return UpDisconnected
+        }
 
         self.do_send(GoUp(up))
     }
 
     fn do_send(&self, t: Message<T>) -> UpgradeResult {
         self.queue.push(t);
-        match self.cnt.fetch_add(1, Ordering::SeqCst) {
+        match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
             // As described in the mod's doc comment, -1 == wakeup
             -1 => UpWoke(self.take_to_wake()),
             // As described before, SPSC queues must be >= -2
@@ -125,7 +139,7 @@ impl<T> Packet<T> {
             // will never remove this data. We can only have at most one item to
             // drain (the port drains the rest).
             DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
                 let first = self.queue.pop();
                 let second = self.queue.pop();
                 assert!(second.is_none());
@@ -144,8 +158,8 @@ impl<T> Packet<T> {
 
     // Consumes ownership of the 'to_wake' field.
     fn take_to_wake(&self) -> SignalToken {
-        let ptr = self.to_wake.load(Ordering::SeqCst);
-        self.to_wake.store(0, Ordering::SeqCst);
+        let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
+        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
         assert!(ptr != 0);
         unsafe { SignalToken::cast_from_usize(ptr) }
     }
@@ -154,14 +168,16 @@ impl<T> Packet<T> {
     // back if it shouldn't sleep. Note that this is the location where we take
     // steals into account.
     fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
-        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
         let ptr = unsafe { token.cast_to_usize() };
-        self.to_wake.store(ptr, Ordering::SeqCst);
+        self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
 
-        let steals = unsafe { ptr::replace(self.steals.get(), 0) };
+        let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
 
-        match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
-            DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
+        match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
+            DISCONNECTED => {
+                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
+            }
             // If we factor in our steals and notice that the channel has no
             // data, we successfully sleep
             n => {
@@ -170,7 +186,7 @@ impl<T> Packet<T> {
             }
         }
 
-        self.to_wake.store(0, Ordering::SeqCst);
+        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
         Err(unsafe { SignalToken::cast_from_usize(ptr) })
     }
 
@@ -201,7 +217,7 @@ impl<T> Packet<T> {
             // "steal" factored into the channel count above).
             data @ Ok(..) |
             data @ Err(Upgraded(..)) => unsafe {
-                *self.steals.get() -= 1;
+                *self.queue.consumer_addition().steals.get() -= 1;
                 data
             },
 
@@ -223,20 +239,21 @@ impl<T> Packet<T> {
             // down as much as possible (without going negative), and then
             // adding back in whatever we couldn't factor into steals.
             Some(data) => unsafe {
-                if *self.steals.get() > MAX_STEALS {
-                    match self.cnt.swap(0, Ordering::SeqCst) {
+                if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
+                    match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
                         DISCONNECTED => {
-                            self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+                            self.queue.producer_addition().cnt.store(
+                                DISCONNECTED, Ordering::SeqCst);
                         }
                         n => {
-                            let m = cmp::min(n, *self.steals.get());
-                            *self.steals.get() -= m;
+                            let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
+                            *self.queue.consumer_addition().steals.get() -= m;
                             self.bump(n - m);
                         }
                     }
-                    assert!(*self.steals.get() >= 0);
+                    assert!(*self.queue.consumer_addition().steals.get() >= 0);
                 }
-                *self.steals.get() += 1;
+                *self.queue.consumer_addition().steals.get() += 1;
                 match data {
                     Data(t) => Ok(t),
                     GoUp(up) => Err(Upgraded(up)),
@@ -244,7 +261,7 @@ impl<T> Packet<T> {
             },
 
             None => {
-                match self.cnt.load(Ordering::SeqCst) {
+                match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
                     n if n != DISCONNECTED => Err(Empty),
 
                     // This is a little bit of a tricky case. We failed to pop
@@ -273,7 +290,7 @@ impl<T> Packet<T> {
     pub fn drop_chan(&self) {
         // Dropping a channel is pretty simple, we just flag it as disconnected
         // and then wakeup a blocker if there is one.
-        match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
+        match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
             -1 => { self.take_to_wake().signal(); }
             DISCONNECTED => {}
             n => { assert!(n >= 0); }
@@ -300,7 +317,7 @@ impl<T> Packet<T> {
         // sends are gated on this flag, so we're immediately guaranteed that
         // there are a bounded number of active sends that we'll have to deal
         // with.
-        self.port_dropped.store(true, Ordering::SeqCst);
+        self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);
 
         // Now that we're guaranteed to deal with a bounded number of senders,
         // we need to drain the queue. This draining process happens atomically
@@ -310,9 +327,9 @@ impl<T> Packet<T> {
         // continue to fail while active senders send data while we're dropping
         // data, but eventually we're guaranteed to break out of this loop
         // (because there is a bounded number of senders).
-        let mut steals = unsafe { *self.steals.get() };
+        let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
         while {
-            let cnt = self.cnt.compare_and_swap(
+            let cnt = self.queue.producer_addition().cnt.compare_and_swap(
                             steals, DISCONNECTED, Ordering::SeqCst);
             cnt != DISCONNECTED && cnt != steals
         } {
@@ -353,9 +370,9 @@ impl<T> Packet<T> {
 
     // increment the count on the channel (used for selection)
     fn bump(&self, amt: isize) -> isize {
-        match self.cnt.fetch_add(amt, Ordering::SeqCst) {
+        match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
             DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
                 DISCONNECTED
             }
             n => n
@@ -404,8 +421,8 @@ impl<T> Packet<T> {
         // this end. This is fine because we know it's a small bounded window
         // of time until the data is actually sent.
         if was_upgrade {
-            assert_eq!(unsafe { *self.steals.get() }, 0);
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+            assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
+            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
             return Ok(true)
         }
 
@@ -418,7 +435,7 @@ impl<T> Packet<T> {
         // If we were previously disconnected, then we know for sure that there
         // is no thread in to_wake, so just keep going
         let has_data = if prev == DISCONNECTED {
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
             true // there is data, that data is that we're disconnected
         } else {
             let cur = prev + steals + 1;
@@ -441,13 +458,13 @@ impl<T> Packet<T> {
             if prev < 0 {
                 drop(self.take_to_wake());
             } else {
-                while self.to_wake.load(Ordering::SeqCst) != 0 {
+                while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
                     thread::yield_now();
                 }
             }
             unsafe {
-                assert_eq!(*self.steals.get(), 0);
-                *self.steals.get() = steals;
+                assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
+                *self.queue.consumer_addition().steals.get() = steals;
             }
 
             // if we were previously positive, then there's surely data to
@@ -481,7 +498,7 @@ impl<T> Drop for Packet<T> {
         // disconnection, but also a proper fence before the read of
         // `to_wake`, so this assert cannot be removed with also removing
         // the `to_wake` assert.
-        assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
-        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+        assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
+        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
     }
 }
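
For context on what exercises these paths: the stream flavor is the one an
std::sync::mpsc channel uses while its Sender has not been cloned, so ordinary
single-producer/single-consumer traffic like the following ends up on the spsc
queue push/pop and the cnt/to_wake/steals fields this patch rearranges. A
usage sketch (not part of the patch):

    use std::sync::mpsc::channel;
    use std::thread;

    fn main() {
        let (tx, rx) = channel();
        let sender = thread::spawn(move || {
            for i in 0..1000u32 {
                // Sends push onto the internal queue and bump the count.
                tx.send(i).expect("receiver still alive");
            }
            // Dropping tx disconnects the channel, waking a blocked receiver.
        });

        let mut sum = 0u64;
        for v in rx {
            // Receives pop from the queue and are accounted for as steals.
            sum += u64::from(v);
        }
        sender.join().unwrap();
        assert_eq!(sum, 499_500); // 0 + 1 + ... + 999
    }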