about summary refs log tree commit diff
path: root/src/libstd/comm
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-11-24 11:16:40 -0800
committerAlex Crichton <alex@alexcrichton.com>2014-12-05 09:12:25 -0800
commitc3adbd34c4e637d20a184eb03f09b30c69de8b6e (patch)
tree7be3d3a9b5bf062fcffc8aa0b9e0de8267ab41c9 /src/libstd/comm
parent71d4e77db8ad4b6d821da7e5d5300134ac95974e (diff)
downloadrust-c3adbd34c4e637d20a184eb03f09b30c69de8b6e.tar.gz
rust-c3adbd34c4e637d20a184eb03f09b30c69de8b6e.zip
Fall out of the std::sync rewrite
Diffstat (limited to 'src/libstd/comm')
-rw-r--r--src/libstd/comm/mod.rs20
-rw-r--r--src/libstd/comm/shared.rs21
-rw-r--r--src/libstd/comm/stream.rs2
3 files changed, 24 insertions, 19 deletions
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs
index 2b66e91c00d..d291ed72567 100644
--- a/src/libstd/comm/mod.rs
+++ b/src/libstd/comm/mod.rs
@@ -354,6 +354,8 @@ mod select;
 mod shared;
 mod stream;
 mod sync;
+mod mpsc_queue;
+mod spsc_queue;
 
 /// The receiving-half of Rust's channel type. This half can only be owned by
 /// one task
@@ -628,24 +630,26 @@ impl<T: Send> Sender<T> {
 #[unstable]
 impl<T: Send> Clone for Sender<T> {
     fn clone(&self) -> Sender<T> {
-        let (packet, sleeper) = match *unsafe { self.inner() } {
+        let (packet, sleeper, guard) = match *unsafe { self.inner() } {
             Oneshot(ref p) => {
                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
                 unsafe {
-                    (*a.get()).postinit_lock();
+                    let guard = (*a.get()).postinit_lock();
                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
-                        oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
-                        oneshot::UpWoke(task) => (a, Some(task))
+                        oneshot::UpSuccess |
+                        oneshot::UpDisconnected => (a, None, guard),
+                        oneshot::UpWoke(task) => (a, Some(task), guard)
                     }
                 }
             }
             Stream(ref p) => {
                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
                 unsafe {
-                    (*a.get()).postinit_lock();
+                    let guard = (*a.get()).postinit_lock();
                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
-                        stream::UpSuccess | stream::UpDisconnected => (a, None),
-                        stream::UpWoke(task) => (a, Some(task)),
+                        stream::UpSuccess |
+                        stream::UpDisconnected => (a, None, guard),
+                        stream::UpWoke(task) => (a, Some(task), guard),
                     }
                 }
             }
@@ -657,7 +661,7 @@ impl<T: Send> Clone for Sender<T> {
         };
 
         unsafe {
-            (*packet.get()).inherit_blocker(sleeper);
+            (*packet.get()).inherit_blocker(sleeper, guard);
 
             let tmp = Sender::new(Shared(packet.clone()));
             mem::swap(self.inner_mut(), tmp.inner_mut());
diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs
index 6396edbdbd1..13b5e10fcd3 100644
--- a/src/libstd/comm/shared.rs
+++ b/src/libstd/comm/shared.rs
@@ -26,12 +26,11 @@ use alloc::boxed::Box;
 use core::cmp;
 use core::int;
 use rustrt::local::Local;
-use rustrt::mutex::NativeMutex;
 use rustrt::task::{Task, BlockedTask};
 use rustrt::thread::Thread;
 
-use sync::atomic;
-use sync::mpsc_queue as mpsc;
+use sync::{atomic, Mutex, MutexGuard};
+use comm::mpsc_queue as mpsc;
 
 const DISCONNECTED: int = int::MIN;
 const FUDGE: int = 1024;
@@ -56,7 +55,7 @@ pub struct Packet<T> {
 
     // this lock protects various portions of this implementation during
     // select()
-    select_lock: NativeMutex,
+    select_lock: Mutex<()>,
 }
 
 pub enum Failure {
@@ -76,7 +75,7 @@ impl<T: Send> Packet<T> {
             channels: atomic::AtomicInt::new(2),
             port_dropped: atomic::AtomicBool::new(false),
             sender_drain: atomic::AtomicInt::new(0),
-            select_lock: unsafe { NativeMutex::new() },
+            select_lock: Mutex::new(()),
         };
         return p;
     }
@@ -86,8 +85,8 @@ impl<T: Send> Packet<T> {
     // In other case mutex data will be duplicated while cloning
     // and that could cause problems on platforms where it is
     // represented by opaque data structure
-    pub fn postinit_lock(&mut self) {
-        unsafe { self.select_lock.lock_noguard() }
+    pub fn postinit_lock(&self) -> MutexGuard<()> {
+        self.select_lock.lock()
     }
 
     // This function is used at the creation of a shared packet to inherit a
@@ -95,7 +94,9 @@ impl<T: Send> Packet<T> {
     // tasks in select().
     //
     // This can only be called at channel-creation time
-    pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
+    pub fn inherit_blocker(&mut self,
+                           task: Option<BlockedTask>,
+                           guard: MutexGuard<()>) {
         match task {
             Some(task) => {
                 assert_eq!(self.cnt.load(atomic::SeqCst), 0);
@@ -135,7 +136,7 @@ impl<T: Send> Packet<T> {
         // interfere with this method. After we unlock this lock, we're
         // signifying that we're done modifying self.cnt and self.to_wake and
         // the port is ready for the world to continue using it.
-        unsafe { self.select_lock.unlock_noguard() }
+        drop(guard);
     }
 
     pub fn send(&mut self, t: T) -> Result<(), T> {
@@ -441,7 +442,7 @@ impl<T: Send> Packet<T> {
         // done with. Without this bounce, we can race with inherit_blocker
         // about looking at and dealing with to_wake. Once we have acquired the
         // lock, we are guaranteed that inherit_blocker is done.
-        unsafe {
+        {
             let _guard = self.select_lock.lock();
         }
 
diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs
index 23d042960b1..06ab4f4427a 100644
--- a/src/libstd/comm/stream.rs
+++ b/src/libstd/comm/stream.rs
@@ -32,7 +32,7 @@ use rustrt::task::{Task, BlockedTask};
 use rustrt::thread::Thread;
 
 use sync::atomic;
-use sync::spsc_queue as spsc;
+use comm::spsc_queue as spsc;
 use comm::Receiver;
 
 const DISCONNECTED: int = int::MIN;