about summary refs log tree commit diff
path: root/src/libstd/sync.rs
diff options
context:
space:
mode:
authorBen Blum <bblum@andrew.cmu.edu>2012-08-13 15:22:32 -0400
committerBen Blum <bblum@andrew.cmu.edu>2012-08-13 17:31:24 -0400
commit7cf21e52ebd8ac2b3ce6d0aac8292bf55c39477e (patch)
tree9ce4980bbc4cb8cb5a21d1cfdbc4a290a26b404c /src/libstd/sync.rs
parent6c4843d9da70aeb402fc788e07e7912ecbee3559 (diff)
downloadrust-7cf21e52ebd8ac2b3ce6d0aac8292bf55c39477e.tar.gz
rust-7cf21e52ebd8ac2b3ce6d0aac8292bf55c39477e.zip
Implement rwlock.downgrade and tests
Diffstat (limited to 'src/libstd/sync.rs')
-rw-r--r--src/libstd/sync.rs238
1 files changed, 202 insertions, 36 deletions
diff --git a/src/libstd/sync.rs b/src/libstd/sync.rs
index 3ca0d43d985..2650c1f734d 100644
--- a/src/libstd/sync.rs
+++ b/src/libstd/sync.rs
@@ -142,9 +142,9 @@ struct sem_and_signal_release {
 }
 
 /// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
-struct condvar { priv sem: &sem<waitqueue>; }
+struct condvar { priv sem: &sem<waitqueue>; drop { } }
 
-impl condvar {
+impl &condvar {
     /// Atomically drop the associated lock, and block until a signal is sent.
     fn wait() {
         // Create waiter nobe.
@@ -212,8 +212,9 @@ impl condvar {
 }
 
 impl &sem<waitqueue> {
-    fn access_cond<U>(blk: fn(condvar) -> U) -> U {
-        do self.access { blk(condvar { sem: self }) }
+    // The only other place that condvars get built is rwlock_write_mode.
+    fn access_cond<U>(blk: fn(c: &condvar) -> U) -> U {
+        do self.access { blk(&condvar { sem: self }) }
     }
 }
 
@@ -272,7 +273,7 @@ impl &mutex {
     fn lock<U>(blk: fn() -> U) -> U { (&self.sem).access(blk) }
 
     /// Run a function with ownership of the mutex and a handle to a condvar.
-    fn lock_cond<U>(blk: fn(condvar) -> U) -> U {
+    fn lock_cond<U>(blk: fn(c: &condvar) -> U) -> U {
         (&self.sem).access_cond(blk)
     }
 }
@@ -321,12 +322,18 @@ impl &rwlock {
                 do (&self.order_lock).access {
                     let mut first_reader = false;
                     do self.state.with |state| {
-                        state.read_mode = true;
                         first_reader = (state.read_count == 0);
                         state.read_count += 1;
                     }
                     if first_reader {
                         (&self.access_lock).acquire();
+                        do self.state.with |state| {
+                            // Must happen *after* getting access_lock. If
+                            // this is set while readers are waiting, but
+                            // while a writer holds the lock, the writer will
+                            // be confused if they downgrade-then-unlock.
+                            state.read_mode = true;
+                        }
                     }
                 }
                 release = some(rwlock_release_read(self));
@@ -357,7 +364,7 @@ impl &rwlock {
      * the waiting task is signalled. (Note: a writer that waited and then
      * was signalled might reacquire the lock before other waiting writers.)
      */
-    fn write_cond<U>(blk: fn(condvar) -> U) -> U {
+    fn write_cond<U>(blk: fn(c: &condvar) -> U) -> U {
         // NB: You might think I should thread the order_lock into the cond
         // wait call, so that it gets waited on before access_lock gets
         // reacquired upon being woken up. However, (a) this would be not
@@ -374,7 +381,62 @@ impl &rwlock {
         }
     }
 
-    // to-do implement downgrade
+    /**
+     * As write(), but with the ability to atomically 'downgrade' the lock;
+     * i.e., to become a reader without letting other writers get the lock in
+     * the meantime (such as unlocking and then re-locking as a reader would
+     * do). The block takes a "write mode token" argument, which can be
+     * transformed into a "read mode token" by calling downgrade(). Example:
+     *
+     *     do lock.write_downgrade |write_mode| {
+     *         do (&write_mode).write_cond |condvar| {
+     *             ... exclusive access ...
+     *         }
+     *         let read_mode = lock.downgrade(write_mode);
+     *         do (&read_mode).read {
+     *             ... shared access ...
+     *         }
+     *     }
+     */
+    fn write_downgrade<U>(blk: fn(+rwlock_write_mode) -> U) -> U {
+        // Implementation slightly different from the slicker 'write's above.
+        // The exit path is conditional on whether the caller downgrades.
+        let mut _release = none;
+        unsafe {
+            do task::unkillable {
+                (&self.order_lock).acquire();
+                (&self.access_lock).acquire();
+                (&self.order_lock).release();
+            }
+            _release = some(rwlock_release_downgrade(self));
+        }
+        blk(rwlock_write_mode { lock: self })
+    }
+
+    fn downgrade(+token: rwlock_write_mode) -> rwlock_read_mode {
+        if !ptr::ref_eq(self, token.lock) {
+            fail ~"Can't downgrade() with a different rwlock's write_mode!";
+        }
+        unsafe {
+            do task::unkillable {
+                let mut first_reader = false;
+                do self.state.with |state| {
+                    assert !state.read_mode;
+                    state.read_mode = true;
+                    first_reader = (state.read_count == 0);
+                    state.read_count += 1;
+                }
+                if !first_reader {
+                    // Guaranteed not to let another writer in, because
+                    // another reader was holding the order_lock. Hence they
+                    // must be the one to get the access_lock (because all
+                    // access_locks are acquired with order_lock held).
+                    (&self.access_lock).release();
+                }
+            }
+        }
+        rwlock_read_mode { lock: token.lock }
+    }
 }
 
 // FIXME(#3136) should go inside of read()
@@ -386,8 +448,12 @@ struct rwlock_release_read {
             let mut last_reader = false;
             do self.lock.state.with |state| {
                 assert state.read_mode;
+                assert state.read_count > 0;
                 state.read_count -= 1;
-                last_reader = (state.read_count == 0);
+                if state.read_count == 0 {
+                    last_reader = true;
+                    state.read_mode = false;
+                }
             }
             if last_reader {
                 (&self.lock.access_lock).release();
@@ -396,6 +462,56 @@ struct rwlock_release_read {
     }
 }
 
+// FIXME(#3136) should go inside of downgrade()
+struct rwlock_release_downgrade {
+    lock: &rwlock;
+    new(lock: &rwlock) { self.lock = lock; }
+    drop unsafe {
+        do task::unkillable {
+            let mut writer_or_last_reader = false;
+            do self.lock.state.with |state| {
+                if state.read_mode {
+                    assert state.read_count > 0;
+                    state.read_count -= 1;
+                    if state.read_count == 0 {
+                        // Case 1: Writer downgraded & was the last reader
+                        writer_or_last_reader = true;
+                        state.read_mode = false;
+                    } else {
+                        // Case 2: Writer downgraded & was not the last reader
+                    }
+                } else {
+                    // Case 3: Writer did not downgrade
+                    writer_or_last_reader = true;
+                }
+            }
+            if writer_or_last_reader {
+                (&self.lock.access_lock).release();
+            }
+        }
+    }
+}
+
+/// The "write permission" token used for rwlock.write_downgrade().
+// FIXME(#3145): make lock priv somehow
+struct rwlock_write_mode { lock: &rwlock; drop { } }
+/// The "read permission" token used for rwlock.write_downgrade().
+struct rwlock_read_mode  { priv lock: &rwlock; drop { } }
+
+// FIXME(#3145) XXX Region invariance forbids "mode.write(blk)"
+impl rwlock_write_mode {
+    /// Access the pre-downgrade rwlock in write mode.
+    fn write<U>(blk: fn() -> U) -> U { blk() }
+    /// Access the pre-downgrade rwlock in write mode with a condvar.
+    fn write_cond<U>(blk: fn(c: &condvar) -> U) -> U {
+        blk(&condvar { sem: &self.lock.access_lock })
+    }
+}
+impl rwlock_read_mode {
+    /// Access the post-downgrade rwlock in read mode.
+    fn read<U>(blk: fn() -> U) -> U { blk() }
+}
+
 /****************************************************************************
  * Tests
  ****************************************************************************/
@@ -510,9 +626,11 @@ mod tests {
         let sharedstate = ~0;
         let ptr = ptr::addr_of(*sharedstate);
         do task::spawn {
-            let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) };
+            let sharedstate: &mut int =
+                unsafe { unsafe::reinterpret_cast(ptr) };
             access_shared(sharedstate, m2, 10);
             c.send(());
+
         }
         access_shared(sharedstate, m, 10);
         let _ = p.recv();
@@ -645,21 +763,27 @@ mod tests {
         // child task must have finished by the time try returns
         do m.lock_cond |cond| {
             let _woken = cond.signal();
-            // FIXME(#3145) - The semantics of pipes are not quite what I want
-            // here - the pipe doesn't get 'terminated' if the child was
-            // punted awake during failure.
-            // assert !woken;
+            // FIXME(#3145) this doesn't work
+            //assert !woken;
         }
     }
     /************************************************************************
      * Reader/writer lock tests
      ************************************************************************/
     #[cfg(test)]
-    fn lock_rwlock_in_mode(x: &rwlock, reader: bool, blk: fn()) {
-        if reader { x.read(blk); } else { x.write(blk); }
+    enum rwlock_mode { read, write, downgrade, downgrade_read }
+    #[cfg(test)]
+    fn lock_rwlock_in_mode(x: &rwlock, mode: rwlock_mode, blk: fn()) {
+        match mode {
+            read => x.read(blk),
+            write => x.write(blk),
+            downgrade => do x.write_downgrade |mode| { mode.write(blk); },
+            downgrade_read =>
+                do x.write_downgrade |mode| { x.downgrade(mode).read(blk); },
+        }
     }
     #[cfg(test)]
-    fn test_rwlock_exclusion(reader1: bool, reader2: bool) {
+    fn test_rwlock_exclusion(mode1: rwlock_mode, mode2: rwlock_mode) {
         // Test mutual exclusion between readers and writers. Just like the
         // mutex mutual exclusion test, a ways above.
         let (c,p) = pipes::stream();
@@ -668,19 +792,20 @@ mod tests {
         let sharedstate = ~0;
         let ptr = ptr::addr_of(*sharedstate);
         do task::spawn {
-            let sharedstate = unsafe { unsafe::reinterpret_cast(ptr) };
-            access_shared(sharedstate, x2, reader1, 10);
+            let sharedstate: &mut int =
+                unsafe { unsafe::reinterpret_cast(ptr) };
+            access_shared(sharedstate, x2, mode1, 10);
             c.send(());
         }
-        access_shared(sharedstate, x, reader2, 10);
+        access_shared(sharedstate, x, mode2, 10);
         let _ = p.recv();
 
         assert *sharedstate == 20;
 
-        fn access_shared(sharedstate: &mut int, x: &rwlock, reader: bool,
+        fn access_shared(sharedstate: &mut int, x: &rwlock, mode: rwlock_mode,
                          n: uint) {
             for n.times {
-                do lock_rwlock_in_mode(x, reader) {
+                do lock_rwlock_in_mode(x, mode) {
                     let oldval = *sharedstate;
                     task::yield();
                     *sharedstate = oldval + 1;
@@ -690,32 +815,59 @@ mod tests {
     }
     #[test]
     fn test_rwlock_readers_wont_modify_the_data() {
-        test_rwlock_exclusion(true, false);
-        test_rwlock_exclusion(false, true);
+        test_rwlock_exclusion(read, write);
+        test_rwlock_exclusion(write, read);
+        test_rwlock_exclusion(read, downgrade);
+        test_rwlock_exclusion(downgrade, read);
     }
     #[test]
     fn test_rwlock_writers_and_writers() {
-        test_rwlock_exclusion(false, false);
+        test_rwlock_exclusion(write, write);
+        test_rwlock_exclusion(write, downgrade);
+        test_rwlock_exclusion(downgrade, write);
+        test_rwlock_exclusion(downgrade, downgrade);
     }
-    #[test]
-    fn test_rwlock_readers_and_readers() {
+    #[cfg(test)]
+    fn test_rwlock_handshake(mode1: rwlock_mode, mode2: rwlock_mode,
+                             make_mode2_go_first: bool) {
         // Much like sem_multi_resource.
         let x = ~rwlock();
         let x2 = ~x.clone();
         let (c1,p1) = pipes::stream();
         let (c2,p2) = pipes::stream();
         do task::spawn {
-            do x2.read {
+            if !make_mode2_go_first {
+                let _ = p2.recv(); // parent sends to us once it locks, or ...
+            }
+            do lock_rwlock_in_mode(x2, mode2) {
+                if make_mode2_go_first {
+                    c1.send(()); // ... we send to it once we lock
+                }
                 let _ = p2.recv();
                 c1.send(());
             }
         }
-        do x.read {
+        if make_mode2_go_first {
+            let _ = p1.recv(); // child sends to us once it locks, or ...
+        }
+        do lock_rwlock_in_mode(x, mode1) {
+            if !make_mode2_go_first {
+                c2.send(()); // ... we send to it once we lock
+            }
             c2.send(());
             let _ = p1.recv();
         }
     }
     #[test]
+    fn test_rwlock_readers_and_readers() {
+        test_rwlock_handshake(read, read, false);
+        // The downgrader needs to get in before the reader gets in, otherwise
+        // they cannot end up reading at the same time.
+        test_rwlock_handshake(downgrade_read, read, false);
+        test_rwlock_handshake(read, downgrade_read, true);
+        // Two downgrade_reads can never both end up reading at the same time.
+    }
+    #[test]
     fn test_rwlock_cond_wait() {
         // As test_mutex_cond_wait above.
         let x = ~rwlock();
@@ -751,26 +903,40 @@ mod tests {
         do x.read { } // Just for good measure
     }
     #[cfg(test)] #[ignore(cfg(windows))]
-    fn rwlock_kill_helper(reader1: bool, reader2: bool) {
+    fn rwlock_kill_helper(mode1: rwlock_mode, mode2: rwlock_mode) {
         // Mutex must get automatically unlocked if failed/killed within.
         let x = ~rwlock();
         let x2 = ~x.clone();
 
         let result: result::result<(),()> = do task::try {
-            do lock_rwlock_in_mode(x2, reader1) {
+            do lock_rwlock_in_mode(x2, mode1) {
                 fail;
             }
         };
         assert result.is_err();
         // child task must have finished by the time try returns
-        do lock_rwlock_in_mode(x, reader2) { }
+        do lock_rwlock_in_mode(x, mode2) { }
     }
     #[test] #[ignore(cfg(windows))]
-    fn test_rwlock_reader_killed_writer() { rwlock_kill_helper(true, false); }
+    fn test_rwlock_reader_killed_writer() { rwlock_kill_helper(read, write); }
     #[test] #[ignore(cfg(windows))]
-    fn test_rwlock_writer_killed_reader() { rwlock_kill_helper(false,true ); }
+    fn test_rwlock_writer_killed_reader() { rwlock_kill_helper(write,read ); }
     #[test] #[ignore(cfg(windows))]
-    fn test_rwlock_reader_killed_reader() { rwlock_kill_helper(true, true ); }
+    fn test_rwlock_reader_killed_reader() { rwlock_kill_helper(read, read ); }
     #[test] #[ignore(cfg(windows))]
-    fn test_rwlock_writer_killed_writer() { rwlock_kill_helper(false,false); }
+    fn test_rwlock_writer_killed_writer() { rwlock_kill_helper(write,write); }
+    #[test] #[should_fail] #[ignore(cfg(windows))]
+    fn test_rwlock_downgrade_cant_swap() {
+        // Tests that you can't downgrade with a different rwlock's token.
+        let x = ~rwlock();
+        let y = ~rwlock();
+        do x.write_downgrade |xwrite| {
+            let mut xopt = some(xwrite);
+            do y.write_downgrade |_ywrite| {
+                do y.downgrade(option::swap_unwrap(&mut xopt)).read {
+                    error!("oops, y.downgrade(x) should have failed!");
+                }
+            }
+        }
+    }
 }