diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-11-24 11:16:40 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-12-05 09:12:25 -0800 |
| commit | c3adbd34c4e637d20a184eb03f09b30c69de8b6e (patch) | |
| tree | 7be3d3a9b5bf062fcffc8aa0b9e0de8267ab41c9 /src/libstd/comm | |
| parent | 71d4e77db8ad4b6d821da7e5d5300134ac95974e (diff) | |
| download | rust-c3adbd34c4e637d20a184eb03f09b30c69de8b6e.tar.gz rust-c3adbd34c4e637d20a184eb03f09b30c69de8b6e.zip | |
Fall out of the std::sync rewrite
Diffstat (limited to 'src/libstd/comm')
| -rw-r--r-- | src/libstd/comm/mod.rs | 20 | ||||
| -rw-r--r-- | src/libstd/comm/shared.rs | 21 | ||||
| -rw-r--r-- | src/libstd/comm/stream.rs | 2 |
3 files changed, 24 insertions, 19 deletions
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index 2b66e91c00d..d291ed72567 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -354,6 +354,8 @@ mod select; mod shared; mod stream; mod sync; +mod mpsc_queue; +mod spsc_queue; /// The receiving-half of Rust's channel type. This half can only be owned by /// one task @@ -628,24 +630,26 @@ impl<T: Send> Sender<T> { #[unstable] impl<T: Send> Clone for Sender<T> { fn clone(&self) -> Sender<T> { - let (packet, sleeper) = match *unsafe { self.inner() } { + let (packet, sleeper, guard) = match *unsafe { self.inner() } { Oneshot(ref p) => { let a = Arc::new(UnsafeCell::new(shared::Packet::new())); unsafe { - (*a.get()).postinit_lock(); + let guard = (*a.get()).postinit_lock(); match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) { - oneshot::UpSuccess | oneshot::UpDisconnected => (a, None), - oneshot::UpWoke(task) => (a, Some(task)) + oneshot::UpSuccess | + oneshot::UpDisconnected => (a, None, guard), + oneshot::UpWoke(task) => (a, Some(task), guard) } } } Stream(ref p) => { let a = Arc::new(UnsafeCell::new(shared::Packet::new())); unsafe { - (*a.get()).postinit_lock(); + let guard = (*a.get()).postinit_lock(); match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) { - stream::UpSuccess | stream::UpDisconnected => (a, None), - stream::UpWoke(task) => (a, Some(task)), + stream::UpSuccess | + stream::UpDisconnected => (a, None, guard), + stream::UpWoke(task) => (a, Some(task), guard), } } } @@ -657,7 +661,7 @@ impl<T: Send> Clone for Sender<T> { }; unsafe { - (*packet.get()).inherit_blocker(sleeper); + (*packet.get()).inherit_blocker(sleeper, guard); let tmp = Sender::new(Shared(packet.clone())); mem::swap(self.inner_mut(), tmp.inner_mut()); diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs index 6396edbdbd1..13b5e10fcd3 100644 --- a/src/libstd/comm/shared.rs +++ b/src/libstd/comm/shared.rs @@ -26,12 +26,11 @@ use alloc::boxed::Box; use core::cmp; use core::int; use rustrt::local::Local; -use rustrt::mutex::NativeMutex; use rustrt::task::{Task, BlockedTask}; use rustrt::thread::Thread; -use sync::atomic; -use sync::mpsc_queue as mpsc; +use sync::{atomic, Mutex, MutexGuard}; +use comm::mpsc_queue as mpsc; const DISCONNECTED: int = int::MIN; const FUDGE: int = 1024; @@ -56,7 +55,7 @@ pub struct Packet<T> { // this lock protects various portions of this implementation during // select() - select_lock: NativeMutex, + select_lock: Mutex<()>, } pub enum Failure { @@ -76,7 +75,7 @@ impl<T: Send> Packet<T> { channels: atomic::AtomicInt::new(2), port_dropped: atomic::AtomicBool::new(false), sender_drain: atomic::AtomicInt::new(0), - select_lock: unsafe { NativeMutex::new() }, + select_lock: Mutex::new(()), }; return p; } @@ -86,8 +85,8 @@ impl<T: Send> Packet<T> { // In other case mutex data will be duplicated while cloning // and that could cause problems on platforms where it is // represented by opaque data structure - pub fn postinit_lock(&mut self) { - unsafe { self.select_lock.lock_noguard() } + pub fn postinit_lock(&self) -> MutexGuard<()> { + self.select_lock.lock() } // This function is used at the creation of a shared packet to inherit a @@ -95,7 +94,9 @@ impl<T: Send> Packet<T> { // tasks in select(). // // This can only be called at channel-creation time - pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) { + pub fn inherit_blocker(&mut self, + task: Option<BlockedTask>, + guard: MutexGuard<()>) { match task { Some(task) => { assert_eq!(self.cnt.load(atomic::SeqCst), 0); @@ -135,7 +136,7 @@ impl<T: Send> Packet<T> { // interfere with this method. After we unlock this lock, we're // signifying that we're done modifying self.cnt and self.to_wake and // the port is ready for the world to continue using it. - unsafe { self.select_lock.unlock_noguard() } + drop(guard); } pub fn send(&mut self, t: T) -> Result<(), T> { @@ -441,7 +442,7 @@ impl<T: Send> Packet<T> { // done with. Without this bounce, we can race with inherit_blocker // about looking at and dealing with to_wake. Once we have acquired the // lock, we are guaranteed that inherit_blocker is done. - unsafe { + { let _guard = self.select_lock.lock(); } diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs index 23d042960b1..06ab4f4427a 100644 --- a/src/libstd/comm/stream.rs +++ b/src/libstd/comm/stream.rs @@ -32,7 +32,7 @@ use rustrt::task::{Task, BlockedTask}; use rustrt::thread::Thread; use sync::atomic; -use sync::spsc_queue as spsc; +use comm::spsc_queue as spsc; use comm::Receiver; const DISCONNECTED: int = int::MIN; |
