diff options
| author | Brian Anderson <banderson@mozilla.com> | 2015-01-24 09:15:42 -0800 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2015-01-25 01:20:55 -0800 |
| commit | 63fcbcf3ce8f0ca391c18b2d61833ae6beb3ac70 (patch) | |
| tree | c732033c0822f25f2aebcdf193de1b257bac1855 /src/libstd/sync/mpsc | |
| parent | b44ee371b8beea77aa1364460acbba14a8516559 (diff) | |
| parent | 0430a43d635841db44978bb648e9cf7e7cfa1bba (diff) | |
| download | rust-63fcbcf3ce8f0ca391c18b2d61833ae6beb3ac70.tar.gz rust-63fcbcf3ce8f0ca391c18b2d61833ae6beb3ac70.zip | |
Merge remote-tracking branch 'rust-lang/master'
Conflicts: mk/tests.mk src/liballoc/arc.rs src/liballoc/boxed.rs src/liballoc/rc.rs src/libcollections/bit.rs src/libcollections/btree/map.rs src/libcollections/btree/set.rs src/libcollections/dlist.rs src/libcollections/ring_buf.rs src/libcollections/slice.rs src/libcollections/str.rs src/libcollections/string.rs src/libcollections/vec.rs src/libcollections/vec_map.rs src/libcore/any.rs src/libcore/array.rs src/libcore/borrow.rs src/libcore/error.rs src/libcore/fmt/mod.rs src/libcore/iter.rs src/libcore/marker.rs src/libcore/ops.rs src/libcore/result.rs src/libcore/slice.rs src/libcore/str/mod.rs src/libregex/lib.rs src/libregex/re.rs src/librustc/lint/builtin.rs src/libstd/collections/hash/map.rs src/libstd/collections/hash/set.rs src/libstd/sync/mpsc/mod.rs src/libstd/sync/mutex.rs src/libstd/sync/poison.rs src/libstd/sync/rwlock.rs src/libsyntax/feature_gate.rs src/libsyntax/test.rs
Diffstat (limited to 'src/libstd/sync/mpsc')
| -rw-r--r-- | src/libstd/sync/mpsc/blocking.rs | 30 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/mod.rs | 83 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/mpsc_queue.rs | 3 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/select.rs | 41 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/spsc_queue.rs | 5 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/sync.rs | 17 |
6 files changed, 46 insertions, 133 deletions
diff --git a/src/libstd/sync/mpsc/blocking.rs b/src/libstd/sync/mpsc/blocking.rs index 17e690e9540..61ffb532d36 100644 --- a/src/libstd/sync/mpsc/blocking.rs +++ b/src/libstd/sync/mpsc/blocking.rs @@ -14,8 +14,6 @@ use thread::Thread; use sync::atomic::{AtomicBool, ATOMIC_BOOL_INIT, Ordering}; use sync::Arc; use marker::{Sync, Send}; -#[cfg(stage0)] // NOTE remove use after next snapshot -use marker::{NoSend, NoSync}; use mem; use clone::Clone; @@ -32,42 +30,14 @@ pub struct SignalToken { inner: Arc<Inner>, } -#[cfg(stage0)] // NOTE remove impl after next snapshot pub struct WaitToken { inner: Arc<Inner>, - no_send: NoSend, - no_sync: NoSync, } -#[cfg(not(stage0))] // NOTE remove cfg after next snapshot -pub struct WaitToken { - inner: Arc<Inner>, -} - -#[cfg(not(stage0))] // NOTE remove cfg after next snapshot impl !Send for WaitToken {} -#[cfg(not(stage0))] // NOTE remove cfg after next snapshot impl !Sync for WaitToken {} -#[cfg(stage0)] // NOTE remove impl after next snapshot -pub fn tokens() -> (WaitToken, SignalToken) { - let inner = Arc::new(Inner { - thread: Thread::current(), - woken: ATOMIC_BOOL_INIT, - }); - let wait_token = WaitToken { - inner: inner.clone(), - no_send: NoSend, - no_sync: NoSync, - }; - let signal_token = SignalToken { - inner: inner - }; - (wait_token, signal_token) -} - -#[cfg(not(stage0))] // NOTE remove cfg after next snapshot pub fn tokens() -> (WaitToken, SignalToken) { let inner = Arc::new(Inner { thread: Thread::current(), diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 8fce8cbabcc..101af8d5e9a 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -319,7 +319,6 @@ use prelude::v1::*; use sync::Arc; use fmt; -use marker; use mem; use cell::UnsafeCell; @@ -369,31 +368,21 @@ unsafe impl<T:Send> Send for Sender<T> { } /// The sending-half of Rust's synchronous channel type. This half can only be /// owned by one task, but it can be cloned to send to other tasks. -#[cfg(stage0)] // NOTE remove impl after next snapshot #[stable(feature = "rust1", since = "1.0.0")] pub struct SyncSender<T> { - inner: Arc<RacyCell<sync::Packet<T>>>, - // can't share in an arc - _marker: marker::NoSync, + inner: Arc<UnsafeCell<sync::Packet<T>>>, } -/// The sending-half of Rust's synchronous channel type. This half can only be -/// owned by one task, but it can be cloned to send to other tasks. -#[stable(feature = "rust1", since = "1.0.0")] -#[cfg(not(stage0))] // NOTE remove cfg after next snapshot -pub struct SyncSender<T> { - inner: Arc<RacyCell<sync::Packet<T>>>, -} +unsafe impl<T:Send> Send for SyncSender<T> {} -#[cfg(not(stage0))] // NOTE remove cfg after next snapshot -impl<T> !marker::Sync for SyncSender<T> {} +impl<T> !Sync for SyncSender<T> {} /// An error returned from the `send` function on channels. /// /// A `send` operation can only fail if the receiving end of a channel is /// disconnected, implying that the data could never be received. The error /// contains the data being sent as a payload so it can be recovered. -#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Show)] #[stable(feature = "rust1", since = "1.0.0")] pub struct SendError<T>(pub T); @@ -401,13 +390,13 @@ pub struct SendError<T>(pub T); /// /// The `recv` operation can only fail if the sending half of a channel is /// disconnected, implying that no further messages will ever be received. -#[derive(PartialEq, Eq, Clone, Copy)] +#[derive(PartialEq, Eq, Clone, Copy, Show)] #[stable(feature = "rust1", since = "1.0.0")] pub struct RecvError; /// This enumeration is the list of the possible reasons that try_recv could not /// return data when called. -#[derive(PartialEq, Clone, Copy)] +#[derive(PartialEq, Clone, Copy, Show)] #[stable(feature = "rust1", since = "1.0.0")] pub enum TryRecvError { /// This channel is currently empty, but the sender(s) have not yet @@ -423,7 +412,7 @@ pub enum TryRecvError { /// This enumeration is the list of the possible error outcomes for the /// `SyncSender::try_send` method. -#[derive(PartialEq, Clone)] +#[derive(PartialEq, Clone, Show)] #[stable(feature = "rust1", since = "1.0.0")] pub enum TrySendError<T> { /// The data could not be sent on the channel because it would require that @@ -442,10 +431,10 @@ pub enum TrySendError<T> { } enum Flavor<T> { - Oneshot(Arc<RacyCell<oneshot::Packet<T>>>), - Stream(Arc<RacyCell<stream::Packet<T>>>), - Shared(Arc<RacyCell<shared::Packet<T>>>), - Sync(Arc<RacyCell<sync::Packet<T>>>), + Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>), + Stream(Arc<UnsafeCell<stream::Packet<T>>>), + Shared(Arc<UnsafeCell<shared::Packet<T>>>), + Sync(Arc<UnsafeCell<sync::Packet<T>>>), } #[doc(hidden)] @@ -497,7 +486,7 @@ impl<T> UnsafeFlavor<T> for Receiver<T> { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) { - let a = Arc::new(RacyCell::new(oneshot::Packet::new())); + let a = Arc::new(UnsafeCell::new(oneshot::Packet::new())); (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a))) } @@ -537,7 +526,7 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) { /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) { - let a = Arc::new(RacyCell::new(sync::Packet::new(bound))); + let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound))); (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a))) } @@ -589,7 +578,7 @@ impl<T: Send> Sender<T> { return (*p).send(t).map_err(SendError); } else { let a = - Arc::new(RacyCell::new(stream::Packet::new())); + Arc::new(UnsafeCell::new(stream::Packet::new())); let rx = Receiver::new(Flavor::Stream(a.clone())); match (*p).upgrade(rx) { oneshot::UpSuccess => { @@ -631,7 +620,7 @@ impl<T: Send> Clone for Sender<T> { fn clone(&self) -> Sender<T> { let (packet, sleeper, guard) = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { - let a = Arc::new(RacyCell::new(shared::Packet::new())); + let a = Arc::new(UnsafeCell::new(shared::Packet::new())); unsafe { let guard = (*a.get()).postinit_lock(); let rx = Receiver::new(Flavor::Shared(a.clone())); @@ -643,7 +632,7 @@ impl<T: Send> Clone for Sender<T> { } } Flavor::Stream(ref p) => { - let a = Arc::new(RacyCell::new(shared::Packet::new())); + let a = Arc::new(UnsafeCell::new(shared::Packet::new())); unsafe { let guard = (*a.get()).postinit_lock(); let rx = Receiver::new(Flavor::Shared(a.clone())); @@ -689,13 +678,7 @@ impl<T: Send> Drop for Sender<T> { //////////////////////////////////////////////////////////////////////////////// impl<T: Send> SyncSender<T> { - #[cfg(stage0)] // NOTE remove impl after next snapshot - fn new(inner: Arc<RacyCell<sync::Packet<T>>>) -> SyncSender<T> { - SyncSender { inner: inner, _marker: marker::NoSync } - } - - #[cfg(not(stage0))] // NOTE remove cfg after next snapshot - fn new(inner: Arc<RacyCell<sync::Packet<T>>>) -> SyncSender<T> { + fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> { SyncSender { inner: inner } } @@ -978,33 +961,15 @@ impl<T: Send> Drop for Receiver<T> { } } -/// A version of `UnsafeCell` intended for use in concurrent data -/// structures (for example, you might put it in an `Arc`). -struct RacyCell<T>(pub UnsafeCell<T>); - -impl<T> RacyCell<T> { - - fn new(value: T) -> RacyCell<T> { - RacyCell(UnsafeCell { value: value }) - } - - unsafe fn get(&self) -> *mut T { - self.0.get() - } - -} - -unsafe impl<T:Send> Send for RacyCell<T> { } - -unsafe impl<T> Sync for RacyCell<T> { } // Oh dear - -impl<T> fmt::Show for SendError<T> { +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> fmt::Display for SendError<T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { "sending on a closed channel".fmt(f) } } -impl<T> fmt::Show for TrySendError<T> { +#[stable(feature = "rust1", since = "1.0.0")] +impl<T> fmt::Display for TrySendError<T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { TrySendError::Full(..) => { @@ -1017,13 +982,15 @@ impl<T> fmt::Show for TrySendError<T> { } } -impl fmt::Show for RecvError { +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for RecvError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { "receiving on a closed channel".fmt(f) } } -impl fmt::Show for TryRecvError { +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Display for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { TryRecvError::Empty => { diff --git a/src/libstd/sync/mpsc/mpsc_queue.rs b/src/libstd/sync/mpsc/mpsc_queue.rs index 92aec5cde07..c222c313ba6 100644 --- a/src/libstd/sync/mpsc/mpsc_queue.rs +++ b/src/libstd/sync/mpsc/mpsc_queue.rs @@ -46,6 +46,7 @@ use core::prelude::*; use alloc::boxed::Box; use core::mem; +use core::ptr; use core::cell::UnsafeCell; use sync::atomic::{AtomicPtr, Ordering}; @@ -82,7 +83,7 @@ unsafe impl<T:Send> Sync for Queue<T> { } impl<T> Node<T> { unsafe fn new(v: Option<T>) -> *mut Node<T> { mem::transmute(box Node { - next: AtomicPtr::new(0 as *mut Node<T>), + next: AtomicPtr::new(ptr::null_mut()), value: v, }) } diff --git a/src/libstd/sync/mpsc/select.rs b/src/libstd/sync/mpsc/select.rs index ee4d3a55481..e97c82a5b1b 100644 --- a/src/libstd/sync/mpsc/select.rs +++ b/src/libstd/sync/mpsc/select.rs @@ -60,6 +60,7 @@ use core::prelude::*; use core::cell::Cell; use core::marker; use core::mem; +use core::ptr; use core::uint; use sync::mpsc::{Receiver, RecvError}; @@ -67,24 +68,12 @@ use sync::mpsc::blocking::{self, SignalToken}; /// The "receiver set" of the select interface. This structure is used to manage /// a set of receivers which are being selected over. -#[cfg(stage0)] // NOTE remove impl after next snapshot pub struct Select { head: *mut Handle<'static, ()>, tail: *mut Handle<'static, ()>, next_id: Cell<uint>, - marker1: marker::NoSend, } -/// The "receiver set" of the select interface. This structure is used to manage -/// a set of receivers which are being selected over. -#[cfg(not(stage0))] // NOTE remove cfg after next snapshot -pub struct Select { - head: *mut Handle<'static, ()>, - tail: *mut Handle<'static, ()>, - next_id: Cell<uint>, -} - -#[cfg(not(stage0))] // NOTE remove cfg after next snapshot impl !marker::Send for Select {} /// A handle to a receiver which is currently a member of a `Select` set of @@ -127,26 +116,10 @@ impl Select { /// /// Usage of this struct directly can sometimes be burdensome, and usage is /// rather much easier through the `select!` macro. - #[cfg(stage0)] // NOTE remove impl after next snapshot - pub fn new() -> Select { - Select { - marker1: marker::NoSend, - head: 0 as *mut Handle<'static, ()>, - tail: 0 as *mut Handle<'static, ()>, - next_id: Cell::new(1), - } - } - - /// Creates a new selection structure. This set is initially empty and - /// `wait` will panic!() if called. - /// - /// Usage of this struct directly can sometimes be burdensome, and usage is - /// rather much easier through the `select!` macro. - #[cfg(not(stage0))] // NOTE remove cfg after next snapshot pub fn new() -> Select { Select { - head: 0 as *mut Handle<'static, ()>, - tail: 0 as *mut Handle<'static, ()>, + head: ptr::null_mut(), + tail: ptr::null_mut(), next_id: Cell::new(1), } } @@ -160,8 +133,8 @@ impl Select { Handle { id: id, selector: self, - next: 0 as *mut Handle<'static, ()>, - prev: 0 as *mut Handle<'static, ()>, + next: ptr::null_mut(), + prev: ptr::null_mut(), added: false, rx: rx, packet: rx, @@ -326,8 +299,8 @@ impl<'rx, T: Send> Handle<'rx, T> { (*self.next).prev = self.prev; } - self.next = 0 as *mut Handle<'static, ()>; - self.prev = 0 as *mut Handle<'static, ()>; + self.next = ptr::null_mut(); + self.prev = ptr::null_mut(); self.added = false; } diff --git a/src/libstd/sync/mpsc/spsc_queue.rs b/src/libstd/sync/mpsc/spsc_queue.rs index 893260415eb..c1983fcab19 100644 --- a/src/libstd/sync/mpsc/spsc_queue.rs +++ b/src/libstd/sync/mpsc/spsc_queue.rs @@ -39,6 +39,7 @@ use core::prelude::*; use alloc::boxed::Box; use core::mem; +use core::ptr; use core::cell::UnsafeCell; use sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; @@ -82,7 +83,7 @@ impl<T: Send> Node<T> { unsafe { mem::transmute(box Node { value: None, - next: AtomicPtr::new(0 as *mut Node<T>), + next: AtomicPtr::new(ptr::null_mut::<Node<T>>()), }) } } @@ -131,7 +132,7 @@ impl<T: Send> Queue<T> { let n = self.alloc(); assert!((*n).value.is_none()); (*n).value = Some(t); - (*n).next.store(0 as *mut Node<T>, Ordering::Relaxed); + (*n).next.store(ptr::null_mut(), Ordering::Relaxed); (**self.head.get()).next.store(n, Ordering::Release); *self.head.get() = n; } diff --git a/src/libstd/sync/mpsc/sync.rs b/src/libstd/sync/mpsc/sync.rs index 30304dffb75..d38f14a9130 100644 --- a/src/libstd/sync/mpsc/sync.rs +++ b/src/libstd/sync/mpsc/sync.rs @@ -40,6 +40,7 @@ use self::Blocker::*; use vec::Vec; use core::mem; +use core::ptr; use sync::atomic::{Ordering, AtomicUsize}; use sync::mpsc::blocking::{self, WaitToken, SignalToken}; @@ -145,8 +146,8 @@ impl<T: Send> Packet<T> { cap: cap, canceled: None, queue: Queue { - head: 0 as *mut Node, - tail: 0 as *mut Node, + head: ptr::null_mut(), + tail: ptr::null_mut(), }, buf: Buffer { buf: range(0, cap + if cap == 0 {1} else {0}).map(|_| None).collect(), @@ -160,7 +161,7 @@ impl<T: Send> Packet<T> { // wait until a send slot is available, returning locked access to // the channel state. fn acquire_send_slot(&self) -> MutexGuard<State<T>> { - let mut node = Node { token: None, next: 0 as *mut Node }; + let mut node = Node { token: None, next: ptr::null_mut() }; loop { let mut guard = self.lock.lock().unwrap(); // are we ready to go? @@ -343,8 +344,8 @@ impl<T: Send> Packet<T> { Vec::new() }; let mut queue = mem::replace(&mut guard.queue, Queue { - head: 0 as *mut Node, - tail: 0 as *mut Node, + head: ptr::null_mut(), + tail: ptr::null_mut(), }); let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) { @@ -453,7 +454,7 @@ impl Queue { fn enqueue(&mut self, node: &mut Node) -> WaitToken { let (wait_token, signal_token) = blocking::tokens(); node.token = Some(signal_token); - node.next = 0 as *mut Node; + node.next = ptr::null_mut(); if self.tail.is_null() { self.head = node as *mut Node; @@ -475,10 +476,10 @@ impl Queue { let node = self.head; self.head = unsafe { (*node).next }; if self.head.is_null() { - self.tail = 0 as *mut Node; + self.tail = ptr::null_mut(); } unsafe { - (*node).next = 0 as *mut Node; + (*node).next = ptr::null_mut(); Some((*node).token.take().unwrap()) } } |
