about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBen Blum <bblum@andrew.cmu.edu>2013-06-10 22:13:56 -0400
committerBen Blum <bblum@andrew.cmu.edu>2013-06-13 14:45:14 -0400
commit57cb44dbeb0856e3f1b2a58b0b381fc37c65c53c (patch)
tree59edca01f4c0494bddc15651bf6717ddd4ae1a47
parent68e8fe9b6e8cd1abf88822c12738a8cd8fc950a5 (diff)
downloadrust-57cb44dbeb0856e3f1b2a58b0b381fc37c65c53c.tar.gz
rust-57cb44dbeb0856e3f1b2a58b0b381fc37c65c53c.zip
Change sync::RWlock implementation to use atomic uint instead of exclusive, for performance. Close #7066.
-rw-r--r--src/libextra/sync.rs137
1 files changed, 80 insertions, 57 deletions
diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs
index 39577d863b8..86cc014f714 100644
--- a/src/libextra/sync.rs
+++ b/src/libextra/sync.rs
@@ -20,7 +20,8 @@ use core::prelude::*;
 use core::borrow;
 use core::comm;
 use core::task;
-use core::unstable::sync::{Exclusive, exclusive};
+use core::unstable::sync::{Exclusive, exclusive, UnsafeAtomicRcBox};
+use core::unstable::atomics;
 use core::util;
 
 /****************************************************************************
@@ -37,6 +38,7 @@ type SignalEnd = comm::ChanOne<()>;
 struct Waitqueue { head: comm::Port<SignalEnd>,
                    tail: comm::Chan<SignalEnd> }
 
+#[doc(hidden)]
 fn new_waitqueue() -> Waitqueue {
     let (block_head, block_tail) = comm::stream();
     Waitqueue { head: block_head, tail: block_tail }
@@ -166,9 +168,12 @@ impl Sem<~[Waitqueue]> {
 // FIXME(#3588) should go inside of access()
 #[doc(hidden)]
 type SemRelease<'self> = SemReleaseGeneric<'self, ()>;
+#[doc(hidden)]
 type SemAndSignalRelease<'self> = SemReleaseGeneric<'self, ~[Waitqueue]>;
+#[doc(hidden)]
 struct SemReleaseGeneric<'self, Q> { sem: &'self Sem<Q> }
 
+#[doc(hidden)]
 #[unsafe_destructor]
 impl<'self, Q:Owned> Drop for SemReleaseGeneric<'self, Q> {
     fn finalize(&self) {
@@ -176,12 +181,14 @@ impl<'self, Q:Owned> Drop for SemReleaseGeneric<'self, Q> {
     }
 }
 
+#[doc(hidden)]
 fn SemRelease<'r>(sem: &'r Sem<()>) -> SemRelease<'r> {
     SemReleaseGeneric {
         sem: sem
     }
 }
 
+#[doc(hidden)]
 fn SemAndSignalRelease<'r>(sem: &'r Sem<~[Waitqueue]>)
                         -> SemAndSignalRelease<'r> {
     SemReleaseGeneric {
@@ -465,8 +472,23 @@ impl Mutex {
 
 #[doc(hidden)]
 struct RWlockInner {
+    // You might ask, "Why don't you need to use an atomic for the mode flag?"
+    // This flag affects the behaviour of readers (for plain readers, they
+    // assert on it; for downgraders, they use it to decide which mode to
+    // unlock for). Consider that the flag is only unset when the very last
+    // reader exits; therefore, it can never be unset during a reader/reader
+    // (or reader/downgrader) race.
+    // By the way, if we didn't care about the assert in the read unlock path,
+    // we could instead store the mode flag in write_downgrade's stack frame,
+    // and have the downgrade tokens store a borrowed pointer to it.
     read_mode:  bool,
-    read_count: uint
+    // The only way the count flag is ever accessed is with xadd. Since it is
+    // a read-modify-write operation, multiple xadds on different cores will
+    // always be consistent with respect to each other, so a monotonic/relaxed
+    // consistency ordering suffices (i.e., no extra barriers are needed).
+    // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
+    // acquire/release orderings superfluously. Change these someday.
+    read_count: atomics::AtomicUint,
 }
 
 /**
@@ -479,7 +501,7 @@ struct RWlockInner {
 pub struct RWlock {
     priv order_lock:  Semaphore,
     priv access_lock: Sem<~[Waitqueue]>,
-    priv state:       Exclusive<RWlockInner>
+    priv state:       UnsafeAtomicRcBox<RWlockInner>,
 }
 
 /// Create a new rwlock, with one associated condvar.
@@ -490,10 +512,13 @@ pub fn RWlock() -> RWlock { rwlock_with_condvars(1) }
  * Similar to mutex_with_condvars.
  */
 pub fn rwlock_with_condvars(num_condvars: uint) -> RWlock {
-    RWlock { order_lock: semaphore(1),
+    let state = UnsafeAtomicRcBox::new(RWlockInner {
+        read_mode:  false,
+        read_count: atomics::AtomicUint::new(0),
+    });
+    RWlock { order_lock:  semaphore(1),
              access_lock: new_sem_and_signal(1, num_condvars),
-             state: exclusive(RWlockInner { read_mode:  false,
-                                             read_count: 0 }) }
+             state:       state, }
 }
 
 impl RWlock {
@@ -513,20 +538,11 @@ impl RWlock {
         unsafe {
             do task::unkillable {
                 do (&self.order_lock).access {
-                    let mut first_reader = false;
-                    do self.state.with |state| {
-                        first_reader = (state.read_count == 0);
-                        state.read_count += 1;
-                    }
-                    if first_reader {
+                    let state = &mut *self.state.get();
+                    let old_count = state.read_count.fetch_add(1, atomics::Acquire);
+                    if old_count == 0 {
                         (&self.access_lock).acquire();
-                        do self.state.with |state| {
-                            // Must happen *after* getting access_lock. If
-                            // this is set while readers are waiting, but
-                            // while a writer holds the lock, the writer will
-                            // be confused if they downgrade-then-unlock.
-                            state.read_mode = true;
-                        }
+                        state.read_mode = true;
                     }
                 }
                 release = Some(RWlockReleaseRead(self));
@@ -606,12 +622,12 @@ impl RWlock {
      * # Example
      *
      * ~~~ {.rust}
-     * do lock.write_downgrade |write_mode| {
-     *     do (&write_mode).write_cond |condvar| {
+     * do lock.write_downgrade |write_token| {
+     *     do (&write_token).write_cond |condvar| {
      *         ... exclusive access ...
      *     }
-     *     let read_mode = lock.downgrade(write_mode);
-     *     do (&read_mode).read {
+     *     let read_token = lock.downgrade(write_token);
+     *     do (&read_token).read {
      *         ... shared access ...
      *     }
      * }
@@ -640,14 +656,15 @@ impl RWlock {
         }
         unsafe {
             do task::unkillable {
-                let mut first_reader = false;
-                do self.state.with |state| {
-                    assert!(!state.read_mode);
-                    state.read_mode = true;
-                    first_reader = (state.read_count == 0);
-                    state.read_count += 1;
-                }
-                if !first_reader {
+                let state = &mut *self.state.get();
+                assert!(!state.read_mode);
+                state.read_mode = true;
+                // If a reader attempts to enter at this point, both the
+                // downgrader and reader will set the mode flag. This is fine.
+                let old_count = state.read_count.fetch_add(1, atomics::Release);
+                // If another reader was already blocking, we need to hand-off
+                // the "reader cloud" access lock to them.
+                if old_count != 0 {
                     // Guaranteed not to let another writer in, because
                     // another reader was holding the order_lock. Hence they
                     // must be the one to get the access_lock (because all
@@ -667,22 +684,22 @@ struct RWlockReleaseRead<'self> {
     lock: &'self RWlock,
 }
 
+#[doc(hidden)]
 #[unsafe_destructor]
 impl<'self> Drop for RWlockReleaseRead<'self> {
     fn finalize(&self) {
         unsafe {
             do task::unkillable {
-                let mut last_reader = false;
-                do self.lock.state.with |state| {
-                    assert!(state.read_mode);
-                    assert!(state.read_count > 0);
-                    state.read_count -= 1;
-                    if state.read_count == 0 {
-                        last_reader = true;
-                        state.read_mode = false;
-                    }
-                }
-                if last_reader {
+                let state = &mut *self.lock.state.get();
+                assert!(state.read_mode);
+                let old_count = state.read_count.fetch_sub(1, atomics::Release);
+                assert!(old_count > 0);
+                if old_count == 1 {
+                    state.read_mode = false;
+                    // Note: this release used to be outside of a locked access
+                    // to exclusive-protected state. If this code is ever
+                    // converted back to such (instead of using atomic ops),
+                    // this access MUST NOT go inside the exclusive access.
                     (&self.lock.access_lock).release();
                 }
             }
@@ -690,6 +707,7 @@ impl<'self> Drop for RWlockReleaseRead<'self> {
     }
 }
 
+#[doc(hidden)]
 fn RWlockReleaseRead<'r>(lock: &'r RWlock) -> RWlockReleaseRead<'r> {
     RWlockReleaseRead {
         lock: lock
@@ -703,30 +721,34 @@ struct RWlockReleaseDowngrade<'self> {
     lock: &'self RWlock,
 }
 
+#[doc(hidden)]
 #[unsafe_destructor]
 impl<'self> Drop for RWlockReleaseDowngrade<'self> {
     fn finalize(&self) {
         unsafe {
             do task::unkillable {
-                let mut writer_or_last_reader = false;
-                do self.lock.state.with |state| {
-                    if state.read_mode {
-                        assert!(state.read_count > 0);
-                        state.read_count -= 1;
-                        if state.read_count == 0 {
-                            // Case 1: Writer downgraded & was the last reader
-                            writer_or_last_reader = true;
-                            state.read_mode = false;
-                        } else {
-                            // Case 2: Writer downgraded & was not the last
-                            // reader
-                        }
-                    } else {
-                        // Case 3: Writer did not downgrade
+                let writer_or_last_reader;
+                // Check if we're releasing from read mode or from write mode.
+                let state = &mut *self.lock.state.get();
+                if state.read_mode {
+                    // Releasing from read mode.
+                    let old_count = state.read_count.fetch_sub(1, atomics::Release);
+                    assert!(old_count > 0);
+                    // Check if other readers remain.
+                    if old_count == 1 {
+                        // Case 1: Writer downgraded & was the last reader
                         writer_or_last_reader = true;
+                        state.read_mode = false;
+                    } else {
+                        // Case 2: Writer downgraded & was not the last reader
+                        writer_or_last_reader = false;
                     }
+                } else {
+                    // Case 3: Writer did not downgrade
+                    writer_or_last_reader = true;
                 }
                 if writer_or_last_reader {
+                    // Nobody left inside; release the "reader cloud" lock.
                     (&self.lock.access_lock).release();
                 }
             }
@@ -734,6 +756,7 @@ impl<'self> Drop for RWlockReleaseDowngrade<'self> {
     }
 }
 
+#[doc(hidden)]
 fn RWlockReleaseDowngrade<'r>(lock: &'r RWlock)
                            -> RWlockReleaseDowngrade<'r> {
     RWlockReleaseDowngrade {