about summary refs log tree commit diff
path: root/src/libstd/sync
diff options
context:
space:
mode:
authorDustin Speckhals <dustin1114@gmail.com>2017-10-24 19:37:15 -0400
committerDustin Speckhals <dustin1114@gmail.com>2017-10-24 19:37:15 -0400
commitbca47e42b2c878eafb109c59cf0c8074043c1251 (patch)
tree968a99f37f9099c4ed0b68b8020400322e94b667 /src/libstd/sync
parent69ba6738eb6bf9b6d11ebb81af2599648eb0bc04 (diff)
parentc2799fc9a5c631a790744ceb9cd08259af338c0c (diff)
downloadrust-bca47e42b2c878eafb109c59cf0c8074043c1251.tar.gz
rust-bca47e42b2c878eafb109c59cf0c8074043c1251.zip
Merge branch 'master' into update-rls-data-for-save-analysis
Diffstat (limited to 'src/libstd/sync')
-rw-r--r--src/libstd/sync/mpsc/cache_aligned.rs37
-rw-r--r--src/libstd/sync/mpsc/mod.rs26
-rw-r--r--src/libstd/sync/mpsc/select.rs18
-rw-r--r--src/libstd/sync/mpsc/spsc_queue.rs153
-rw-r--r--src/libstd/sync/mpsc/stream.rs105
-rw-r--r--src/libstd/sync/mutex.rs13
-rw-r--r--src/libstd/sync/once.rs94
-rw-r--r--src/libstd/sync/rwlock.rs13
8 files changed, 296 insertions, 163 deletions
diff --git a/src/libstd/sync/mpsc/cache_aligned.rs b/src/libstd/sync/mpsc/cache_aligned.rs
new file mode 100644
index 00000000000..5af01262573
--- /dev/null
+++ b/src/libstd/sync/mpsc/cache_aligned.rs
@@ -0,0 +1,37 @@
+// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use ops::{Deref, DerefMut};
+
+#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[repr(align(64))]
+pub(super) struct Aligner;
+
+#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub(super) struct CacheAligned<T>(pub T, pub Aligner);
+
+impl<T> Deref for CacheAligned<T> {
+     type Target = T;
+     fn deref(&self) -> &Self::Target {
+         &self.0
+     }
+}
+
+impl<T> DerefMut for CacheAligned<T> {
+     fn deref_mut(&mut self) -> &mut Self::Target {
+         &mut self.0
+     }
+}
+
+impl<T> CacheAligned<T> {
+    pub(super) fn new(t: T) -> Self {
+        CacheAligned(t, Aligner)
+    }
+}
diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs
index dcd4c8dfdf5..45a26e594b0 100644
--- a/src/libstd/sync/mpsc/mod.rs
+++ b/src/libstd/sync/mpsc/mod.rs
@@ -297,6 +297,8 @@ mod sync;
 mod mpsc_queue;
 mod spsc_queue;
 
+mod cache_aligned;
+
 /// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
 /// This half can only be owned by one thread.
 ///
@@ -919,7 +921,7 @@ impl<T> Drop for Sender<T> {
 #[stable(feature = "mpsc_debug", since = "1.8.0")]
 impl<T> fmt::Debug for Sender<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "Sender {{ .. }}")
+        f.debug_struct("Sender").finish()
     }
 }
 
@@ -1049,7 +1051,7 @@ impl<T> Drop for SyncSender<T> {
 #[stable(feature = "mpsc_debug", since = "1.8.0")]
 impl<T> fmt::Debug for SyncSender<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "SyncSender {{ .. }}")
+        f.debug_struct("SyncSender").finish()
     }
 }
 
@@ -1551,7 +1553,7 @@ impl<T> Drop for Receiver<T> {
 #[stable(feature = "mpsc_debug", since = "1.8.0")]
 impl<T> fmt::Debug for Receiver<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "Receiver {{ .. }}")
+        f.debug_struct("Receiver").finish()
     }
 }
 
@@ -3009,22 +3011,4 @@ mod sync_tests {
             repro()
         }
     }
-
-    #[test]
-    fn fmt_debug_sender() {
-        let (tx, _) = channel::<i32>();
-        assert_eq!(format!("{:?}", tx), "Sender { .. }");
-    }
-
-    #[test]
-    fn fmt_debug_recv() {
-        let (_, rx) = channel::<i32>();
-        assert_eq!(format!("{:?}", rx), "Receiver { .. }");
-    }
-
-    #[test]
-    fn fmt_debug_sync_sender() {
-        let (tx, _) = sync_channel::<i32>(1);
-        assert_eq!(format!("{:?}", tx), "SyncSender { .. }");
-    }
 }
diff --git a/src/libstd/sync/mpsc/select.rs b/src/libstd/sync/mpsc/select.rs
index e49f4cff024..a9f3cea243f 100644
--- a/src/libstd/sync/mpsc/select.rs
+++ b/src/libstd/sync/mpsc/select.rs
@@ -354,13 +354,13 @@ impl Iterator for Packets {
 
 impl fmt::Debug for Select {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "Select {{ .. }}")
+        f.debug_struct("Select").finish()
     }
 }
 
 impl<'rx, T:Send+'rx> fmt::Debug for Handle<'rx, T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "Handle {{ .. }}")
+        f.debug_struct("Handle").finish()
     }
 }
 
@@ -774,18 +774,4 @@ mod tests {
             }
         }
     }
-
-    #[test]
-    fn fmt_debug_select() {
-        let sel = Select::new();
-        assert_eq!(format!("{:?}", sel), "Select { .. }");
-    }
-
-    #[test]
-    fn fmt_debug_handle() {
-        let (_, rx) = channel::<i32>();
-        let sel = Select::new();
-        let handle = sel.handle(&rx);
-        assert_eq!(format!("{:?}", handle), "Handle { .. }");
-    }
 }
diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs
index 1148bc66fba..cc4be92276a 100644
--- a/src/libstd/sync/mpsc/spsc_queue.rs
+++ b/src/libstd/sync/mpsc/spsc_queue.rs
@@ -22,12 +22,15 @@ use core::cell::UnsafeCell;
 
 use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
 
+use super::cache_aligned::CacheAligned;
+
 // Node within the linked list queue of messages to send
 struct Node<T> {
     // FIXME: this could be an uninitialized T if we're careful enough, and
     //      that would reduce memory usage (and be a bit faster).
     //      is it worth it?
     value: Option<T>,           // nullable for re-use of nodes
+    cached: bool,               // This node goes into the node cache
     next: AtomicPtr<Node<T>>,   // next node in the queue
 }
 
@@ -35,38 +38,55 @@ struct Node<T> {
 /// but it can be safely shared in an Arc if it is guaranteed that there
 /// is only one popper and one pusher touching the queue at any one point in
 /// time.
-pub struct Queue<T> {
+pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> {
     // consumer fields
+    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
+
+    // producer fields
+    producer: CacheAligned<Producer<T, ProducerAddition>>,
+}
+
+struct Consumer<T, Addition> {
     tail: UnsafeCell<*mut Node<T>>, // where to pop from
     tail_prev: AtomicPtr<Node<T>>, // where to pop from
+    cache_bound: usize, // maximum cache size
+    cached_nodes: AtomicUsize, // number of nodes marked as cachable
+    addition: Addition,
+}
 
-    // producer fields
+struct Producer<T, Addition> {
     head: UnsafeCell<*mut Node<T>>,      // where to push to
     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
-
-    // Cache maintenance fields. Additions and subtractions are stored
-    // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: usize,
-    cache_additions: AtomicUsize,
-    cache_subtractions: AtomicUsize,
+    addition: Addition,
 }
 
-unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
 
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
 
 impl<T> Node<T> {
     fn new() -> *mut Node<T> {
         Box::into_raw(box Node {
             value: None,
+            cached: false,
             next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
         })
     }
 }
 
-impl<T> Queue<T> {
-    /// Creates a new queue.
+impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
+
+    /// Creates a new queue with the given additional elements in the
+    /// producer and consumer portions of the queue.
+    ///
+    /// Due to the performance implications of cache-contention,
+    /// we wish to keep fields used mainly by the producer on a separate cache
+    /// line than those used by the consumer.
+    /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
+    /// allocate one for small fields, so we allow users to insert additional
+    /// fields into the cache lines already allocated by this for the producer
+    /// and consumer.
     ///
     /// This is unsafe as the type system doesn't enforce a single
     /// consumer-producer relationship. It also allows the consumer to `pop`
@@ -83,19 +103,28 @@ impl<T> Queue<T> {
     ///               cache (if desired). If the value is 0, then the cache has
     ///               no bound. Otherwise, the cache will never grow larger than
    ///               `bound` (although the queue itself could be much larger).
-    pub unsafe fn new(bound: usize) -> Queue<T> {
+    pub unsafe fn with_additions(
+        bound: usize,
+        producer_addition: ProducerAddition,
+        consumer_addition: ConsumerAddition,
+    ) -> Self {
         let n1 = Node::new();
         let n2 = Node::new();
         (*n1).next.store(n2, Ordering::Relaxed);
         Queue {
-            tail: UnsafeCell::new(n2),
-            tail_prev: AtomicPtr::new(n1),
-            head: UnsafeCell::new(n2),
-            first: UnsafeCell::new(n1),
-            tail_copy: UnsafeCell::new(n1),
-            cache_bound: bound,
-            cache_additions: AtomicUsize::new(0),
-            cache_subtractions: AtomicUsize::new(0),
+            consumer: CacheAligned::new(Consumer {
+                tail: UnsafeCell::new(n2),
+                tail_prev: AtomicPtr::new(n1),
+                cache_bound: bound,
+                cached_nodes: AtomicUsize::new(0),
+                addition: consumer_addition
+            }),
+            producer: CacheAligned::new(Producer {
+                head: UnsafeCell::new(n2),
+                first: UnsafeCell::new(n1),
+                tail_copy: UnsafeCell::new(n1),
+                addition: producer_addition
+            }),
         }
     }
 
@@ -109,35 +138,25 @@ impl<T> Queue<T> {
             assert!((*n).value.is_none());
             (*n).value = Some(t);
             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
-            (**self.head.get()).next.store(n, Ordering::Release);
-            *self.head.get() = n;
+            (**self.producer.head.get()).next.store(n, Ordering::Release);
+            *(&self.producer.head).get() = n;
         }
     }
 
     unsafe fn alloc(&self) -> *mut Node<T> {
         // First try to see if we can consume the 'first' node for our uses.
-        // We try to avoid as many atomic instructions as possible here, so
-        // the addition to cache_subtractions is not atomic (plus we're the
-        // only one subtracting from the cache).
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If the above fails, then update our copy of the tail and try
         // again.
-        *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        *self.producer.0.tail_copy.get() =
+            self.consumer.tail_prev.load(Ordering::Acquire);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If all of that fails, then we have to allocate a new node
@@ -153,27 +172,27 @@ impl<T> Queue<T> {
             // sentinel from where we should start popping from. Hence, look at
             // tail's next field and see if we can use it. If we do a pop, then
             // the current tail node is a candidate for going into the cache.
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { return None }
             assert!((*next).value.is_some());
             let ret = (*next).value.take();
 
-            *self.tail.get() = next;
-            if self.cache_bound == 0 {
-                self.tail_prev.store(tail, Ordering::Release);
+            *self.consumer.0.tail.get() = next;
+            if self.consumer.cache_bound == 0 {
+                self.consumer.tail_prev.store(tail, Ordering::Release);
             } else {
-                // FIXME: this is dubious with overflow.
-                let additions = self.cache_additions.load(Ordering::Relaxed);
-                let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
-                let size = additions - subtractions;
-
-                if size < self.cache_bound {
-                    self.tail_prev.store(tail, Ordering::Release);
-                    self.cache_additions.store(additions + 1, Ordering::Relaxed);
+                let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
+                if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
+                    self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
+                    (*tail).cached = true;
+                }
+
+                if (*tail).cached {
+                    self.consumer.tail_prev.store(tail, Ordering::Release);
                 } else {
-                    (*self.tail_prev.load(Ordering::Relaxed))
-                          .next.store(next, Ordering::Relaxed);
+                    (*self.consumer.tail_prev.load(Ordering::Relaxed))
+                        .next.store(next, Ordering::Relaxed);
                     // We have successfully erased all references to 'tail', so
                     // now we can safely drop it.
                     let _: Box<Node<T>> = Box::from_raw(tail);
@@ -194,17 +213,25 @@ impl<T> Queue<T> {
         // This is essentially the same as above with all the popping bits
         // stripped out.
         unsafe {
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { None } else { (*next).value.as_mut() }
         }
     }
+
+    pub fn producer_addition(&self) -> &ProducerAddition {
+        &self.producer.addition
+    }
+
+    pub fn consumer_addition(&self) -> &ConsumerAddition {
+        &self.consumer.addition
+    }
 }
 
-impl<T> Drop for Queue<T> {
+impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
     fn drop(&mut self) {
         unsafe {
-            let mut cur = *self.first.get();
+            let mut cur = *self.producer.first.get();
             while !cur.is_null() {
                 let next = (*cur).next.load(Ordering::Relaxed);
                 let _n: Box<Node<T>> = Box::from_raw(cur);
@@ -224,7 +251,7 @@ mod tests {
     #[test]
     fn smoke() {
         unsafe {
-            let queue = Queue::new(0);
+            let queue = Queue::with_additions(0, (), ());
             queue.push(1);
             queue.push(2);
             assert_eq!(queue.pop(), Some(1));
@@ -241,7 +268,7 @@ mod tests {
     #[test]
     fn peek() {
         unsafe {
-            let queue = Queue::new(0);
+            let queue = Queue::with_additions(0, (), ());
             queue.push(vec![1]);
 
             // Ensure the borrowchecker works
@@ -264,7 +291,7 @@ mod tests {
     #[test]
     fn drop_full() {
         unsafe {
-            let q: Queue<Box<_>> = Queue::new(0);
+            let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
             q.push(box 1);
             q.push(box 2);
         }
@@ -273,7 +300,7 @@ mod tests {
     #[test]
     fn smoke_bound() {
         unsafe {
-            let q = Queue::new(0);
+            let q = Queue::with_additions(0, (), ());
             q.push(1);
             q.push(2);
             assert_eq!(q.pop(), Some(1));
@@ -295,7 +322,7 @@ mod tests {
         }
 
         unsafe fn stress_bound(bound: usize) {
-            let q = Arc::new(Queue::new(bound));
+            let q = Arc::new(Queue::with_additions(bound, (), ()));
 
             let (tx, rx) = channel();
             let q2 = q.clone();
diff --git a/src/libstd/sync/mpsc/stream.rs b/src/libstd/sync/mpsc/stream.rs
index 47cd8977fda..d1515eba68c 100644
--- a/src/libstd/sync/mpsc/stream.rs
+++ b/src/libstd/sync/mpsc/stream.rs
@@ -41,15 +41,22 @@ const MAX_STEALS: isize = 5;
 const MAX_STEALS: isize = 1 << 20;
 
 pub struct Packet<T> {
-    queue: spsc::Queue<Message<T>>, // internal queue for all message
+    // internal queue for all messages
+    queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
+}
 
+struct ProducerAddition {
     cnt: AtomicIsize, // How many items are on this channel
-    steals: UnsafeCell<isize>, // How many times has a port received without blocking?
     to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
 
     port_dropped: AtomicBool, // flag if the channel has been destroyed.
 }
 
+struct ConsumerAddition {
+    steals: UnsafeCell<isize>,  // How many times has a port received without blocking?
+}
+
+
 pub enum Failure<T> {
     Empty,
     Disconnected,
@@ -78,13 +85,18 @@ enum Message<T> {
 impl<T> Packet<T> {
     pub fn new() -> Packet<T> {
         Packet {
-            queue: unsafe { spsc::Queue::new(128) },
-
-            cnt: AtomicIsize::new(0),
-            steals: UnsafeCell::new(0),
-            to_wake: AtomicUsize::new(0),
-
-            port_dropped: AtomicBool::new(false),
+            queue: unsafe { spsc::Queue::with_additions(
+                128,
+                ProducerAddition {
+                    cnt: AtomicIsize::new(0),
+                    to_wake: AtomicUsize::new(0),
+
+                    port_dropped: AtomicBool::new(false),
+                },
+                ConsumerAddition {
+                    steals: UnsafeCell::new(0),
+                }
+            )},
         }
     }
 
@@ -92,7 +104,7 @@ impl<T> Packet<T> {
         // If the other port has deterministically gone away, then definitely
         // must return the data back up the stack. Otherwise, the data is
         // considered as being sent.
-        if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
+        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { return Err(t) }
 
         match self.do_send(Data(t)) {
             UpSuccess | UpDisconnected => {},
@@ -104,14 +116,16 @@ impl<T> Packet<T> {
     pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
         // If the port has gone away, then there's no need to proceed any
         // further.
-        if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected }
+        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
+            return UpDisconnected
+        }
 
         self.do_send(GoUp(up))
     }
 
     fn do_send(&self, t: Message<T>) -> UpgradeResult {
         self.queue.push(t);
-        match self.cnt.fetch_add(1, Ordering::SeqCst) {
+        match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
             // As described in the mod's doc comment, -1 == wakeup
             -1 => UpWoke(self.take_to_wake()),
             // As as described before, SPSC queues must be >= -2
@@ -125,7 +139,7 @@ impl<T> Packet<T> {
             // will never remove this data. We can only have at most one item to
             // drain (the port drains the rest).
             DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
                 let first = self.queue.pop();
                 let second = self.queue.pop();
                 assert!(second.is_none());
@@ -144,8 +158,8 @@ impl<T> Packet<T> {
 
     // Consumes ownership of the 'to_wake' field.
     fn take_to_wake(&self) -> SignalToken {
-        let ptr = self.to_wake.load(Ordering::SeqCst);
-        self.to_wake.store(0, Ordering::SeqCst);
+        let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
+        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
         assert!(ptr != 0);
         unsafe { SignalToken::cast_from_usize(ptr) }
     }
@@ -154,14 +168,16 @@ impl<T> Packet<T> {
     // back if it shouldn't sleep. Note that this is the location where we take
     // steals into account.
     fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
-        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
         let ptr = unsafe { token.cast_to_usize() };
-        self.to_wake.store(ptr, Ordering::SeqCst);
+        self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
 
-        let steals = unsafe { ptr::replace(self.steals.get(), 0) };
+        let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
 
-        match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
-            DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
+        match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
+            DISCONNECTED => {
+                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
+            }
             // If we factor in our steals and notice that the channel has no
             // data, we successfully sleep
             n => {
@@ -170,7 +186,7 @@ impl<T> Packet<T> {
             }
         }
 
-        self.to_wake.store(0, Ordering::SeqCst);
+        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
         Err(unsafe { SignalToken::cast_from_usize(ptr) })
     }
 
@@ -201,7 +217,7 @@ impl<T> Packet<T> {
             // "steal" factored into the channel count above).
             data @ Ok(..) |
             data @ Err(Upgraded(..)) => unsafe {
-                *self.steals.get() -= 1;
+                *self.queue.consumer_addition().steals.get() -= 1;
                 data
             },
 
@@ -223,20 +239,21 @@ impl<T> Packet<T> {
             // down as much as possible (without going negative), and then
             // adding back in whatever we couldn't factor into steals.
             Some(data) => unsafe {
-                if *self.steals.get() > MAX_STEALS {
-                    match self.cnt.swap(0, Ordering::SeqCst) {
+                if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
+                    match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
                         DISCONNECTED => {
-                            self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+                            self.queue.producer_addition().cnt.store(
+                                DISCONNECTED, Ordering::SeqCst);
                         }
                         n => {
-                            let m = cmp::min(n, *self.steals.get());
-                            *self.steals.get() -= m;
+                            let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
+                            *self.queue.consumer_addition().steals.get() -= m;
                             self.bump(n - m);
                         }
                     }
-                    assert!(*self.steals.get() >= 0);
+                    assert!(*self.queue.consumer_addition().steals.get() >= 0);
                 }
-                *self.steals.get() += 1;
+                *self.queue.consumer_addition().steals.get() += 1;
                 match data {
                     Data(t) => Ok(t),
                     GoUp(up) => Err(Upgraded(up)),
@@ -244,7 +261,7 @@ impl<T> Packet<T> {
             },
 
             None => {
-                match self.cnt.load(Ordering::SeqCst) {
+                match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
                     n if n != DISCONNECTED => Err(Empty),
 
                     // This is a little bit of a tricky case. We failed to pop
@@ -273,7 +290,7 @@ impl<T> Packet<T> {
     pub fn drop_chan(&self) {
         // Dropping a channel is pretty simple, we just flag it as disconnected
         // and then wakeup a blocker if there is one.
-        match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
+        match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
             -1 => { self.take_to_wake().signal(); }
             DISCONNECTED => {}
             n => { assert!(n >= 0); }
@@ -300,7 +317,7 @@ impl<T> Packet<T> {
         // sends are gated on this flag, so we're immediately guaranteed that
         // there are a bounded number of active sends that we'll have to deal
         // with.
-        self.port_dropped.store(true, Ordering::SeqCst);
+        self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);
 
         // Now that we're guaranteed to deal with a bounded number of senders,
         // we need to drain the queue. This draining process happens atomically
@@ -310,9 +327,9 @@ impl<T> Packet<T> {
         // continue to fail while active senders send data while we're dropping
         // data, but eventually we're guaranteed to break out of this loop
         // (because there is a bounded number of senders).
-        let mut steals = unsafe { *self.steals.get() };
+        let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
         while {
-            let cnt = self.cnt.compare_and_swap(
+            let cnt = self.queue.producer_addition().cnt.compare_and_swap(
                             steals, DISCONNECTED, Ordering::SeqCst);
             cnt != DISCONNECTED && cnt != steals
         } {
@@ -353,9 +370,9 @@ impl<T> Packet<T> {
 
     // increment the count on the channel (used for selection)
     fn bump(&self, amt: isize) -> isize {
-        match self.cnt.fetch_add(amt, Ordering::SeqCst) {
+        match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
             DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
                 DISCONNECTED
             }
             n => n
@@ -404,8 +421,8 @@ impl<T> Packet<T> {
         // this end. This is fine because we know it's a small bounded windows
         // of time until the data is actually sent.
         if was_upgrade {
-            assert_eq!(unsafe { *self.steals.get() }, 0);
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+            assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
+            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
             return Ok(true)
         }
 
@@ -418,7 +435,7 @@ impl<T> Packet<T> {
         // If we were previously disconnected, then we know for sure that there
         // is no thread in to_wake, so just keep going
         let has_data = if prev == DISCONNECTED {
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
             true // there is data, that data is that we're disconnected
         } else {
             let cur = prev + steals + 1;
@@ -441,13 +458,13 @@ impl<T> Packet<T> {
             if prev < 0 {
                 drop(self.take_to_wake());
             } else {
-                while self.to_wake.load(Ordering::SeqCst) != 0 {
+                while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
                     thread::yield_now();
                 }
             }
             unsafe {
-                assert_eq!(*self.steals.get(), 0);
-                *self.steals.get() = steals;
+                assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
+                *self.queue.consumer_addition().steals.get() = steals;
             }
 
             // if we were previously positive, then there's surely data to
@@ -481,7 +498,7 @@ impl<T> Drop for Packet<T> {
         // disconnection, but also a proper fence before the read of
         // `to_wake`, so this assert cannot be removed with also removing
         // the `to_wake` assert.
-        assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
-        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+        assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
+        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
     }
 }
diff --git a/src/libstd/sync/mutex.rs b/src/libstd/sync/mutex.rs
index 1d62853e906..eb507858b92 100644
--- a/src/libstd/sync/mutex.rs
+++ b/src/libstd/sync/mutex.rs
@@ -394,11 +394,18 @@ impl<T: ?Sized + Default> Default for Mutex<T> {
 impl<T: ?Sized + fmt::Debug> fmt::Debug for Mutex<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self.try_lock() {
-            Ok(guard) => write!(f, "Mutex {{ data: {:?} }}", &*guard),
+            Ok(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
             Err(TryLockError::Poisoned(err)) => {
-                write!(f, "Mutex {{ data: Poisoned({:?}) }}", &**err.get_ref())
+                f.debug_struct("Mutex").field("data", &&**err.get_ref()).finish()
             },
-            Err(TryLockError::WouldBlock) => write!(f, "Mutex {{ <locked> }}")
+            Err(TryLockError::WouldBlock) => {
+                struct LockedPlaceholder;
+                impl fmt::Debug for LockedPlaceholder {
+                    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str("<locked>") }
+                }
+
+                f.debug_struct("Mutex").field("data", &LockedPlaceholder).finish()
+            }
         }
     }
 }
diff --git a/src/libstd/sync/once.rs b/src/libstd/sync/once.rs
index 015106fc2e5..30dbf02087d 100644
--- a/src/libstd/sync/once.rs
+++ b/src/libstd/sync/once.rs
@@ -103,8 +103,8 @@ unsafe impl Sync for Once {}
 #[stable(feature = "rust1", since = "1.0.0")]
 unsafe impl Send for Once {}
 
-/// State yielded to the [`call_once_force`] method which can be used to query
-/// whether the [`Once`] was previously poisoned or not.
+/// State yielded to [`call_once_force`]’s closure parameter. The state can be
+/// used to query the poison status of the [`Once`].
 ///
 /// [`call_once_force`]: struct.Once.html#method.call_once_force
 /// [`Once`]: struct.Once.html
@@ -230,17 +230,50 @@ impl Once {
 
     /// Performs the same function as [`call_once`] except ignores poisoning.
     ///
+    /// Unlike [`call_once`], if this `Once` has been poisoned (i.e. a previous
+    /// call to `call_once` or `call_once_force` caused a panic), calling
+    /// `call_once_force` will still invoke the closure `f` and will _not_
+    /// result in an immediate panic. If `f` panics, the `Once` will remain
+    /// in a poison state. If `f` does _not_ panic, the `Once` will no
+    /// longer be in a poison state and all future calls to `call_once` or
+    /// `call_once_force` will no-op.
+    ///
+    /// The closure `f` is yielded a [`OnceState`] structure which can be used
+    /// to query the poison status of the `Once`.
+    ///
     /// [`call_once`]: struct.Once.html#method.call_once
+    /// [`OnceState`]: struct.OnceState.html
     ///
-    /// If this `Once` has been poisoned (some initialization panicked) then
-    /// this function will continue to attempt to call initialization functions
-    /// until one of them doesn't panic.
+    /// # Examples
     ///
-    /// The closure `f` is yielded a [`OnceState`] structure which can be used to query the
-    /// state of this `Once` (whether initialization has previously panicked or
-    /// not).
+    /// ```
+    /// #![feature(once_poison)]
     ///
-    /// [`OnceState`]: struct.OnceState.html
+    /// use std::sync::{Once, ONCE_INIT};
+    /// use std::thread;
+    ///
+    /// static INIT: Once = ONCE_INIT;
+    ///
+    /// // poison the once
+    /// let handle = thread::spawn(|| {
+    ///     INIT.call_once(|| panic!());
+    /// });
+    /// assert!(handle.join().is_err());
+    ///
+    /// // poisoning propagates
+    /// let handle = thread::spawn(|| {
+    ///     INIT.call_once(|| {});
+    /// });
+    /// assert!(handle.join().is_err());
+    ///
+    /// // call_once_force will still run and reset the poisoned state
+    /// INIT.call_once_force(|state| {
+    ///     assert!(state.poisoned());
+    /// });
+    ///
+    /// // once any success happens, we stop propagating the poison
+    /// INIT.call_once(|| {});
+    /// ```
     #[unstable(feature = "once_poison", issue = "33577")]
     pub fn call_once_force<F>(&'static self, f: F) where F: FnOnce(&OnceState) {
         // same as above, just with a different parameter to `call_inner`.
@@ -386,12 +419,48 @@ impl Drop for Finish {
 }
 
 impl OnceState {
-    /// Returns whether the associated [`Once`] has been poisoned.
-    ///
-    /// Once an initialization routine for a [`Once`] has panicked it will forever
-    /// indicate to future forced initialization routines that it is poisoned.
+    /// Returns whether the associated [`Once`] was poisoned prior to the
+    /// invocation of the closure passed to [`call_once_force`].
     ///
+    /// [`call_once_force`]: struct.Once.html#method.call_once_force
     /// [`Once`]: struct.Once.html
+    ///
+    /// # Examples
+    ///
+    /// A poisoned `Once`:
+    ///
+    /// ```
+    /// #![feature(once_poison)]
+    ///
+    /// use std::sync::{Once, ONCE_INIT};
+    /// use std::thread;
+    ///
+    /// static INIT: Once = ONCE_INIT;
+    ///
+    /// // poison the once
+    /// let handle = thread::spawn(|| {
+    ///     INIT.call_once(|| panic!());
+    /// });
+    /// assert!(handle.join().is_err());
+    ///
+    /// INIT.call_once_force(|state| {
+    ///     assert!(state.poisoned());
+    /// });
+    /// ```
+    ///
+    /// An unpoisoned `Once`:
+    ///
+    /// ```
+    /// #![feature(once_poison)]
+    ///
+    /// use std::sync::{Once, ONCE_INIT};
+    ///
+    /// static INIT: Once = ONCE_INIT;
+    ///
+    /// INIT.call_once_force(|state| {
+    ///     assert!(!state.poisoned());
+    /// });
+    /// ```
     #[unstable(feature = "once_poison", issue = "33577")]
     pub fn poisoned(&self) -> bool {
         self.poisoned
diff --git a/src/libstd/sync/rwlock.rs b/src/libstd/sync/rwlock.rs
index 4757faabfb8..5c49d6b5845 100644
--- a/src/libstd/sync/rwlock.rs
+++ b/src/libstd/sync/rwlock.rs
@@ -428,11 +428,18 @@ unsafe impl<#[may_dangle] T: ?Sized> Drop for RwLock<T> {
 impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLock<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self.try_read() {
-            Ok(guard) => write!(f, "RwLock {{ data: {:?} }}", &*guard),
+            Ok(guard) => f.debug_struct("RwLock").field("data", &&*guard).finish(),
             Err(TryLockError::Poisoned(err)) => {
-                write!(f, "RwLock {{ data: Poisoned({:?}) }}", &**err.get_ref())
+                f.debug_struct("RwLock").field("data", &&**err.get_ref()).finish()
             },
-            Err(TryLockError::WouldBlock) => write!(f, "RwLock {{ <locked> }}")
+            Err(TryLockError::WouldBlock) => {
+                struct LockedPlaceholder;
+                impl fmt::Debug for LockedPlaceholder {
+                    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str("<locked>") }
+                }
+
+                f.debug_struct("RwLock").field("data", &LockedPlaceholder).finish()
+            }
         }
     }
 }