about summary refs log tree commit diff
path: root/src/libstd/sync/mpsc/stream.rs
diff options
context:
space:
mode:
authorJoshua Lockerman <j@Js-MacBook-Air.home>2017-09-29 15:58:11 -0400
committerJoshua Lockerman <j@Js-MacBook-Air.home>2017-10-01 12:15:35 -0400
commit68341a91eecb830c57779cd2423733a7395258ab (patch)
tree851726d9a606ab4ffd49b719f741dd856e09dfd1 /src/libstd/sync/mpsc/stream.rs
parent0e6f4cf51cd3b799fb057956f8e733d16605d09b (diff)
downloadrust-68341a91eecb830c57779cd2423733a7395258ab.tar.gz
rust-68341a91eecb830c57779cd2423733a7395258ab.zip
Improve performance of spsc_queue and stream.
This commit makes two main changes.
1. It switches the spsc_queue node caching strategy from keeping a shared
counter of the number of nodes in the cache to keeping a consumer only counter
of the number of node eligible to be cached.
2. It separate the consumer and producers fields of spsc_queue and stream into
a producer cache line and consumer cache line.
Diffstat (limited to 'src/libstd/sync/mpsc/stream.rs')
-rw-r--r--src/libstd/sync/mpsc/stream.rs105
1 files changed, 61 insertions, 44 deletions
diff --git a/src/libstd/sync/mpsc/stream.rs b/src/libstd/sync/mpsc/stream.rs
index 47cd8977fda..d1515eba68c 100644
--- a/src/libstd/sync/mpsc/stream.rs
+++ b/src/libstd/sync/mpsc/stream.rs
@@ -41,15 +41,22 @@ const MAX_STEALS: isize = 5;
 const MAX_STEALS: isize = 1 << 20;
 
 pub struct Packet<T> {
-    queue: spsc::Queue<Message<T>>, // internal queue for all message
+    // internal queue for all messages
+    queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
+}
 
+struct ProducerAddition {
     cnt: AtomicIsize, // How many items are on this channel
-    steals: UnsafeCell<isize>, // How many times has a port received without blocking?
     to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
 
     port_dropped: AtomicBool, // flag if the channel has been destroyed.
 }
 
+struct ConsumerAddition {
+    steals: UnsafeCell<isize>,  // How many times has a port received without blocking?
+}
+
+
 pub enum Failure<T> {
     Empty,
     Disconnected,
@@ -78,13 +85,18 @@ enum Message<T> {
 impl<T> Packet<T> {
     pub fn new() -> Packet<T> {
         Packet {
-            queue: unsafe { spsc::Queue::new(128) },
-
-            cnt: AtomicIsize::new(0),
-            steals: UnsafeCell::new(0),
-            to_wake: AtomicUsize::new(0),
-
-            port_dropped: AtomicBool::new(false),
+            queue: unsafe { spsc::Queue::with_additions(
+                128,
+                ProducerAddition {
+                    cnt: AtomicIsize::new(0),
+                    to_wake: AtomicUsize::new(0),
+
+                    port_dropped: AtomicBool::new(false),
+                },
+                ConsumerAddition {
+                    steals: UnsafeCell::new(0),
+                }
+            )},
         }
     }
 
@@ -92,7 +104,7 @@ impl<T> Packet<T> {
         // If the other port has deterministically gone away, then definitely
         // must return the data back up the stack. Otherwise, the data is
         // considered as being sent.
-        if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
+        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { return Err(t) }
 
         match self.do_send(Data(t)) {
             UpSuccess | UpDisconnected => {},
@@ -104,14 +116,16 @@ impl<T> Packet<T> {
     pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
         // If the port has gone away, then there's no need to proceed any
         // further.
-        if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected }
+        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
+            return UpDisconnected
+        }
 
         self.do_send(GoUp(up))
     }
 
     fn do_send(&self, t: Message<T>) -> UpgradeResult {
         self.queue.push(t);
-        match self.cnt.fetch_add(1, Ordering::SeqCst) {
+        match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
             // As described in the mod's doc comment, -1 == wakeup
             -1 => UpWoke(self.take_to_wake()),
             // As as described before, SPSC queues must be >= -2
@@ -125,7 +139,7 @@ impl<T> Packet<T> {
             // will never remove this data. We can only have at most one item to
             // drain (the port drains the rest).
             DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
                 let first = self.queue.pop();
                 let second = self.queue.pop();
                 assert!(second.is_none());
@@ -144,8 +158,8 @@ impl<T> Packet<T> {
 
     // Consumes ownership of the 'to_wake' field.
     fn take_to_wake(&self) -> SignalToken {
-        let ptr = self.to_wake.load(Ordering::SeqCst);
-        self.to_wake.store(0, Ordering::SeqCst);
+        let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
+        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
         assert!(ptr != 0);
         unsafe { SignalToken::cast_from_usize(ptr) }
     }
@@ -154,14 +168,16 @@ impl<T> Packet<T> {
     // back if it shouldn't sleep. Note that this is the location where we take
     // steals into account.
     fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
-        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
         let ptr = unsafe { token.cast_to_usize() };
-        self.to_wake.store(ptr, Ordering::SeqCst);
+        self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
 
-        let steals = unsafe { ptr::replace(self.steals.get(), 0) };
+        let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
 
-        match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
-            DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
+        match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
+            DISCONNECTED => {
+                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
+            }
             // If we factor in our steals and notice that the channel has no
             // data, we successfully sleep
             n => {
@@ -170,7 +186,7 @@ impl<T> Packet<T> {
             }
         }
 
-        self.to_wake.store(0, Ordering::SeqCst);
+        self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
         Err(unsafe { SignalToken::cast_from_usize(ptr) })
     }
 
@@ -201,7 +217,7 @@ impl<T> Packet<T> {
             // "steal" factored into the channel count above).
             data @ Ok(..) |
             data @ Err(Upgraded(..)) => unsafe {
-                *self.steals.get() -= 1;
+                *self.queue.consumer_addition().steals.get() -= 1;
                 data
             },
 
@@ -223,20 +239,21 @@ impl<T> Packet<T> {
             // down as much as possible (without going negative), and then
             // adding back in whatever we couldn't factor into steals.
             Some(data) => unsafe {
-                if *self.steals.get() > MAX_STEALS {
-                    match self.cnt.swap(0, Ordering::SeqCst) {
+                if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
+                    match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
                         DISCONNECTED => {
-                            self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+                            self.queue.producer_addition().cnt.store(
+                                DISCONNECTED, Ordering::SeqCst);
                         }
                         n => {
-                            let m = cmp::min(n, *self.steals.get());
-                            *self.steals.get() -= m;
+                            let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
+                            *self.queue.consumer_addition().steals.get() -= m;
                             self.bump(n - m);
                         }
                     }
-                    assert!(*self.steals.get() >= 0);
+                    assert!(*self.queue.consumer_addition().steals.get() >= 0);
                 }
-                *self.steals.get() += 1;
+                *self.queue.consumer_addition().steals.get() += 1;
                 match data {
                     Data(t) => Ok(t),
                     GoUp(up) => Err(Upgraded(up)),
@@ -244,7 +261,7 @@ impl<T> Packet<T> {
             },
 
             None => {
-                match self.cnt.load(Ordering::SeqCst) {
+                match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
                     n if n != DISCONNECTED => Err(Empty),
 
                     // This is a little bit of a tricky case. We failed to pop
@@ -273,7 +290,7 @@ impl<T> Packet<T> {
     pub fn drop_chan(&self) {
         // Dropping a channel is pretty simple, we just flag it as disconnected
         // and then wakeup a blocker if there is one.
-        match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
+        match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
             -1 => { self.take_to_wake().signal(); }
             DISCONNECTED => {}
             n => { assert!(n >= 0); }
@@ -300,7 +317,7 @@ impl<T> Packet<T> {
         // sends are gated on this flag, so we're immediately guaranteed that
         // there are a bounded number of active sends that we'll have to deal
         // with.
-        self.port_dropped.store(true, Ordering::SeqCst);
+        self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);
 
         // Now that we're guaranteed to deal with a bounded number of senders,
         // we need to drain the queue. This draining process happens atomically
@@ -310,9 +327,9 @@ impl<T> Packet<T> {
         // continue to fail while active senders send data while we're dropping
         // data, but eventually we're guaranteed to break out of this loop
         // (because there is a bounded number of senders).
-        let mut steals = unsafe { *self.steals.get() };
+        let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
         while {
-            let cnt = self.cnt.compare_and_swap(
+            let cnt = self.queue.producer_addition().cnt.compare_and_swap(
                             steals, DISCONNECTED, Ordering::SeqCst);
             cnt != DISCONNECTED && cnt != steals
         } {
@@ -353,9 +370,9 @@ impl<T> Packet<T> {
 
     // increment the count on the channel (used for selection)
     fn bump(&self, amt: isize) -> isize {
-        match self.cnt.fetch_add(amt, Ordering::SeqCst) {
+        match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
             DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
                 DISCONNECTED
             }
             n => n
@@ -404,8 +421,8 @@ impl<T> Packet<T> {
         // this end. This is fine because we know it's a small bounded windows
         // of time until the data is actually sent.
         if was_upgrade {
-            assert_eq!(unsafe { *self.steals.get() }, 0);
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+            assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
+            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
             return Ok(true)
         }
 
@@ -418,7 +435,7 @@ impl<T> Packet<T> {
         // If we were previously disconnected, then we know for sure that there
         // is no thread in to_wake, so just keep going
         let has_data = if prev == DISCONNECTED {
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
             true // there is data, that data is that we're disconnected
         } else {
             let cur = prev + steals + 1;
@@ -441,13 +458,13 @@ impl<T> Packet<T> {
             if prev < 0 {
                 drop(self.take_to_wake());
             } else {
-                while self.to_wake.load(Ordering::SeqCst) != 0 {
+                while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
                     thread::yield_now();
                 }
             }
             unsafe {
-                assert_eq!(*self.steals.get(), 0);
-                *self.steals.get() = steals;
+                assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
+                *self.queue.consumer_addition().steals.get() = steals;
             }
 
             // if we were previously positive, then there's surely data to
@@ -481,7 +498,7 @@ impl<T> Drop for Packet<T> {
         // disconnection, but also a proper fence before the read of
         // `to_wake`, so this assert cannot be removed with also removing
         // the `to_wake` assert.
-        assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
-        assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+        assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
+        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
     }
 }