about summary refs log tree commit diff
path: root/src/libstd/comm/shared.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/comm/shared.rs')
-rw-r--r--src/libstd/comm/shared.rs18
1 files changed, 14 insertions, 4 deletions
diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs
index 30e061bb7b9..77bf2d7a68d 100644
--- a/src/libstd/comm/shared.rs
+++ b/src/libstd/comm/shared.rs
@@ -398,6 +398,17 @@ impl<T: Send> Packet<T> {
         cnt == DISCONNECTED || cnt - self.steals > 0
     }
 
+    // increment the count on the channel (used for selection)
+    fn bump(&mut self, amt: int) -> int {
+        match self.cnt.fetch_add(amt, atomics::SeqCst) {
+            DISCONNECTED => {
+                self.cnt.store(DISCONNECTED, atomics::SeqCst);
+                DISCONNECTED
+            }
+            n => n
+        }
+    }
+
     // Inserts the blocked task for selection on this port, returning it back if
     // the port already has data on it.
     //
@@ -408,8 +419,8 @@ impl<T: Send> Packet<T> {
         match self.decrement(task) {
             Ok(()) => Ok(()),
             Err(task) => {
-                let prev = self.cnt.fetch_add(1, atomics::SeqCst);
-                assert!(prev >= 0);
+                let prev = self.bump(1);
+                assert!(prev == DISCONNECTED || prev >= 0);
                 return Err(task);
             }
         }
@@ -440,11 +451,10 @@ impl<T: Send> Packet<T> {
             let cnt = self.cnt.load(atomics::SeqCst);
             if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
         };
-        let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst);
+        let prev = self.bump(steals + 1);
 
         if prev == DISCONNECTED {
             assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
-            self.cnt.store(DISCONNECTED, atomics::SeqCst);
             true
         } else {
             let cur = prev + steals + 1;