diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-12-23 11:53:35 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-12-29 12:16:49 -0800 |
| commit | bc83a009f655dd3896be4a7cd33cac8032a605f2 (patch) | |
| tree | 3acc8533031219690fe14fa56f4427cfa9297296 /src/libstd/comm | |
| parent | bb8f4fc3b73918abd19d67be702f78e8f73d1874 (diff) | |
| download | rust-bc83a009f655dd3896be4a7cd33cac8032a605f2.tar.gz rust-bc83a009f655dd3896be4a7cd33cac8032a605f2.zip | |
std: Second pass stabilization for `comm`
This commit is a second pass stabilization for the `std::comm` module,
performing the following actions:
* The entire `std::comm` module was moved under `std::sync::mpsc`. This movement
reflects that channels are just yet another synchronization primitive, and
they don't necessarily deserve a special place outside of the other
concurrency primitives that the standard library offers.
* The `send` and `recv` methods have all been removed.
* The `send_opt` and `recv_opt` methods have been renamed to `send` and `recv`.
This means that all send/receive operations return a `Result` now indicating
whether the operation was successful or not.
* The error type of `send` is now a `SendError` to implement a custom error
message and allow for `unwrap()`. The error type contains an `into_inner`
method to extract the value.
* The error type of `recv` is now `RecvError` for the same reasons as `send`.
* The `TryRecvError` and `TrySendError` types have had public reexports removed
of their variants and the variant names have been tweaked with enum
namespacing rules.
* The `Messages` iterator is renamed to `Iter`
This functionality is now all `#[stable]`:
* `Sender`
* `SyncSender`
* `Receiver`
* `std::sync::mpsc`
* `channel`
* `sync_channel`
* `Iter`
* `Sender::send`
* `Sender::clone`
* `SyncSender::send`
* `SyncSender::try_send`
* `SyncSender::clone`
* `Receiver::recv`
* `Receiver::try_recv`
* `Receiver::iter`
* `SendError`
* `RecvError`
* `TrySendError::{mod, Full, Disconnected}`
* `TryRecvError::{mod, Empty, Disconnected}`
* `SendError::into_inner`
* `TrySendError::into_inner`
This is a breaking change due to the modification of where this module is
located, as well as the changing of the semantics of `send` and `recv`. Most
programs just need to rename imports of `std::comm` to `std::sync::mpsc` and
add calls to `unwrap` after a send or a receive operation.
[breaking-change]
Diffstat (limited to 'src/libstd/comm')
| -rw-r--r-- | src/libstd/comm/blocking.rs | 87 | ||||
| -rw-r--r-- | src/libstd/comm/mod.rs | 2133 | ||||
| -rw-r--r-- | src/libstd/comm/mpsc_queue.rs | 205 | ||||
| -rw-r--r-- | src/libstd/comm/oneshot.rs | 375 | ||||
| -rw-r--r-- | src/libstd/comm/select.rs | 749 | ||||
| -rw-r--r-- | src/libstd/comm/shared.rs | 486 | ||||
| -rw-r--r-- | src/libstd/comm/spsc_queue.rs | 343 | ||||
| -rw-r--r-- | src/libstd/comm/stream.rs | 484 | ||||
| -rw-r--r-- | src/libstd/comm/sync.rs | 483 |
9 files changed, 0 insertions, 5345 deletions
diff --git a/src/libstd/comm/blocking.rs b/src/libstd/comm/blocking.rs deleted file mode 100644 index 412b7161305..00000000000 --- a/src/libstd/comm/blocking.rs +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Generic support for building blocking abstractions. - -use thread::Thread; -use sync::atomic::{AtomicBool, INIT_ATOMIC_BOOL, Ordering}; -use sync::Arc; -use kinds::{Sync, Send}; -use kinds::marker::{NoSend, NoSync}; -use mem; -use clone::Clone; - -struct Inner { - thread: Thread, - woken: AtomicBool, -} - -unsafe impl Send for Inner {} -unsafe impl Sync for Inner {} - -#[deriving(Clone)] -pub struct SignalToken { - inner: Arc<Inner>, -} - -pub struct WaitToken { - inner: Arc<Inner>, - no_send: NoSend, - no_sync: NoSync, -} - -pub fn tokens() -> (WaitToken, SignalToken) { - let inner = Arc::new(Inner { - thread: Thread::current(), - woken: INIT_ATOMIC_BOOL, - }); - let wait_token = WaitToken { - inner: inner.clone(), - no_send: NoSend, - no_sync: NoSync, - }; - let signal_token = SignalToken { - inner: inner - }; - (wait_token, signal_token) -} - -impl SignalToken { - pub fn signal(&self) -> bool { - let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst); - if wake { - self.inner.thread.unpark(); - } - wake - } - - /// Convert to an unsafe uint value. Useful for storing in a pipe's state - /// flag. - #[inline] - pub unsafe fn cast_to_uint(self) -> uint { - mem::transmute(self.inner) - } - - /// Convert from an unsafe uint value. Useful for retrieving a pipe's state - /// flag. - #[inline] - pub unsafe fn cast_from_uint(signal_ptr: uint) -> SignalToken { - SignalToken { inner: mem::transmute(signal_ptr) } - } - -} - -impl WaitToken { - pub fn wait(self) { - while !self.inner.woken.load(Ordering::SeqCst) { - Thread::park() - } - } -} diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs deleted file mode 100644 index de7f3d00478..00000000000 --- a/src/libstd/comm/mod.rs +++ /dev/null @@ -1,2133 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Communication primitives for concurrent tasks -//! -//! Rust makes it very difficult to share data among tasks to prevent race -//! conditions and to improve parallelism, but there is often a need for -//! communication between concurrent tasks. The primitives defined in this -//! module are the building blocks for synchronization in rust. -//! -//! This module provides message-based communication over channels, concretely -//! defined among three types: -//! -//! * `Sender` -//! * `SyncSender` -//! * `Receiver` -//! -//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both -//! senders are clone-able such that many tasks can send simultaneously to one -//! receiver. These channels are *task blocking*, not *thread blocking*. This -//! means that if one task is blocked on a channel, other tasks can continue to -//! make progress. -//! -//! Rust channels come in one of two flavors: -//! -//! 1. An asynchronous, infinitely buffered channel. The `channel()` function -//! will return a `(Sender, Receiver)` tuple where all sends will be -//! **asynchronous** (they never block). The channel conceptually has an -//! infinite buffer. -//! -//! 2. A synchronous, bounded channel. The `sync_channel()` function will return -//! a `(SyncSender, Receiver)` tuple where the storage for pending messages -//! is a pre-allocated buffer of a fixed size. All sends will be -//! **synchronous** by blocking until there is buffer space available. Note -//! that a bound of 0 is allowed, causing the channel to become a -//! "rendezvous" channel where each sender atomically hands off a message to -//! a receiver. -//! -//! ## Panic Propagation -//! -//! In addition to being a core primitive for communicating in rust, channels -//! are the points at which panics are propagated among tasks. Whenever the one -//! half of channel is closed, the other half will have its next operation -//! `panic!`. The purpose of this is to allow propagation of panics among tasks -//! that are linked to one another via channels. -//! -//! There are methods on both of senders and receivers to perform their -//! respective operations without panicking, however. -//! -//! # Example -//! -//! Simple usage: -//! -//! ``` -//! use std::thread::Thread; -//! use std::comm::channel; -//! -//! // Create a simple streaming channel -//! let (tx, rx) = channel(); -//! Thread::spawn(move|| { -//! tx.send(10i); -//! }).detach(); -//! assert_eq!(rx.recv(), 10i); -//! ``` -//! -//! Shared usage: -//! -//! ``` -//! use std::thread::Thread; -//! use std::comm::channel; -//! -//! // Create a shared channel that can be sent along from many threads -//! // where tx is the sending half (tx for transmission), and rx is the receiving -//! // half (rx for receiving). -//! let (tx, rx) = channel(); -//! for i in range(0i, 10i) { -//! let tx = tx.clone(); -//! Thread::spawn(move|| { -//! tx.send(i); -//! }).detach() -//! } -//! -//! for _ in range(0i, 10i) { -//! let j = rx.recv(); -//! assert!(0 <= j && j < 10); -//! } -//! ``` -//! -//! Propagating panics: -//! -//! ```should_fail -//! use std::comm::channel; -//! -//! // The call to recv() will panic!() because the channel has already hung -//! // up (or been deallocated) -//! let (tx, rx) = channel::<int>(); -//! drop(tx); -//! rx.recv(); -//! ``` -//! -//! Synchronous channels: -//! -//! ``` -//! use std::thread::Thread; -//! use std::comm::sync_channel; -//! -//! let (tx, rx) = sync_channel::<int>(0); -//! Thread::spawn(move|| { -//! // This will wait for the parent task to start receiving -//! tx.send(53); -//! }).detach(); -//! rx.recv(); -//! ``` -//! -//! Reading from a channel with a timeout requires to use a Timer together -//! with the channel. You can use the select! macro to select either and -//! handle the timeout case. This first example will break out of the loop -//! after 10 seconds no matter what: -//! -//! ```no_run -//! use std::comm::channel; -//! use std::io::timer::Timer; -//! use std::time::Duration; -//! -//! let (tx, rx) = channel::<int>(); -//! let mut timer = Timer::new().unwrap(); -//! let timeout = timer.oneshot(Duration::seconds(10)); -//! -//! loop { -//! select! { -//! val = rx.recv() => println!("Received {}", val), -//! () = timeout.recv() => { -//! println!("timed out, total time was more than 10 seconds"); -//! break; -//! } -//! } -//! } -//! ``` -//! -//! This second example is more costly since it allocates a new timer every -//! time a message is received, but it allows you to timeout after the channel -//! has been inactive for 5 seconds: -//! -//! ```no_run -//! use std::comm::channel; -//! use std::io::timer::Timer; -//! use std::time::Duration; -//! -//! let (tx, rx) = channel::<int>(); -//! let mut timer = Timer::new().unwrap(); -//! -//! loop { -//! let timeout = timer.oneshot(Duration::seconds(5)); -//! -//! select! { -//! val = rx.recv() => println!("Received {}", val), -//! () = timeout.recv() => { -//! println!("timed out, no message received in 5 seconds"); -//! break; -//! } -//! } -//! } -//! ``` - -// A description of how Rust's channel implementation works -// -// Channels are supposed to be the basic building block for all other -// concurrent primitives that are used in Rust. As a result, the channel type -// needs to be highly optimized, flexible, and broad enough for use everywhere. -// -// The choice of implementation of all channels is to be built on lock-free data -// structures. The channels themselves are then consequently also lock-free data -// structures. As always with lock-free code, this is a very "here be dragons" -// territory, especially because I'm unaware of any academic papers that have -// gone into great length about channels of these flavors. -// -// ## Flavors of channels -// -// From the perspective of a consumer of this library, there is only one flavor -// of channel. This channel can be used as a stream and cloned to allow multiple -// senders. Under the hood, however, there are actually three flavors of -// channels in play. -// -// * Oneshots - these channels are highly optimized for the one-send use case. -// They contain as few atomics as possible and involve one and -// exactly one allocation. -// * Streams - these channels are optimized for the non-shared use case. They -// use a different concurrent queue that is more tailored for this -// use case. The initial allocation of this flavor of channel is not -// optimized. -// * Shared - this is the most general form of channel that this module offers, -// a channel with multiple senders. This type is as optimized as it -// can be, but the previous two types mentioned are much faster for -// their use-cases. -// -// ## Concurrent queues -// -// The basic idea of Rust's Sender/Receiver types is that send() never blocks, but -// recv() obviously blocks. This means that under the hood there must be some -// shared and concurrent queue holding all of the actual data. -// -// With two flavors of channels, two flavors of queues are also used. We have -// chosen to use queues from a well-known author that are abbreviated as SPSC -// and MPSC (single producer, single consumer and multiple producer, single -// consumer). SPSC queues are used for streams while MPSC queues are used for -// shared channels. -// -// ### SPSC optimizations -// -// The SPSC queue found online is essentially a linked list of nodes where one -// half of the nodes are the "queue of data" and the other half of nodes are a -// cache of unused nodes. The unused nodes are used such that an allocation is -// not required on every push() and a free doesn't need to happen on every -// pop(). -// -// As found online, however, the cache of nodes is of an infinite size. This -// means that if a channel at one point in its life had 50k items in the queue, -// then the queue will always have the capacity for 50k items. I believed that -// this was an unnecessary limitation of the implementation, so I have altered -// the queue to optionally have a bound on the cache size. -// -// By default, streams will have an unbounded SPSC queue with a small-ish cache -// size. The hope is that the cache is still large enough to have very fast -// send() operations while not too large such that millions of channels can -// coexist at once. -// -// ### MPSC optimizations -// -// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses -// a linked list under the hood to earn its unboundedness, but I have not put -// forth much effort into having a cache of nodes similar to the SPSC queue. -// -// For now, I believe that this is "ok" because shared channels are not the most -// common type, but soon we may wish to revisit this queue choice and determine -// another candidate for backend storage of shared channels. -// -// ## Overview of the Implementation -// -// Now that there's a little background on the concurrent queues used, it's -// worth going into much more detail about the channels themselves. The basic -// pseudocode for a send/recv are: -// -// -// send(t) recv() -// queue.push(t) return if queue.pop() -// if increment() == -1 deschedule { -// wakeup() if decrement() > 0 -// cancel_deschedule() -// } -// queue.pop() -// -// As mentioned before, there are no locks in this implementation, only atomic -// instructions are used. -// -// ### The internal atomic counter -// -// Every channel has a shared counter with each half to keep track of the size -// of the queue. This counter is used to abort descheduling by the receiver and -// to know when to wake up on the sending side. -// -// As seen in the pseudocode, senders will increment this count and receivers -// will decrement the count. The theory behind this is that if a sender sees a -// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count, -// then it doesn't need to block. -// -// The recv() method has a beginning call to pop(), and if successful, it needs -// to decrement the count. It is a crucial implementation detail that this -// decrement does *not* happen to the shared counter. If this were the case, -// then it would be possible for the counter to be very negative when there were -// no receivers waiting, in which case the senders would have to determine when -// it was actually appropriate to wake up a receiver. -// -// Instead, the "steal count" is kept track of separately (not atomically -// because it's only used by receivers), and then the decrement() call when -// descheduling will lump in all of the recent steals into one large decrement. -// -// The implication of this is that if a sender sees a -1 count, then there's -// guaranteed to be a waiter waiting! -// -// ## Native Implementation -// -// A major goal of these channels is to work seamlessly on and off the runtime. -// All of the previous race conditions have been worded in terms of -// scheduler-isms (which is obviously not available without the runtime). -// -// For now, native usage of channels (off the runtime) will fall back onto -// mutexes/cond vars for descheduling/atomic decisions. The no-contention path -// is still entirely lock-free, the "deschedule" blocks above are surrounded by -// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a -// condition variable. -// -// ## Select -// -// Being able to support selection over channels has greatly influenced this -// design, and not only does selection need to work inside the runtime, but also -// outside the runtime. -// -// The implementation is fairly straightforward. The goal of select() is not to -// return some data, but only to return which channel can receive data without -// blocking. The implementation is essentially the entire blocking procedure -// followed by an increment as soon as its woken up. The cancellation procedure -// involves an increment and swapping out of to_wake to acquire ownership of the -// task to unblock. -// -// Sadly this current implementation requires multiple allocations, so I have -// seen the throughput of select() be much worse than it should be. I do not -// believe that there is anything fundamental that needs to change about these -// channels, however, in order to support a more efficient select(). -// -// # Conclusion -// -// And now that you've seen all the races that I found and attempted to fix, -// here's the code for you to find some more! - -use core::prelude::*; - -pub use self::TryRecvError::*; -pub use self::TrySendError::*; -use self::Flavor::*; - -use alloc::arc::Arc; -use core::kinds; -use core::kinds::marker; -use core::mem; -use core::cell::UnsafeCell; - -pub use self::select::{Select, Handle}; -use self::select::StartResult; -use self::select::StartResult::*; -use self::blocking::SignalToken; - -mod blocking; -mod oneshot; -mod select; -mod shared; -mod stream; -mod sync; -mod mpsc_queue; -mod spsc_queue; - -/// The receiving-half of Rust's channel type. This half can only be owned by -/// one task -#[unstable] -pub struct Receiver<T> { - inner: UnsafeCell<Flavor<T>>, -} - -// The receiver port can be sent from place to place, so long as it -// is not used to receive non-sendable things. -unsafe impl<T:Send> Send for Receiver<T> { } - -/// An iterator over messages on a receiver, this iterator will block -/// whenever `next` is called, waiting for a new message, and `None` will be -/// returned when the corresponding channel has hung up. -#[unstable] -pub struct Messages<'a, T:'a> { - rx: &'a Receiver<T> -} - -/// The sending-half of Rust's asynchronous channel type. This half can only be -/// owned by one task, but it can be cloned to send to other tasks. -#[unstable] -pub struct Sender<T> { - inner: UnsafeCell<Flavor<T>>, -} - -// The send port can be sent from place to place, so long as it -// is not used to send non-sendable things. -unsafe impl<T:Send> Send for Sender<T> { } - -/// The sending-half of Rust's synchronous channel type. This half can only be -/// owned by one task, but it can be cloned to send to other tasks. -#[unstable = "this type may be renamed, but it will always exist"] -pub struct SyncSender<T> { - inner: Arc<RacyCell<sync::Packet<T>>>, - // can't share in an arc - _marker: marker::NoSync, -} - -/// This enumeration is the list of the possible reasons that try_recv could not -/// return data when called. -#[deriving(PartialEq, Clone, Copy, Show)] -#[experimental = "this is likely to be removed in changing try_recv()"] -pub enum TryRecvError { - /// This channel is currently empty, but the sender(s) have not yet - /// disconnected, so data may yet become available. - Empty, - /// This channel's sending half has become disconnected, and there will - /// never be any more data received on this channel - Disconnected, -} - -/// This enumeration is the list of the possible error outcomes for the -/// `SyncSender::try_send` method. -#[deriving(PartialEq, Clone, Show)] -#[experimental = "this is likely to be removed in changing try_send()"] -pub enum TrySendError<T> { - /// The data could not be sent on the channel because it would require that - /// the callee block to send the data. - /// - /// If this is a buffered channel, then the buffer is full at this time. If - /// this is not a buffered channel, then there is no receiver available to - /// acquire the data. - Full(T), - /// This channel's receiving half has disconnected, so the data could not be - /// sent. The data is returned back to the callee in this case. - RecvDisconnected(T), -} - -enum Flavor<T> { - Oneshot(Arc<RacyCell<oneshot::Packet<T>>>), - Stream(Arc<RacyCell<stream::Packet<T>>>), - Shared(Arc<RacyCell<shared::Packet<T>>>), - Sync(Arc<RacyCell<sync::Packet<T>>>), -} - -#[doc(hidden)] -trait UnsafeFlavor<T> { - fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>; - unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> { - &mut *self.inner_unsafe().get() - } - unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> { - &*self.inner_unsafe().get() - } -} -impl<T> UnsafeFlavor<T> for Sender<T> { - fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> { - &self.inner - } -} -impl<T> UnsafeFlavor<T> for Receiver<T> { - fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> { - &self.inner - } -} - -/// Creates a new asynchronous channel, returning the sender/receiver halves. -/// -/// All data sent on the sender will become available on the receiver, and no -/// send will block the calling task (this channel has an "infinite buffer"). -/// -/// # Example -/// -/// ``` -/// use std::comm::channel; -/// use std::thread::Thread; -/// -/// // tx is is the sending half (tx for transmission), and rx is the receiving -/// // half (rx for receiving). -/// let (tx, rx) = channel(); -/// -/// // Spawn off an expensive computation -/// Thread::spawn(move|| { -/// # fn expensive_computation() {} -/// tx.send(expensive_computation()); -/// }).detach(); -/// -/// // Do some useful work for awhile -/// -/// // Let's see what that answer was -/// println!("{}", rx.recv()); -/// ``` -#[unstable] -pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) { - let a = Arc::new(RacyCell::new(oneshot::Packet::new())); - (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a))) -} - -/// Creates a new synchronous, bounded channel. -/// -/// Like asynchronous channels, the `Receiver` will block until a message -/// becomes available. These channels differ greatly in the semantics of the -/// sender from asynchronous channels, however. -/// -/// This channel has an internal buffer on which messages will be queued. When -/// the internal buffer becomes full, future sends will *block* waiting for the -/// buffer to open up. Note that a buffer size of 0 is valid, in which case this -/// becomes "rendezvous channel" where each send will not return until a recv -/// is paired with it. -/// -/// As with asynchronous channels, all senders will panic in `send` if the -/// `Receiver` has been destroyed. -/// -/// # Example -/// -/// ``` -/// use std::comm::sync_channel; -/// use std::thread::Thread; -/// -/// let (tx, rx) = sync_channel(1); -/// -/// // this returns immediately -/// tx.send(1i); -/// -/// Thread::spawn(move|| { -/// // this will block until the previous message has been received -/// tx.send(2i); -/// }).detach(); -/// -/// assert_eq!(rx.recv(), 1i); -/// assert_eq!(rx.recv(), 2i); -/// ``` -#[unstable = "this function may be renamed to more accurately reflect the type \ - of channel that is is creating"] -pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) { - let a = Arc::new(RacyCell::new(sync::Packet::new(bound))); - (SyncSender::new(a.clone()), Receiver::new(Sync(a))) -} - -//////////////////////////////////////////////////////////////////////////////// -// Sender -//////////////////////////////////////////////////////////////////////////////// - -impl<T: Send> Sender<T> { - fn new(inner: Flavor<T>) -> Sender<T> { - Sender { - inner: UnsafeCell::new(inner), - } - } - - /// Sends a value along this channel to be received by the corresponding - /// receiver. - /// - /// Rust channels are infinitely buffered so this method will never block. - /// - /// # Panics - /// - /// This function will panic if the other end of the channel has hung up. - /// This means that if the corresponding receiver has fallen out of scope, - /// this function will trigger a panic message saying that a message is - /// being sent on a closed channel. - /// - /// Note that if this function does *not* panic, it does not mean that the - /// data will be successfully received. All sends are placed into a queue, - /// so it is possible for a send to succeed (the other end is alive), but - /// then the other end could immediately disconnect. - /// - /// The purpose of this functionality is to propagate panics among tasks. - /// If a panic is not desired, then consider using the `send_opt` method - #[experimental = "this function is being considered candidate for removal \ - to adhere to the general guidelines of rust"] - pub fn send(&self, t: T) { - if self.send_opt(t).is_err() { - panic!("sending on a closed channel"); - } - } - - /// Attempts to send a value on this channel, returning it back if it could - /// not be sent. - /// - /// A successful send occurs when it is determined that the other end of - /// the channel has not hung up already. An unsuccessful send would be one - /// where the corresponding receiver has already been deallocated. Note - /// that a return value of `Err` means that the data will never be - /// received, but a return value of `Ok` does *not* mean that the data - /// will be received. It is possible for the corresponding receiver to - /// hang up immediately after this function returns `Ok`. - /// - /// Like `send`, this method will never block. - /// - /// # Panics - /// - /// This method will never panic, it will return the message back to the - /// caller if the other end is disconnected - /// - /// # Example - /// - /// ``` - /// use std::comm::channel; - /// - /// let (tx, rx) = channel(); - /// - /// // This send is always successful - /// assert_eq!(tx.send_opt(1i), Ok(())); - /// - /// // This send will fail because the receiver is gone - /// drop(rx); - /// assert_eq!(tx.send_opt(1i), Err(1)); - /// ``` - #[unstable = "this function may be renamed to send() in the future"] - pub fn send_opt(&self, t: T) -> Result<(), T> { - let (new_inner, ret) = match *unsafe { self.inner() } { - Oneshot(ref p) => { - unsafe { - let p = p.get(); - if !(*p).sent() { - return (*p).send(t); - } else { - let a = - Arc::new(RacyCell::new(stream::Packet::new())); - match (*p).upgrade(Receiver::new(Stream(a.clone()))) { - oneshot::UpSuccess => { - let ret = (*a.get()).send(t); - (a, ret) - } - oneshot::UpDisconnected => (a, Err(t)), - oneshot::UpWoke(token) => { - // This send cannot panic because the thread is - // asleep (we're looking at it), so the receiver - // can't go away. - (*a.get()).send(t).ok().unwrap(); - token.signal(); - (a, Ok(())) - } - } - } - } - } - Stream(ref p) => return unsafe { (*p.get()).send(t) }, - Shared(ref p) => return unsafe { (*p.get()).send(t) }, - Sync(..) => unreachable!(), - }; - - unsafe { - let tmp = Sender::new(Stream(new_inner)); - mem::swap(self.inner_mut(), tmp.inner_mut()); - } - return ret; - } -} - -#[stable] -impl<T: Send> Clone for Sender<T> { - fn clone(&self) -> Sender<T> { - let (packet, sleeper, guard) = match *unsafe { self.inner() } { - Oneshot(ref p) => { - let a = Arc::new(RacyCell::new(shared::Packet::new())); - unsafe { - let guard = (*a.get()).postinit_lock(); - match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) { - oneshot::UpSuccess | - oneshot::UpDisconnected => (a, None, guard), - oneshot::UpWoke(task) => (a, Some(task), guard) - } - } - } - Stream(ref p) => { - let a = Arc::new(RacyCell::new(shared::Packet::new())); - unsafe { - let guard = (*a.get()).postinit_lock(); - match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) { - stream::UpSuccess | - stream::UpDisconnected => (a, None, guard), - stream::UpWoke(task) => (a, Some(task), guard), - } - } - } - Shared(ref p) => { - unsafe { (*p.get()).clone_chan(); } - return Sender::new(Shared(p.clone())); - } - Sync(..) => unreachable!(), - }; - - unsafe { - (*packet.get()).inherit_blocker(sleeper, guard); - - let tmp = Sender::new(Shared(packet.clone())); - mem::swap(self.inner_mut(), tmp.inner_mut()); - } - Sender::new(Shared(packet)) - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Sender<T> { - fn drop(&mut self) { - match *unsafe { self.inner_mut() } { - Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); }, - Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); }, - Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); }, - Sync(..) => unreachable!(), - } - } -} - -//////////////////////////////////////////////////////////////////////////////// -// SyncSender -//////////////////////////////////////////////////////////////////////////////// - -impl<T: Send> SyncSender<T> { - fn new(inner: Arc<RacyCell<sync::Packet<T>>>) -> SyncSender<T> { - SyncSender { inner: inner, _marker: marker::NoSync } - } - - /// Sends a value on this synchronous channel. - /// - /// This function will *block* until space in the internal buffer becomes - /// available or a receiver is available to hand off the message to. - /// - /// Note that a successful send does *not* guarantee that the receiver will - /// ever see the data if there is a buffer on this channel. Messages may be - /// enqueued in the internal buffer for the receiver to receive at a later - /// time. If the buffer size is 0, however, it can be guaranteed that the - /// receiver has indeed received the data if this function returns success. - /// - /// # Panics - /// - /// Similarly to `Sender::send`, this function will panic if the - /// corresponding `Receiver` for this channel has disconnected. This - /// behavior is used to propagate panics among tasks. - /// - /// If a panic is not desired, you can achieve the same semantics with the - /// `SyncSender::send_opt` method which will not panic if the receiver - /// disconnects. - #[experimental = "this function is being considered candidate for removal \ - to adhere to the general guidelines of rust"] - pub fn send(&self, t: T) { - if self.send_opt(t).is_err() { - panic!("sending on a closed channel"); - } - } - - /// Send a value on a channel, returning it back if the receiver - /// disconnected - /// - /// This method will *block* to send the value `t` on the channel, but if - /// the value could not be sent due to the receiver disconnecting, the value - /// is returned back to the callee. This function is similar to `try_send`, - /// except that it will block if the channel is currently full. - /// - /// # Panics - /// - /// This function cannot panic. - #[unstable = "this function may be renamed to send() in the future"] - pub fn send_opt(&self, t: T) -> Result<(), T> { - unsafe { (*self.inner.get()).send(t) } - } - - /// Attempts to send a value on this channel without blocking. - /// - /// This method differs from `send_opt` by returning immediately if the - /// channel's buffer is full or no receiver is waiting to acquire some - /// data. Compared with `send_opt`, this function has two failure cases - /// instead of one (one for disconnection, one for a full buffer). - /// - /// See `SyncSender::send` for notes about guarantees of whether the - /// receiver has received the data or not if this function is successful. - /// - /// # Panics - /// - /// This function cannot panic - #[unstable = "the return type of this function is candidate for \ - modification"] - pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { - unsafe { (*self.inner.get()).try_send(t) } - } -} - -#[stable] -impl<T: Send> Clone for SyncSender<T> { - fn clone(&self) -> SyncSender<T> { - unsafe { (*self.inner.get()).clone_chan(); } - return SyncSender::new(self.inner.clone()); - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for SyncSender<T> { - fn drop(&mut self) { - unsafe { (*self.inner.get()).drop_chan(); } - } -} - -//////////////////////////////////////////////////////////////////////////////// -// Receiver -//////////////////////////////////////////////////////////////////////////////// - -impl<T: Send> Receiver<T> { - fn new(inner: Flavor<T>) -> Receiver<T> { - Receiver { inner: UnsafeCell::new(inner) } - } - - /// Blocks waiting for a value on this receiver - /// - /// This function will block if necessary to wait for a corresponding send - /// on the channel from its paired `Sender` structure. This receiver will - /// be woken up when data is ready, and the data will be returned. - /// - /// # Panics - /// - /// Similar to channels, this method will trigger a task panic if the - /// other end of the channel has hung up (been deallocated). The purpose of - /// this is to propagate panics among tasks. - /// - /// If a panic is not desired, then there are two options: - /// - /// * If blocking is still desired, the `recv_opt` method will return `None` - /// when the other end hangs up - /// - /// * If blocking is not desired, then the `try_recv` method will attempt to - /// peek at a value on this receiver. - #[experimental = "this function is being considered candidate for removal \ - to adhere to the general guidelines of rust"] - pub fn recv(&self) -> T { - match self.recv_opt() { - Ok(t) => t, - Err(()) => panic!("receiving on a closed channel"), - } - } - - /// Attempts to return a pending value on this receiver without blocking - /// - /// This method will never block the caller in order to wait for data to - /// become available. Instead, this will always return immediately with a - /// possible option of pending data on the channel. - /// - /// This is useful for a flavor of "optimistic check" before deciding to - /// block on a receiver. - /// - /// # Panics - /// - /// This function cannot panic. - #[unstable = "the return type of this function may be altered"] - pub fn try_recv(&self) -> Result<T, TryRecvError> { - loop { - let new_port = match *unsafe { self.inner() } { - Oneshot(ref p) => { - match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Ok(t), - Err(oneshot::Empty) => return Err(Empty), - Err(oneshot::Disconnected) => return Err(Disconnected), - Err(oneshot::Upgraded(rx)) => rx, - } - } - Stream(ref p) => { - match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Ok(t), - Err(stream::Empty) => return Err(Empty), - Err(stream::Disconnected) => return Err(Disconnected), - Err(stream::Upgraded(rx)) => rx, - } - } - Shared(ref p) => { - match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Ok(t), - Err(shared::Empty) => return Err(Empty), - Err(shared::Disconnected) => return Err(Disconnected), - } - } - Sync(ref p) => { - match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Ok(t), - Err(sync::Empty) => return Err(Empty), - Err(sync::Disconnected) => return Err(Disconnected), - } - } - }; - unsafe { - mem::swap(self.inner_mut(), - new_port.inner_mut()); - } - } - } - - /// Attempt to wait for a value on this receiver, but does not panic if the - /// corresponding channel has hung up. - /// - /// This implementation of iterators for ports will always block if there is - /// not data available on the receiver, but it will not panic in the case - /// that the channel has been deallocated. - /// - /// In other words, this function has the same semantics as the `recv` - /// method except for the panic aspect. - /// - /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of - /// the value found on the receiver is returned. - #[unstable = "this function may be renamed to recv()"] - pub fn recv_opt(&self) -> Result<T, ()> { - loop { - let new_port = match *unsafe { self.inner() } { - Oneshot(ref p) => { - match unsafe { (*p.get()).recv() } { - Ok(t) => return Ok(t), - Err(oneshot::Empty) => return unreachable!(), - Err(oneshot::Disconnected) => return Err(()), - Err(oneshot::Upgraded(rx)) => rx, - } - } - Stream(ref p) => { - match unsafe { (*p.get()).recv() } { - Ok(t) => return Ok(t), - Err(stream::Empty) => return unreachable!(), - Err(stream::Disconnected) => return Err(()), - Err(stream::Upgraded(rx)) => rx, - } - } - Shared(ref p) => { - match unsafe { (*p.get()).recv() } { - Ok(t) => return Ok(t), - Err(shared::Empty) => return unreachable!(), - Err(shared::Disconnected) => return Err(()), - } - } - Sync(ref p) => return unsafe { (*p.get()).recv() } - }; - unsafe { - mem::swap(self.inner_mut(), new_port.inner_mut()); - } - } - } - - /// Returns an iterator that will block waiting for messages, but never - /// `panic!`. It will return `None` when the channel has hung up. - #[unstable] - pub fn iter<'a>(&'a self) -> Messages<'a, T> { - Messages { rx: self } - } -} - -impl<T: Send> select::Packet for Receiver<T> { - fn can_recv(&self) -> bool { - loop { - let new_port = match *unsafe { self.inner() } { - Oneshot(ref p) => { - match unsafe { (*p.get()).can_recv() } { - Ok(ret) => return ret, - Err(upgrade) => upgrade, - } - } - Stream(ref p) => { - match unsafe { (*p.get()).can_recv() } { - Ok(ret) => return ret, - Err(upgrade) => upgrade, - } - } - Shared(ref p) => { - return unsafe { (*p.get()).can_recv() }; - } - Sync(ref p) => { - return unsafe { (*p.get()).can_recv() }; - } - }; - unsafe { - mem::swap(self.inner_mut(), - new_port.inner_mut()); - } - } - } - - fn start_selection(&self, mut token: SignalToken) -> StartResult { - loop { - let (t, new_port) = match *unsafe { self.inner() } { - Oneshot(ref p) => { - match unsafe { (*p.get()).start_selection(token) } { - oneshot::SelSuccess => return Installed, - oneshot::SelCanceled => return Abort, - oneshot::SelUpgraded(t, rx) => (t, rx), - } - } - Stream(ref p) => { - match unsafe { (*p.get()).start_selection(token) } { - stream::SelSuccess => return Installed, - stream::SelCanceled => return Abort, - stream::SelUpgraded(t, rx) => (t, rx), - } - } - Shared(ref p) => { - return unsafe { (*p.get()).start_selection(token) }; - } - Sync(ref p) => { - return unsafe { (*p.get()).start_selection(token) }; - } - }; - token = t; - unsafe { - mem::swap(self.inner_mut(), new_port.inner_mut()); - } - } - } - - fn abort_selection(&self) -> bool { - let mut was_upgrade = false; - loop { - let result = match *unsafe { self.inner() } { - Oneshot(ref p) => unsafe { (*p.get()).abort_selection() }, - Stream(ref p) => unsafe { - (*p.get()).abort_selection(was_upgrade) - }, - Shared(ref p) => return unsafe { - (*p.get()).abort_selection(was_upgrade) - }, - Sync(ref p) => return unsafe { - (*p.get()).abort_selection() - }, - }; - let new_port = match result { Ok(b) => return b, Err(p) => p }; - was_upgrade = true; - unsafe { - mem::swap(self.inner_mut(), - new_port.inner_mut()); - } - } - } -} - -#[unstable] -impl<'a, T: Send> Iterator<T> for Messages<'a, T> { - fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Receiver<T> { - fn drop(&mut self) { - match *unsafe { self.inner_mut() } { - Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Stream(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Shared(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Sync(ref mut p) => unsafe { (*p.get()).drop_port(); }, - } - } -} - -/// A version of `UnsafeCell` intended for use in concurrent data -/// structures (for example, you might put it in an `Arc`). -struct RacyCell<T>(pub UnsafeCell<T>); - -impl<T> RacyCell<T> { - - fn new(value: T) -> RacyCell<T> { - RacyCell(UnsafeCell { value: value }) - } - - unsafe fn get(&self) -> *mut T { - self.0.get() - } - -} - -unsafe impl<T:Send> Send for RacyCell<T> { } - -unsafe impl<T> kinds::Sync for RacyCell<T> { } // Oh dear - - -#[cfg(test)] -mod test { - use prelude::v1::*; - - use os; - use super::*; - use thread::Thread; - use str::from_str; - - pub fn stress_factor() -> uint { - match os::getenv("RUST_TEST_STRESS") { - Some(val) => from_str::<uint>(val.as_slice()).unwrap(), - None => 1, - } - } - - #[test] - fn smoke() { - let (tx, rx) = channel::<int>(); - tx.send(1); - assert_eq!(rx.recv(), 1); - } - - #[test] - fn drop_full() { - let (tx, _rx) = channel(); - tx.send(box 1i); - } - - #[test] - fn drop_full_shared() { - let (tx, _rx) = channel(); - drop(tx.clone()); - drop(tx.clone()); - tx.send(box 1i); - } - - #[test] - fn smoke_shared() { - let (tx, rx) = channel::<int>(); - tx.send(1); - assert_eq!(rx.recv(), 1); - let tx = tx.clone(); - tx.send(1); - assert_eq!(rx.recv(), 1); - } - - #[test] - fn smoke_threads() { - let (tx, rx) = channel::<int>(); - let _t = Thread::spawn(move|| { - tx.send(1); - }); - assert_eq!(rx.recv(), 1); - } - - #[test] - #[should_fail] - fn smoke_port_gone() { - let (tx, rx) = channel::<int>(); - drop(rx); - tx.send(1); - } - - #[test] - #[should_fail] - fn smoke_shared_port_gone() { - let (tx, rx) = channel::<int>(); - drop(rx); - tx.send(1); - } - - #[test] - #[should_fail] - fn smoke_shared_port_gone2() { - let (tx, rx) = channel::<int>(); - drop(rx); - let tx2 = tx.clone(); - drop(tx); - tx2.send(1); - } - - #[test] - #[should_fail] - fn port_gone_concurrent() { - let (tx, rx) = channel::<int>(); - Thread::spawn(move|| { - rx.recv(); - }).detach(); - loop { tx.send(1) } - } - - #[test] - #[should_fail] - fn port_gone_concurrent_shared() { - let (tx, rx) = channel::<int>(); - let tx2 = tx.clone(); - Thread::spawn(move|| { - rx.recv(); - }).detach(); - loop { - tx.send(1); - tx2.send(1); - } - } - - #[test] - #[should_fail] - fn smoke_chan_gone() { - let (tx, rx) = channel::<int>(); - drop(tx); - rx.recv(); - } - - #[test] - #[should_fail] - fn smoke_chan_gone_shared() { - let (tx, rx) = channel::<()>(); - let tx2 = tx.clone(); - drop(tx); - drop(tx2); - rx.recv(); - } - - #[test] - #[should_fail] - fn chan_gone_concurrent() { - let (tx, rx) = channel::<int>(); - Thread::spawn(move|| { - tx.send(1); - tx.send(1); - }).detach(); - loop { rx.recv(); } - } - - #[test] - fn stress() { - let (tx, rx) = channel::<int>(); - let t = Thread::spawn(move|| { - for _ in range(0u, 10000) { tx.send(1i); } - }); - for _ in range(0u, 10000) { - assert_eq!(rx.recv(), 1); - } - t.join().ok().unwrap(); - } - - #[test] - fn stress_shared() { - static AMT: uint = 10000; - static NTHREADS: uint = 8; - let (tx, rx) = channel::<int>(); - - let t = Thread::spawn(move|| { - for _ in range(0, AMT * NTHREADS) { - assert_eq!(rx.recv(), 1); - } - match rx.try_recv() { - Ok(..) => panic!(), - _ => {} - } - }); - - for _ in range(0, NTHREADS) { - let tx = tx.clone(); - Thread::spawn(move|| { - for _ in range(0, AMT) { tx.send(1); } - }).detach(); - } - drop(tx); - t.join().ok().unwrap(); - } - - #[test] - fn send_from_outside_runtime() { - let (tx1, rx1) = channel::<()>(); - let (tx2, rx2) = channel::<int>(); - let t1 = Thread::spawn(move|| { - tx1.send(()); - for _ in range(0i, 40) { - assert_eq!(rx2.recv(), 1); - } - }); - rx1.recv(); - let t2 = Thread::spawn(move|| { - for _ in range(0i, 40) { - tx2.send(1); - } - }); - t1.join().ok().unwrap(); - t2.join().ok().unwrap(); - } - - #[test] - fn recv_from_outside_runtime() { - let (tx, rx) = channel::<int>(); - let t = Thread::spawn(move|| { - for _ in range(0i, 40) { - assert_eq!(rx.recv(), 1); - } - }); - for _ in range(0u, 40) { - tx.send(1); - } - t.join().ok().unwrap(); - } - - #[test] - fn no_runtime() { - let (tx1, rx1) = channel::<int>(); - let (tx2, rx2) = channel::<int>(); - let t1 = Thread::spawn(move|| { - assert_eq!(rx1.recv(), 1); - tx2.send(2); - }); - let t2 = Thread::spawn(move|| { - tx1.send(1); - assert_eq!(rx2.recv(), 2); - }); - t1.join().ok().unwrap(); - t2.join().ok().unwrap(); - } - - #[test] - fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - let (_tx, rx) = channel::<int>(); - drop(rx); - } - - #[test] - fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - let (tx, _rx) = channel::<int>(); - drop(tx); - } - - #[test] - #[should_fail] - fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - let (tx, rx) = channel::<Box<int>>(); - drop(rx); - tx.send(box 0); - } - - #[test] - fn oneshot_single_thread_recv_chan_close() { - // Receiving on a closed chan will panic - let res = Thread::spawn(move|| { - let (tx, rx) = channel::<int>(); - drop(tx); - rx.recv(); - }).join(); - // What is our res? - assert!(res.is_err()); - } - - #[test] - fn oneshot_single_thread_send_then_recv() { - let (tx, rx) = channel::<Box<int>>(); - tx.send(box 10); - assert!(rx.recv() == box 10); - } - - #[test] - fn oneshot_single_thread_try_send_open() { - let (tx, rx) = channel::<int>(); - assert!(tx.send_opt(10).is_ok()); - assert!(rx.recv() == 10); - } - - #[test] - fn oneshot_single_thread_try_send_closed() { - let (tx, rx) = channel::<int>(); - drop(rx); - assert!(tx.send_opt(10).is_err()); - } - - #[test] - fn oneshot_single_thread_try_recv_open() { - let (tx, rx) = channel::<int>(); - tx.send(10); - assert!(rx.recv_opt() == Ok(10)); - } - - #[test] - fn oneshot_single_thread_try_recv_closed() { - let (tx, rx) = channel::<int>(); - drop(tx); - assert!(rx.recv_opt() == Err(())); - } - - #[test] - fn oneshot_single_thread_peek_data() { - let (tx, rx) = channel::<int>(); - assert_eq!(rx.try_recv(), Err(Empty)); - tx.send(10); - assert_eq!(rx.try_recv(), Ok(10)); - } - - #[test] - fn oneshot_single_thread_peek_close() { - let (tx, rx) = channel::<int>(); - drop(tx); - assert_eq!(rx.try_recv(), Err(Disconnected)); - assert_eq!(rx.try_recv(), Err(Disconnected)); - } - - #[test] - fn oneshot_single_thread_peek_open() { - let (_tx, rx) = channel::<int>(); - assert_eq!(rx.try_recv(), Err(Empty)); - } - - #[test] - fn oneshot_multi_task_recv_then_send() { - let (tx, rx) = channel::<Box<int>>(); - let _t = Thread::spawn(move|| { - assert!(rx.recv() == box 10); - }); - - tx.send(box 10); - } - - #[test] - fn oneshot_multi_task_recv_then_close() { - let (tx, rx) = channel::<Box<int>>(); - let _t = Thread::spawn(move|| { - drop(tx); - }); - let res = Thread::spawn(move|| { - assert!(rx.recv() == box 10); - }).join(); - assert!(res.is_err()); - } - - #[test] - fn oneshot_multi_thread_close_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = channel::<int>(); - let _t = Thread::spawn(move|| { - drop(rx); - }); - drop(tx); - } - } - - #[test] - fn oneshot_multi_thread_send_close_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = channel::<int>(); - let _t = Thread::spawn(move|| { - drop(rx); - }); - let _ = Thread::spawn(move|| { - tx.send(1); - }).join(); - } - } - - #[test] - fn oneshot_multi_thread_recv_close_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = channel::<int>(); - Thread::spawn(move|| { - let res = Thread::spawn(move|| { - rx.recv(); - }).join(); - assert!(res.is_err()); - }).detach(); - let _t = Thread::spawn(move|| { - Thread::spawn(move|| { - drop(tx); - }).detach(); - }); - } - } - - #[test] - fn oneshot_multi_thread_send_recv_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = channel(); - let _t = Thread::spawn(move|| { - tx.send(box 10i); - }); - assert!(rx.recv() == box 10i); - } - } - - #[test] - fn stream_send_recv_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = channel(); - - send(tx, 0); - recv(rx, 0); - - fn send(tx: Sender<Box<int>>, i: int) { - if i == 10 { return } - - Thread::spawn(move|| { - tx.send(box i); - send(tx, i + 1); - }).detach(); - } - - fn recv(rx: Receiver<Box<int>>, i: int) { - if i == 10 { return } - - Thread::spawn(move|| { - assert!(rx.recv() == box i); - recv(rx, i + 1); - }).detach(); - } - } - } - - #[test] - fn recv_a_lot() { - // Regression test that we don't run out of stack in scheduler context - let (tx, rx) = channel(); - for _ in range(0i, 10000) { tx.send(()); } - for _ in range(0i, 10000) { rx.recv(); } - } - - #[test] - fn shared_chan_stress() { - let (tx, rx) = channel(); - let total = stress_factor() + 100; - for _ in range(0, total) { - let tx = tx.clone(); - Thread::spawn(move|| { - tx.send(()); - }).detach(); - } - - for _ in range(0, total) { - rx.recv(); - } - } - - #[test] - fn test_nested_recv_iter() { - let (tx, rx) = channel::<int>(); - let (total_tx, total_rx) = channel::<int>(); - - let _t = Thread::spawn(move|| { - let mut acc = 0; - for x in rx.iter() { - acc += x; - } - total_tx.send(acc); - }); - - tx.send(3); - tx.send(1); - tx.send(2); - drop(tx); - assert_eq!(total_rx.recv(), 6); - } - - #[test] - fn test_recv_iter_break() { - let (tx, rx) = channel::<int>(); - let (count_tx, count_rx) = channel(); - - let _t = Thread::spawn(move|| { - let mut count = 0; - for x in rx.iter() { - if count >= 3 { - break; - } else { - count += x; - } - } - count_tx.send(count); - }); - - tx.send(2); - tx.send(2); - tx.send(2); - let _ = tx.send_opt(2); - drop(tx); - assert_eq!(count_rx.recv(), 4); - } - - #[test] - fn try_recv_states() { - let (tx1, rx1) = channel::<int>(); - let (tx2, rx2) = channel::<()>(); - let (tx3, rx3) = channel::<()>(); - let _t = Thread::spawn(move|| { - rx2.recv(); - tx1.send(1); - tx3.send(()); - rx2.recv(); - drop(tx1); - tx3.send(()); - }); - - assert_eq!(rx1.try_recv(), Err(Empty)); - tx2.send(()); - rx3.recv(); - assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(Empty)); - tx2.send(()); - rx3.recv(); - assert_eq!(rx1.try_recv(), Err(Disconnected)); - } - - // This bug used to end up in a livelock inside of the Receiver destructor - // because the internal state of the Shared packet was corrupted - #[test] - fn destroy_upgraded_shared_port_when_sender_still_active() { - let (tx, rx) = channel(); - let (tx2, rx2) = channel(); - let _t = Thread::spawn(move|| { - rx.recv(); // wait on a oneshot - drop(rx); // destroy a shared - tx2.send(()); - }); - // make sure the other task has gone to sleep - for _ in range(0u, 5000) { Thread::yield_now(); } - - // upgrade to a shared chan and send a message - let t = tx.clone(); - drop(tx); - t.send(()); - - // wait for the child task to exit before we exit - rx2.recv(); - } -} - -#[cfg(test)] -mod sync_tests { - use prelude::v1::*; - use os; - use thread::Thread; - use super::*; - use str::from_str; - - pub fn stress_factor() -> uint { - match os::getenv("RUST_TEST_STRESS") { - Some(val) => from_str::<uint>(val.as_slice()).unwrap(), - None => 1, - } - } - - #[test] - fn smoke() { - let (tx, rx) = sync_channel::<int>(1); - tx.send(1); - assert_eq!(rx.recv(), 1); - } - - #[test] - fn drop_full() { - let (tx, _rx) = sync_channel(1); - tx.send(box 1i); - } - - #[test] - fn smoke_shared() { - let (tx, rx) = sync_channel::<int>(1); - tx.send(1); - assert_eq!(rx.recv(), 1); - let tx = tx.clone(); - tx.send(1); - assert_eq!(rx.recv(), 1); - } - - #[test] - fn smoke_threads() { - let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { - tx.send(1); - }); - assert_eq!(rx.recv(), 1); - } - - #[test] - #[should_fail] - fn smoke_port_gone() { - let (tx, rx) = sync_channel::<int>(0); - drop(rx); - tx.send(1); - } - - #[test] - #[should_fail] - fn smoke_shared_port_gone2() { - let (tx, rx) = sync_channel::<int>(0); - drop(rx); - let tx2 = tx.clone(); - drop(tx); - tx2.send(1); - } - - #[test] - #[should_fail] - fn port_gone_concurrent() { - let (tx, rx) = sync_channel::<int>(0); - Thread::spawn(move|| { - rx.recv(); - }).detach(); - loop { tx.send(1) } - } - - #[test] - #[should_fail] - fn port_gone_concurrent_shared() { - let (tx, rx) = sync_channel::<int>(0); - let tx2 = tx.clone(); - Thread::spawn(move|| { - rx.recv(); - }).detach(); - loop { - tx.send(1); - tx2.send(1); - } - } - - #[test] - #[should_fail] - fn smoke_chan_gone() { - let (tx, rx) = sync_channel::<int>(0); - drop(tx); - rx.recv(); - } - - #[test] - #[should_fail] - fn smoke_chan_gone_shared() { - let (tx, rx) = sync_channel::<()>(0); - let tx2 = tx.clone(); - drop(tx); - drop(tx2); - rx.recv(); - } - - #[test] - #[should_fail] - fn chan_gone_concurrent() { - let (tx, rx) = sync_channel::<int>(0); - Thread::spawn(move|| { - tx.send(1); - tx.send(1); - }).detach(); - loop { rx.recv(); } - } - - #[test] - fn stress() { - let (tx, rx) = sync_channel::<int>(0); - Thread::spawn(move|| { - for _ in range(0u, 10000) { tx.send(1); } - }).detach(); - for _ in range(0u, 10000) { - assert_eq!(rx.recv(), 1); - } - } - - #[test] - fn stress_shared() { - static AMT: uint = 1000; - static NTHREADS: uint = 8; - let (tx, rx) = sync_channel::<int>(0); - let (dtx, drx) = sync_channel::<()>(0); - - Thread::spawn(move|| { - for _ in range(0, AMT * NTHREADS) { - assert_eq!(rx.recv(), 1); - } - match rx.try_recv() { - Ok(..) => panic!(), - _ => {} - } - dtx.send(()); - }).detach(); - - for _ in range(0, NTHREADS) { - let tx = tx.clone(); - Thread::spawn(move|| { - for _ in range(0, AMT) { tx.send(1); } - }).detach(); - } - drop(tx); - drx.recv(); - } - - #[test] - fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - let (_tx, rx) = sync_channel::<int>(0); - drop(rx); - } - - #[test] - fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - let (tx, _rx) = sync_channel::<int>(0); - drop(tx); - } - - #[test] - #[should_fail] - fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - let (tx, rx) = sync_channel::<Box<int>>(0); - drop(rx); - tx.send(box 0); - } - - #[test] - fn oneshot_single_thread_recv_chan_close() { - // Receiving on a closed chan will panic - let res = Thread::spawn(move|| { - let (tx, rx) = sync_channel::<int>(0); - drop(tx); - rx.recv(); - }).join(); - // What is our res? - assert!(res.is_err()); - } - - #[test] - fn oneshot_single_thread_send_then_recv() { - let (tx, rx) = sync_channel::<Box<int>>(1); - tx.send(box 10); - assert!(rx.recv() == box 10); - } - - #[test] - fn oneshot_single_thread_try_send_open() { - let (tx, rx) = sync_channel::<int>(1); - assert_eq!(tx.try_send(10), Ok(())); - assert!(rx.recv() == 10); - } - - #[test] - fn oneshot_single_thread_try_send_closed() { - let (tx, rx) = sync_channel::<int>(0); - drop(rx); - assert_eq!(tx.try_send(10), Err(RecvDisconnected(10))); - } - - #[test] - fn oneshot_single_thread_try_send_closed2() { - let (tx, _rx) = sync_channel::<int>(0); - assert_eq!(tx.try_send(10), Err(Full(10))); - } - - #[test] - fn oneshot_single_thread_try_recv_open() { - let (tx, rx) = sync_channel::<int>(1); - tx.send(10); - assert!(rx.recv_opt() == Ok(10)); - } - - #[test] - fn oneshot_single_thread_try_recv_closed() { - let (tx, rx) = sync_channel::<int>(0); - drop(tx); - assert!(rx.recv_opt() == Err(())); - } - - #[test] - fn oneshot_single_thread_peek_data() { - let (tx, rx) = sync_channel::<int>(1); - assert_eq!(rx.try_recv(), Err(Empty)); - tx.send(10); - assert_eq!(rx.try_recv(), Ok(10)); - } - - #[test] - fn oneshot_single_thread_peek_close() { - let (tx, rx) = sync_channel::<int>(0); - drop(tx); - assert_eq!(rx.try_recv(), Err(Disconnected)); - assert_eq!(rx.try_recv(), Err(Disconnected)); - } - - #[test] - fn oneshot_single_thread_peek_open() { - let (_tx, rx) = sync_channel::<int>(0); - assert_eq!(rx.try_recv(), Err(Empty)); - } - - #[test] - fn oneshot_multi_task_recv_then_send() { - let (tx, rx) = sync_channel::<Box<int>>(0); - let _t = Thread::spawn(move|| { - assert!(rx.recv() == box 10); - }); - - tx.send(box 10); - } - - #[test] - fn oneshot_multi_task_recv_then_close() { - let (tx, rx) = sync_channel::<Box<int>>(0); - let _t = Thread::spawn(move|| { - drop(tx); - }); - let res = Thread::spawn(move|| { - assert!(rx.recv() == box 10); - }).join(); - assert!(res.is_err()); - } - - #[test] - fn oneshot_multi_thread_close_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { - drop(rx); - }); - drop(tx); - } - } - - #[test] - fn oneshot_multi_thread_send_close_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { - drop(rx); - }); - let _ = Thread::spawn(move || { - tx.send(1); - }).join(); - } - } - - #[test] - fn oneshot_multi_thread_recv_close_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { - let res = Thread::spawn(move|| { - rx.recv(); - }).join(); - assert!(res.is_err()); - }); - let _t = Thread::spawn(move|| { - Thread::spawn(move|| { - drop(tx); - }).detach(); - }); - } - } - - #[test] - fn oneshot_multi_thread_send_recv_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = sync_channel::<Box<int>>(0); - let _t = Thread::spawn(move|| { - tx.send(box 10i); - }); - assert!(rx.recv() == box 10i); - } - } - - #[test] - fn stream_send_recv_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = sync_channel::<Box<int>>(0); - - send(tx, 0); - recv(rx, 0); - - fn send(tx: SyncSender<Box<int>>, i: int) { - if i == 10 { return } - - Thread::spawn(move|| { - tx.send(box i); - send(tx, i + 1); - }).detach(); - } - - fn recv(rx: Receiver<Box<int>>, i: int) { - if i == 10 { return } - - Thread::spawn(move|| { - assert!(rx.recv() == box i); - recv(rx, i + 1); - }).detach(); - } - } - } - - #[test] - fn recv_a_lot() { - // Regression test that we don't run out of stack in scheduler context - let (tx, rx) = sync_channel(10000); - for _ in range(0u, 10000) { tx.send(()); } - for _ in range(0u, 10000) { rx.recv(); } - } - - #[test] - fn shared_chan_stress() { - let (tx, rx) = sync_channel(0); - let total = stress_factor() + 100; - for _ in range(0, total) { - let tx = tx.clone(); - Thread::spawn(move|| { - tx.send(()); - }).detach(); - } - - for _ in range(0, total) { - rx.recv(); - } - } - - #[test] - fn test_nested_recv_iter() { - let (tx, rx) = sync_channel::<int>(0); - let (total_tx, total_rx) = sync_channel::<int>(0); - - let _t = Thread::spawn(move|| { - let mut acc = 0; - for x in rx.iter() { - acc += x; - } - total_tx.send(acc); - }); - - tx.send(3); - tx.send(1); - tx.send(2); - drop(tx); - assert_eq!(total_rx.recv(), 6); - } - - #[test] - fn test_recv_iter_break() { - let (tx, rx) = sync_channel::<int>(0); - let (count_tx, count_rx) = sync_channel(0); - - let _t = Thread::spawn(move|| { - let mut count = 0; - for x in rx.iter() { - if count >= 3 { - break; - } else { - count += x; - } - } - count_tx.send(count); - }); - - tx.send(2); - tx.send(2); - tx.send(2); - let _ = tx.try_send(2); - drop(tx); - assert_eq!(count_rx.recv(), 4); - } - - #[test] - fn try_recv_states() { - let (tx1, rx1) = sync_channel::<int>(1); - let (tx2, rx2) = sync_channel::<()>(1); - let (tx3, rx3) = sync_channel::<()>(1); - let _t = Thread::spawn(move|| { - rx2.recv(); - tx1.send(1); - tx3.send(()); - rx2.recv(); - drop(tx1); - tx3.send(()); - }); - - assert_eq!(rx1.try_recv(), Err(Empty)); - tx2.send(()); - rx3.recv(); - assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(Empty)); - tx2.send(()); - rx3.recv(); - assert_eq!(rx1.try_recv(), Err(Disconnected)); - } - - // This bug used to end up in a livelock inside of the Receiver destructor - // because the internal state of the Shared packet was corrupted - #[test] - fn destroy_upgraded_shared_port_when_sender_still_active() { - let (tx, rx) = sync_channel::<()>(0); - let (tx2, rx2) = sync_channel::<()>(0); - let _t = Thread::spawn(move|| { - rx.recv(); // wait on a oneshot - drop(rx); // destroy a shared - tx2.send(()); - }); - // make sure the other task has gone to sleep - for _ in range(0u, 5000) { Thread::yield_now(); } - - // upgrade to a shared chan and send a message - let t = tx.clone(); - drop(tx); - t.send(()); - - // wait for the child task to exit before we exit - rx2.recv(); - } - - #[test] - fn send_opt1() { - let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { rx.recv(); }); - assert_eq!(tx.send_opt(1), Ok(())); - } - - #[test] - fn send_opt2() { - let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { drop(rx); }); - assert_eq!(tx.send_opt(1), Err(1)); - } - - #[test] - fn send_opt3() { - let (tx, rx) = sync_channel::<int>(1); - assert_eq!(tx.send_opt(1), Ok(())); - let _t = Thread::spawn(move|| { drop(rx); }); - assert_eq!(tx.send_opt(1), Err(1)); - } - - #[test] - fn send_opt4() { - let (tx, rx) = sync_channel::<int>(0); - let tx2 = tx.clone(); - let (done, donerx) = channel(); - let done2 = done.clone(); - let _t = Thread::spawn(move|| { - assert_eq!(tx.send_opt(1), Err(1)); - done.send(()); - }); - let _t = Thread::spawn(move|| { - assert_eq!(tx2.send_opt(2), Err(2)); - done2.send(()); - }); - drop(rx); - donerx.recv(); - donerx.recv(); - } - - #[test] - fn try_send1() { - let (tx, _rx) = sync_channel::<int>(0); - assert_eq!(tx.try_send(1), Err(Full(1))); - } - - #[test] - fn try_send2() { - let (tx, _rx) = sync_channel::<int>(1); - assert_eq!(tx.try_send(1), Ok(())); - assert_eq!(tx.try_send(1), Err(Full(1))); - } - - #[test] - fn try_send3() { - let (tx, rx) = sync_channel::<int>(1); - assert_eq!(tx.try_send(1), Ok(())); - drop(rx); - assert_eq!(tx.try_send(1), Err(RecvDisconnected(1))); - } - - #[test] - fn issue_15761() { - fn repro() { - let (tx1, rx1) = sync_channel::<()>(3); - let (tx2, rx2) = sync_channel::<()>(3); - - let _t = Thread::spawn(move|| { - rx1.recv(); - tx2.try_send(()).unwrap(); - }); - - tx1.try_send(()).unwrap(); - rx2.recv(); - } - - for _ in range(0u, 100) { - repro() - } - } -} diff --git a/src/libstd/comm/mpsc_queue.rs b/src/libstd/comm/mpsc_queue.rs deleted file mode 100644 index d1b6d0d697c..00000000000 --- a/src/libstd/comm/mpsc_queue.rs +++ /dev/null @@ -1,205 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -//! A mostly lock-free multi-producer, single consumer queue. -//! -//! This module contains an implementation of a concurrent MPSC queue. This -//! queue can be used to share data between tasks, and is also used as the -//! building block of channels in rust. -//! -//! Note that the current implementation of this queue has a caveat of the `pop` -//! method, and see the method for more information about it. Due to this -//! caveat, this queue may not be appropriate for all use-cases. - -#![experimental] - -// http://www.1024cores.net/home/lock-free-algorithms -// /queues/non-intrusive-mpsc-node-based-queue - -pub use self::PopResult::*; - -use core::prelude::*; - -use alloc::boxed::Box; -use core::mem; -use core::cell::UnsafeCell; - -use sync::atomic::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; - -/// A result of the `pop` function. -pub enum PopResult<T> { - /// Some data has been popped - Data(T), - /// The queue is empty - Empty, - /// The queue is in an inconsistent state. Popping data should succeed, but - /// some pushers have yet to make enough progress in order allow a pop to - /// succeed. It is recommended that a pop() occur "in the near future" in - /// order to see if the sender has made progress or not - Inconsistent, -} - -struct Node<T> { - next: AtomicPtr<Node<T>>, - value: Option<T>, -} - -/// The multi-producer single-consumer structure. This is not cloneable, but it -/// may be safely shared so long as it is guaranteed that there is only one -/// popper at a time (many pushers are allowed). -pub struct Queue<T> { - head: AtomicPtr<Node<T>>, - tail: UnsafeCell<*mut Node<T>>, -} - -unsafe impl<T:Send> Send for Queue<T> { } -unsafe impl<T:Send> Sync for Queue<T> { } - -impl<T> Node<T> { - unsafe fn new(v: Option<T>) -> *mut Node<T> { - mem::transmute(box Node { - next: AtomicPtr::new(0 as *mut Node<T>), - value: v, - }) - } -} - -impl<T: Send> Queue<T> { - /// Creates a new queue that is safe to share among multiple producers and - /// one consumer. - pub fn new() -> Queue<T> { - let stub = unsafe { Node::new(None) }; - Queue { - head: AtomicPtr::new(stub), - tail: UnsafeCell::new(stub), - } - } - - /// Pushes a new value onto this queue. - pub fn push(&self, t: T) { - unsafe { - let n = Node::new(Some(t)); - let prev = self.head.swap(n, AcqRel); - (*prev).next.store(n, Release); - } - } - - /// Pops some data from this queue. - /// - /// Note that the current implementation means that this function cannot - /// return `Option<T>`. It is possible for this queue to be in an - /// inconsistent state where many pushes have succeeded and completely - /// finished, but pops cannot return `Some(t)`. This inconsistent state - /// happens when a pusher is pre-empted at an inopportune moment. - /// - /// This inconsistent state means that this queue does indeed have data, but - /// it does not currently have access to it at this time. - pub fn pop(&self) -> PopResult<T> { - unsafe { - let tail = *self.tail.get(); - let next = (*tail).next.load(Acquire); - - if !next.is_null() { - *self.tail.get() = next; - assert!((*tail).value.is_none()); - assert!((*next).value.is_some()); - let ret = (*next).value.take().unwrap(); - let _: Box<Node<T>> = mem::transmute(tail); - return Data(ret); - } - - if self.head.load(Acquire) == tail {Empty} else {Inconsistent} - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Queue<T> { - fn drop(&mut self) { - unsafe { - let mut cur = *self.tail.get(); - while !cur.is_null() { - let next = (*cur).next.load(Relaxed); - let _: Box<Node<T>> = mem::transmute(cur); - cur = next; - } - } - } -} - -#[cfg(test)] -mod tests { - use prelude::v1::*; - - use comm::channel; - use super::{Queue, Data, Empty, Inconsistent}; - use sync::Arc; - use thread::Thread; - - #[test] - fn test_full() { - let q = Queue::new(); - q.push(box 1i); - q.push(box 2i); - } - - #[test] - fn test() { - let nthreads = 8u; - let nmsgs = 1000u; - let q = Queue::new(); - match q.pop() { - Empty => {} - Inconsistent | Data(..) => panic!() - } - let (tx, rx) = channel(); - let q = Arc::new(q); - - for _ in range(0, nthreads) { - let tx = tx.clone(); - let q = q.clone(); - Thread::spawn(move|| { - for i in range(0, nmsgs) { - q.push(i); - } - tx.send(()); - }).detach(); - } - - let mut i = 0u; - while i < nthreads * nmsgs { - match q.pop() { - Empty | Inconsistent => {}, - Data(_) => { i += 1 } - } - } - drop(tx); - for _ in range(0, nthreads) { - rx.recv(); - } - } -} diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs deleted file mode 100644 index 9c5a6518845..00000000000 --- a/src/libstd/comm/oneshot.rs +++ /dev/null @@ -1,375 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/// Oneshot channels/ports -/// -/// This is the initial flavor of channels/ports used for comm module. This is -/// an optimization for the one-use case of a channel. The major optimization of -/// this type is to have one and exactly one allocation when the chan/port pair -/// is created. -/// -/// Another possible optimization would be to not use an Arc box because -/// in theory we know when the shared packet can be deallocated (no real need -/// for the atomic reference counting), but I was having trouble how to destroy -/// the data early in a drop of a Port. -/// -/// # Implementation -/// -/// Oneshots are implemented around one atomic uint variable. This variable -/// indicates both the state of the port/chan but also contains any tasks -/// blocked on the port. All atomic operations happen on this one word. -/// -/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect -/// on behalf of the channel side of things (it can be mentally thought of as -/// consuming the port). This upgrade is then also stored in the shared packet. -/// The one caveat to consider is that when a port sees a disconnected channel -/// it must check for data because there is no "data plus upgrade" state. - -pub use self::Failure::*; -pub use self::UpgradeResult::*; -pub use self::SelectionResult::*; -use self::MyUpgrade::*; - -use core::prelude::*; - -use comm::Receiver; -use comm::blocking::{mod, SignalToken}; -use core::mem; -use sync::atomic; - -// Various states you can find a port in. -const EMPTY: uint = 0; // initial state: no data, no blocked reciever -const DATA: uint = 1; // data ready for receiver to take -const DISCONNECTED: uint = 2; // channel is disconnected OR upgraded -// Any other value represents a pointer to a SignalToken value. The -// protocol ensures that when the state moves *to* a pointer, -// ownership of the token is given to the packet, and when the state -// moves *from* a pointer, ownership of the token is transferred to -// whoever changed the state. - -pub struct Packet<T> { - // Internal state of the chan/port pair (stores the blocked task as well) - state: atomic::AtomicUint, - // One-shot data slot location - data: Option<T>, - // when used for the second time, a oneshot channel must be upgraded, and - // this contains the slot for the upgrade - upgrade: MyUpgrade<T>, -} - -pub enum Failure<T> { - Empty, - Disconnected, - Upgraded(Receiver<T>), -} - -pub enum UpgradeResult { - UpSuccess, - UpDisconnected, - UpWoke(SignalToken), -} - -pub enum SelectionResult<T> { - SelCanceled, - SelUpgraded(SignalToken, Receiver<T>), - SelSuccess, -} - -enum MyUpgrade<T> { - NothingSent, - SendUsed, - GoUp(Receiver<T>), -} - -impl<T: Send> Packet<T> { - pub fn new() -> Packet<T> { - Packet { - data: None, - upgrade: NothingSent, - state: atomic::AtomicUint::new(EMPTY), - } - } - - pub fn send(&mut self, t: T) -> Result<(), T> { - // Sanity check - match self.upgrade { - NothingSent => {} - _ => panic!("sending on a oneshot that's already sent on "), - } - assert!(self.data.is_none()); - self.data = Some(t); - self.upgrade = SendUsed; - - match self.state.swap(DATA, atomic::SeqCst) { - // Sent the data, no one was waiting - EMPTY => Ok(()), - - // Couldn't send the data, the port hung up first. Return the data - // back up the stack. - DISCONNECTED => { - Err(self.data.take().unwrap()) - } - - // Not possible, these are one-use channels - DATA => unreachable!(), - - // There is a thread waiting on the other end. We leave the 'DATA' - // state inside so it'll pick it up on the other end. - ptr => unsafe { - SignalToken::cast_from_uint(ptr).signal(); - Ok(()) - } - } - } - - // Just tests whether this channel has been sent on or not, this is only - // safe to use from the sender. - pub fn sent(&self) -> bool { - match self.upgrade { - NothingSent => false, - _ => true, - } - } - - pub fn recv(&mut self) -> Result<T, Failure<T>> { - // Attempt to not block the task (it's a little expensive). If it looks - // like we're not empty, then immediately go through to `try_recv`. - if self.state.load(atomic::SeqCst) == EMPTY { - let (wait_token, signal_token) = blocking::tokens(); - let ptr = unsafe { signal_token.cast_to_uint() }; - - // race with senders to enter the blocking state - if self.state.compare_and_swap(EMPTY, ptr, atomic::SeqCst) == EMPTY { - wait_token.wait(); - debug_assert!(self.state.load(atomic::SeqCst) != EMPTY); - } else { - // drop the signal token, since we never blocked - drop(unsafe { SignalToken::cast_from_uint(ptr) }); - } - } - - self.try_recv() - } - - pub fn try_recv(&mut self) -> Result<T, Failure<T>> { - match self.state.load(atomic::SeqCst) { - EMPTY => Err(Empty), - - // We saw some data on the channel, but the channel can be used - // again to send us an upgrade. As a result, we need to re-insert - // into the channel that there's no data available (otherwise we'll - // just see DATA next time). This is done as a cmpxchg because if - // the state changes under our feet we'd rather just see that state - // change. - DATA => { - self.state.compare_and_swap(DATA, EMPTY, atomic::SeqCst); - match self.data.take() { - Some(data) => Ok(data), - None => unreachable!(), - } - } - - // There's no guarantee that we receive before an upgrade happens, - // and an upgrade flags the channel as disconnected, so when we see - // this we first need to check if there's data available and *then* - // we go through and process the upgrade. - DISCONNECTED => { - match self.data.take() { - Some(data) => Ok(data), - None => { - match mem::replace(&mut self.upgrade, SendUsed) { - SendUsed | NothingSent => Err(Disconnected), - GoUp(upgrade) => Err(Upgraded(upgrade)) - } - } - } - } - - // We are the sole receiver; there cannot be a blocking - // receiver already. - _ => unreachable!() - } - } - - // Returns whether the upgrade was completed. If the upgrade wasn't - // completed, then the port couldn't get sent to the other half (it will - // never receive it). - pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult { - let prev = match self.upgrade { - NothingSent => NothingSent, - SendUsed => SendUsed, - _ => panic!("upgrading again"), - }; - self.upgrade = GoUp(up); - - match self.state.swap(DISCONNECTED, atomic::SeqCst) { - // If the channel is empty or has data on it, then we're good to go. - // Senders will check the data before the upgrade (in case we - // plastered over the DATA state). - DATA | EMPTY => UpSuccess, - - // If the other end is already disconnected, then we failed the - // upgrade. Be sure to trash the port we were given. - DISCONNECTED => { self.upgrade = prev; UpDisconnected } - - // If someone's waiting, we gotta wake them up - ptr => UpWoke(unsafe { SignalToken::cast_from_uint(ptr) }) - } - } - - pub fn drop_chan(&mut self) { - match self.state.swap(DISCONNECTED, atomic::SeqCst) { - DATA | DISCONNECTED | EMPTY => {} - - // If someone's waiting, we gotta wake them up - ptr => unsafe { - SignalToken::cast_from_uint(ptr).signal(); - } - } - } - - pub fn drop_port(&mut self) { - match self.state.swap(DISCONNECTED, atomic::SeqCst) { - // An empty channel has nothing to do, and a remotely disconnected - // channel also has nothing to do b/c we're about to run the drop - // glue - DISCONNECTED | EMPTY => {} - - // There's data on the channel, so make sure we destroy it promptly. - // This is why not using an arc is a little difficult (need the box - // to stay valid while we take the data). - DATA => { self.data.take().unwrap(); } - - // We're the only ones that can block on this port - _ => unreachable!() - } - } - - //////////////////////////////////////////////////////////////////////////// - // select implementation - //////////////////////////////////////////////////////////////////////////// - - // If Ok, the value is whether this port has data, if Err, then the upgraded - // port needs to be checked instead of this one. - pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> { - match self.state.load(atomic::SeqCst) { - EMPTY => Ok(false), // Welp, we tried - DATA => Ok(true), // we have some un-acquired data - DISCONNECTED if self.data.is_some() => Ok(true), // we have data - DISCONNECTED => { - match mem::replace(&mut self.upgrade, SendUsed) { - // The other end sent us an upgrade, so we need to - // propagate upwards whether the upgrade can receive - // data - GoUp(upgrade) => Err(upgrade), - - // If the other end disconnected without sending an - // upgrade, then we have data to receive (the channel is - // disconnected). - up => { self.upgrade = up; Ok(true) } - } - } - _ => unreachable!(), // we're the "one blocker" - } - } - - // Attempts to start selection on this port. This can either succeed, fail - // because there is data, or fail because there is an upgrade pending. - pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> { - let ptr = unsafe { token.cast_to_uint() }; - match self.state.compare_and_swap(EMPTY, ptr, atomic::SeqCst) { - EMPTY => SelSuccess, - DATA => { - drop(unsafe { SignalToken::cast_from_uint(ptr) }); - SelCanceled - } - DISCONNECTED if self.data.is_some() => { - drop(unsafe { SignalToken::cast_from_uint(ptr) }); - SelCanceled - } - DISCONNECTED => { - match mem::replace(&mut self.upgrade, SendUsed) { - // The other end sent us an upgrade, so we need to - // propagate upwards whether the upgrade can receive - // data - GoUp(upgrade) => { - SelUpgraded(unsafe { SignalToken::cast_from_uint(ptr) }, upgrade) - } - - // If the other end disconnected without sending an - // upgrade, then we have data to receive (the channel is - // disconnected). - up => { - self.upgrade = up; - drop(unsafe { SignalToken::cast_from_uint(ptr) }); - SelCanceled - } - } - } - _ => unreachable!(), // we're the "one blocker" - } - } - - // Remove a previous selecting task from this port. This ensures that the - // blocked task will no longer be visible to any other threads. - // - // The return value indicates whether there's data on this port. - pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> { - let state = match self.state.load(atomic::SeqCst) { - // Each of these states means that no further activity will happen - // with regard to abortion selection - s @ EMPTY | - s @ DATA | - s @ DISCONNECTED => s, - - // If we've got a blocked task, then use an atomic to gain ownership - // of it (may fail) - ptr => self.state.compare_and_swap(ptr, EMPTY, atomic::SeqCst) - }; - - // Now that we've got ownership of our state, figure out what to do - // about it. - match state { - EMPTY => unreachable!(), - // our task used for select was stolen - DATA => Ok(true), - - // If the other end has hung up, then we have complete ownership - // of the port. First, check if there was data waiting for us. This - // is possible if the other end sent something and then hung up. - // - // We then need to check to see if there was an upgrade requested, - // and if so, the upgraded port needs to have its selection aborted. - DISCONNECTED => { - if self.data.is_some() { - Ok(true) - } else { - match mem::replace(&mut self.upgrade, SendUsed) { - GoUp(port) => Err(port), - _ => Ok(true), - } - } - } - - // We woke ourselves up from select. - ptr => unsafe { - drop(SignalToken::cast_from_uint(ptr)); - Ok(false) - } - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Packet<T> { - fn drop(&mut self) { - assert_eq!(self.state.load(atomic::SeqCst), DISCONNECTED); - } -} diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs deleted file mode 100644 index 5c476775bdb..00000000000 --- a/src/libstd/comm/select.rs +++ /dev/null @@ -1,749 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Selection over an array of receivers -//! -//! This module contains the implementation machinery necessary for selecting -//! over a number of receivers. One large goal of this module is to provide an -//! efficient interface to selecting over any receiver of any type. -//! -//! This is achieved through an architecture of a "receiver set" in which -//! receivers are added to a set and then the entire set is waited on at once. -//! The set can be waited on multiple times to prevent re-adding each receiver -//! to the set. -//! -//! Usage of this module is currently encouraged to go through the use of the -//! `select!` macro. This macro allows naturally binding of variables to the -//! received values of receivers in a much more natural syntax then usage of the -//! `Select` structure directly. -//! -//! # Example -//! -//! ```rust -//! use std::comm::channel; -//! -//! let (tx1, rx1) = channel(); -//! let (tx2, rx2) = channel(); -//! -//! tx1.send(1i); -//! tx2.send(2i); -//! -//! select! { -//! val = rx1.recv() => { -//! assert_eq!(val, 1i); -//! }, -//! val = rx2.recv() => { -//! assert_eq!(val, 2i); -//! } -//! } -//! ``` - -#![allow(dead_code)] -#![experimental = "This implementation, while likely sufficient, is unsafe and \ - likely to be error prone. At some point in the future this \ - module will likely be replaced, and it is currently \ - unknown how much API breakage that will cause. The ability \ - to select over a number of channels will remain forever, \ - but no guarantees beyond this are being made"] - - -use core::prelude::*; - -use core::cell::Cell; -use core::kinds::marker; -use core::mem; -use core::uint; - -use comm::Receiver; -use comm::blocking::{mod, SignalToken}; - -/// The "receiver set" of the select interface. This structure is used to manage -/// a set of receivers which are being selected over. -pub struct Select { - head: *mut Handle<'static, ()>, - tail: *mut Handle<'static, ()>, - next_id: Cell<uint>, - marker1: marker::NoSend, -} - -/// A handle to a receiver which is currently a member of a `Select` set of -/// receivers. This handle is used to keep the receiver in the set as well as -/// interact with the underlying receiver. -pub struct Handle<'rx, T:'rx> { - /// The ID of this handle, used to compare against the return value of - /// `Select::wait()` - id: uint, - selector: &'rx Select, - next: *mut Handle<'static, ()>, - prev: *mut Handle<'static, ()>, - added: bool, - packet: &'rx (Packet+'rx), - - // due to our fun transmutes, we be sure to place this at the end. (nothing - // previous relies on T) - rx: &'rx Receiver<T>, -} - -struct Packets { cur: *mut Handle<'static, ()> } - -#[doc(hidden)] -#[deriving(PartialEq)] -pub enum StartResult { - Installed, - Abort, -} - -#[doc(hidden)] -pub trait Packet { - fn can_recv(&self) -> bool; - fn start_selection(&self, token: SignalToken) -> StartResult; - fn abort_selection(&self) -> bool; -} - -impl Select { - /// Creates a new selection structure. This set is initially empty and - /// `wait` will panic!() if called. - /// - /// Usage of this struct directly can sometimes be burdensome, and usage is - /// rather much easier through the `select!` macro. - pub fn new() -> Select { - Select { - marker1: marker::NoSend, - head: 0 as *mut Handle<'static, ()>, - tail: 0 as *mut Handle<'static, ()>, - next_id: Cell::new(1), - } - } - - /// Creates a new handle into this receiver set for a new receiver. Note - /// that this does *not* add the receiver to the receiver set, for that you - /// must call the `add` method on the handle itself. - pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> { - let id = self.next_id.get(); - self.next_id.set(id + 1); - Handle { - id: id, - selector: self, - next: 0 as *mut Handle<'static, ()>, - prev: 0 as *mut Handle<'static, ()>, - added: false, - rx: rx, - packet: rx, - } - } - - /// Waits for an event on this receiver set. The returned value is *not* an - /// index, but rather an id. This id can be queried against any active - /// `Handle` structures (each one has an `id` method). The handle with - /// the matching `id` will have some sort of event available on it. The - /// event could either be that data is available or the corresponding - /// channel has been closed. - pub fn wait(&self) -> uint { - self.wait2(true) - } - - /// Helper method for skipping the preflight checks during testing - fn wait2(&self, do_preflight_checks: bool) -> uint { - // Note that this is currently an inefficient implementation. We in - // theory have knowledge about all receivers in the set ahead of time, - // so this method shouldn't really have to iterate over all of them yet - // again. The idea with this "receiver set" interface is to get the - // interface right this time around, and later this implementation can - // be optimized. - // - // This implementation can be summarized by: - // - // fn select(receivers) { - // if any receiver ready { return ready index } - // deschedule { - // block on all receivers - // } - // unblock on all receivers - // return ready index - // } - // - // Most notably, the iterations over all of the receivers shouldn't be - // necessary. - unsafe { - // Stage 1: preflight checks. Look for any packets ready to receive - if do_preflight_checks { - for handle in self.iter() { - if (*handle).packet.can_recv() { - return (*handle).id(); - } - } - } - - // Stage 2: begin the blocking process - // - // Create a number of signal tokens, and install each one - // sequentially until one fails. If one fails, then abort the - // selection on the already-installed tokens. - let (wait_token, signal_token) = blocking::tokens(); - for (i, handle) in self.iter().enumerate() { - match (*handle).packet.start_selection(signal_token.clone()) { - StartResult::Installed => {} - StartResult::Abort => { - // Go back and abort the already-begun selections - for handle in self.iter().take(i) { - (*handle).packet.abort_selection(); - } - return (*handle).id; - } - } - } - - // Stage 3: no messages available, actually block - wait_token.wait(); - - // Stage 4: there *must* be message available; find it. - // - // Abort the selection process on each receiver. If the abort - // process returns `true`, then that means that the receiver is - // ready to receive some data. Note that this also means that the - // receiver may have yet to have fully read the `to_wake` field and - // woken us up (although the wakeup is guaranteed to fail). - // - // This situation happens in the window of where a sender invokes - // increment(), sees -1, and then decides to wake up the task. After - // all this is done, the sending thread will set `selecting` to - // `false`. Until this is done, we cannot return. If we were to - // return, then a sender could wake up a receiver which has gone - // back to sleep after this call to `select`. - // - // Note that it is a "fairly small window" in which an increment() - // views that it should wake a thread up until the `selecting` bit - // is set to false. For now, the implementation currently just spins - // in a yield loop. This is very distasteful, but this - // implementation is already nowhere near what it should ideally be. - // A rewrite should focus on avoiding a yield loop, and for now this - // implementation is tying us over to a more efficient "don't - // iterate over everything every time" implementation. - let mut ready_id = uint::MAX; - for handle in self.iter() { - if (*handle).packet.abort_selection() { - ready_id = (*handle).id; - } - } - - // We must have found a ready receiver - assert!(ready_id != uint::MAX); - return ready_id; - } - } - - fn iter(&self) -> Packets { Packets { cur: self.head } } -} - -impl<'rx, T: Send> Handle<'rx, T> { - /// Retrieve the id of this handle. - #[inline] - pub fn id(&self) -> uint { self.id } - - /// Receive a value on the underlying receiver. Has the same semantics as - /// `Receiver.recv` - pub fn recv(&mut self) -> T { self.rx.recv() } - /// Block to receive a value on the underlying receiver, returning `Some` on - /// success or `None` if the channel disconnects. This function has the same - /// semantics as `Receiver.recv_opt` - pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() } - - /// Adds this handle to the receiver set that the handle was created from. This - /// method can be called multiple times, but it has no effect if `add` was - /// called previously. - /// - /// This method is unsafe because it requires that the `Handle` is not moved - /// while it is added to the `Select` set. - pub unsafe fn add(&mut self) { - if self.added { return } - let selector: &mut Select = mem::transmute(&*self.selector); - let me: *mut Handle<'static, ()> = mem::transmute(&*self); - - if selector.head.is_null() { - selector.head = me; - selector.tail = me; - } else { - (*me).prev = selector.tail; - assert!((*me).next.is_null()); - (*selector.tail).next = me; - selector.tail = me; - } - self.added = true; - } - - /// Removes this handle from the `Select` set. This method is unsafe because - /// it has no guarantee that the `Handle` was not moved since `add` was - /// called. - pub unsafe fn remove(&mut self) { - if !self.added { return } - - let selector: &mut Select = mem::transmute(&*self.selector); - let me: *mut Handle<'static, ()> = mem::transmute(&*self); - - if self.prev.is_null() { - assert_eq!(selector.head, me); - selector.head = self.next; - } else { - (*self.prev).next = self.next; - } - if self.next.is_null() { - assert_eq!(selector.tail, me); - selector.tail = self.prev; - } else { - (*self.next).prev = self.prev; - } - - self.next = 0 as *mut Handle<'static, ()>; - self.prev = 0 as *mut Handle<'static, ()>; - - self.added = false; - } -} - -#[unsafe_destructor] -impl Drop for Select { - fn drop(&mut self) { - assert!(self.head.is_null()); - assert!(self.tail.is_null()); - } -} - -#[unsafe_destructor] -impl<'rx, T: Send> Drop for Handle<'rx, T> { - fn drop(&mut self) { - unsafe { self.remove() } - } -} - -impl Iterator<*mut Handle<'static, ()>> for Packets { - fn next(&mut self) -> Option<*mut Handle<'static, ()>> { - if self.cur.is_null() { - None - } else { - let ret = Some(self.cur); - unsafe { self.cur = (*self.cur).next; } - ret - } - } -} - -#[cfg(test)] -#[allow(unused_imports)] -mod test { - use prelude::v1::*; - - use super::*; - use comm::*; - use thread::Thread; - - // Don't use the libstd version so we can pull in the right Select structure - // (std::comm points at the wrong one) - macro_rules! select { - ( - $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ - ) => ({ - use comm::Select; - let sel = Select::new(); - $( let mut $rx = sel.handle(&$rx); )+ - unsafe { - $( $rx.add(); )+ - } - let ret = sel.wait(); - $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+ - { unreachable!() } - }) - } - - #[test] - fn smoke() { - let (tx1, rx1) = channel::<int>(); - let (tx2, rx2) = channel::<int>(); - tx1.send(1); - select! { - foo = rx1.recv() => { assert_eq!(foo, 1); }, - _bar = rx2.recv() => { panic!() } - } - tx2.send(2); - select! { - _foo = rx1.recv() => { panic!() }, - bar = rx2.recv() => { assert_eq!(bar, 2) } - } - drop(tx1); - select! { - foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); }, - _bar = rx2.recv() => { panic!() } - } - drop(tx2); - select! { - bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); } - } - } - - #[test] - fn smoke2() { - let (_tx1, rx1) = channel::<int>(); - let (_tx2, rx2) = channel::<int>(); - let (_tx3, rx3) = channel::<int>(); - let (_tx4, rx4) = channel::<int>(); - let (tx5, rx5) = channel::<int>(); - tx5.send(4); - select! { - _foo = rx1.recv() => { panic!("1") }, - _foo = rx2.recv() => { panic!("2") }, - _foo = rx3.recv() => { panic!("3") }, - _foo = rx4.recv() => { panic!("4") }, - foo = rx5.recv() => { assert_eq!(foo, 4); } - } - } - - #[test] - fn closed() { - let (_tx1, rx1) = channel::<int>(); - let (tx2, rx2) = channel::<int>(); - drop(tx2); - - select! { - _a1 = rx1.recv_opt() => { panic!() }, - a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); } - } - } - - #[test] - fn unblocks() { - let (tx1, rx1) = channel::<int>(); - let (_tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<int>(); - - let _t = Thread::spawn(move|| { - for _ in range(0u, 20) { Thread::yield_now(); } - tx1.send(1); - rx3.recv(); - for _ in range(0u, 20) { Thread::yield_now(); } - }); - - select! { - a = rx1.recv() => { assert_eq!(a, 1); }, - _b = rx2.recv() => { panic!() } - } - tx3.send(1); - select! { - a = rx1.recv_opt() => { assert_eq!(a, Err(())); }, - _b = rx2.recv() => { panic!() } - } - } - - #[test] - fn both_ready() { - let (tx1, rx1) = channel::<int>(); - let (tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<()>(); - - let _t = Thread::spawn(move|| { - for _ in range(0u, 20) { Thread::yield_now(); } - tx1.send(1); - tx2.send(2); - rx3.recv(); - }); - - select! { - a = rx1.recv() => { assert_eq!(a, 1); }, - a = rx2.recv() => { assert_eq!(a, 2); } - } - select! { - a = rx1.recv() => { assert_eq!(a, 1); }, - a = rx2.recv() => { assert_eq!(a, 2); } - } - assert_eq!(rx1.try_recv(), Err(Empty)); - assert_eq!(rx2.try_recv(), Err(Empty)); - tx3.send(()); - } - - #[test] - fn stress() { - static AMT: int = 10000; - let (tx1, rx1) = channel::<int>(); - let (tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<()>(); - - let _t = Thread::spawn(move|| { - for i in range(0, AMT) { - if i % 2 == 0 { - tx1.send(i); - } else { - tx2.send(i); - } - rx3.recv(); - } - }); - - for i in range(0, AMT) { - select! { - i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); }, - i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); } - } - tx3.send(()); - } - } - - #[test] - fn cloning() { - let (tx1, rx1) = channel::<int>(); - let (_tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<()>(); - - let _t = Thread::spawn(move|| { - rx3.recv(); - tx1.clone(); - assert_eq!(rx3.try_recv(), Err(Empty)); - tx1.send(2); - rx3.recv(); - }); - - tx3.send(()); - select! { - _i1 = rx1.recv() => {}, - _i2 = rx2.recv() => panic!() - } - tx3.send(()); - } - - #[test] - fn cloning2() { - let (tx1, rx1) = channel::<int>(); - let (_tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<()>(); - - let _t = Thread::spawn(move|| { - rx3.recv(); - tx1.clone(); - assert_eq!(rx3.try_recv(), Err(Empty)); - tx1.send(2); - rx3.recv(); - }); - - tx3.send(()); - select! { - _i1 = rx1.recv() => {}, - _i2 = rx2.recv() => panic!() - } - tx3.send(()); - } - - #[test] - fn cloning3() { - let (tx1, rx1) = channel::<()>(); - let (tx2, rx2) = channel::<()>(); - let (tx3, rx3) = channel::<()>(); - let _t = Thread::spawn(move|| { - let s = Select::new(); - let mut h1 = s.handle(&rx1); - let mut h2 = s.handle(&rx2); - unsafe { h2.add(); } - unsafe { h1.add(); } - assert_eq!(s.wait(), h2.id); - tx3.send(()); - }); - - for _ in range(0u, 1000) { Thread::yield_now(); } - drop(tx1.clone()); - tx2.send(()); - rx3.recv(); - } - - #[test] - fn preflight1() { - let (tx, rx) = channel(); - tx.send(()); - select! { - () = rx.recv() => {} - } - } - - #[test] - fn preflight2() { - let (tx, rx) = channel(); - tx.send(()); - tx.send(()); - select! { - () = rx.recv() => {} - } - } - - #[test] - fn preflight3() { - let (tx, rx) = channel(); - drop(tx.clone()); - tx.send(()); - select! { - () = rx.recv() => {} - } - } - - #[test] - fn preflight4() { - let (tx, rx) = channel(); - tx.send(()); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id); - } - - #[test] - fn preflight5() { - let (tx, rx) = channel(); - tx.send(()); - tx.send(()); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id); - } - - #[test] - fn preflight6() { - let (tx, rx) = channel(); - drop(tx.clone()); - tx.send(()); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id); - } - - #[test] - fn preflight7() { - let (tx, rx) = channel::<()>(); - drop(tx); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id); - } - - #[test] - fn preflight8() { - let (tx, rx) = channel(); - tx.send(()); - drop(tx); - rx.recv(); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id); - } - - #[test] - fn preflight9() { - let (tx, rx) = channel(); - drop(tx.clone()); - tx.send(()); - drop(tx); - rx.recv(); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id); - } - - #[test] - fn oneshot_data_waiting() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - let _t = Thread::spawn(move|| { - select! { - () = rx1.recv() => {} - } - tx2.send(()); - }); - - for _ in range(0u, 100) { Thread::yield_now() } - tx1.send(()); - rx2.recv(); - } - - #[test] - fn stream_data_waiting() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - tx1.send(()); - tx1.send(()); - rx1.recv(); - rx1.recv(); - let _t = Thread::spawn(move|| { - select! { - () = rx1.recv() => {} - } - tx2.send(()); - }); - - for _ in range(0u, 100) { Thread::yield_now() } - tx1.send(()); - rx2.recv(); - } - - #[test] - fn shared_data_waiting() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - drop(tx1.clone()); - tx1.send(()); - rx1.recv(); - let _t = Thread::spawn(move|| { - select! { - () = rx1.recv() => {} - } - tx2.send(()); - }); - - for _ in range(0u, 100) { Thread::yield_now() } - tx1.send(()); - rx2.recv(); - } - - #[test] - fn sync1() { - let (tx, rx) = sync_channel::<int>(1); - tx.send(1); - select! { - n = rx.recv() => { assert_eq!(n, 1); } - } - } - - #[test] - fn sync2() { - let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { - for _ in range(0u, 100) { Thread::yield_now() } - tx.send(1); - }); - select! { - n = rx.recv() => { assert_eq!(n, 1); } - } - } - - #[test] - fn sync3() { - let (tx1, rx1) = sync_channel::<int>(0); - let (tx2, rx2): (Sender<int>, Receiver<int>) = channel(); - let _t = Thread::spawn(move|| { tx1.send(1); }); - let _t = Thread::spawn(move|| { tx2.send(2); }); - select! { - n = rx1.recv() => { - assert_eq!(n, 1); - assert_eq!(rx2.recv(), 2); - }, - n = rx2.recv() => { - assert_eq!(n, 2); - assert_eq!(rx1.recv(), 1); - } - } - } -} diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs deleted file mode 100644 index 1022694e634..00000000000 --- a/src/libstd/comm/shared.rs +++ /dev/null @@ -1,486 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/// Shared channels -/// -/// This is the flavor of channels which are not necessarily optimized for any -/// particular use case, but are the most general in how they are used. Shared -/// channels are cloneable allowing for multiple senders. -/// -/// High level implementation details can be found in the comment of the parent -/// module. You'll also note that the implementation of the shared and stream -/// channels are quite similar, and this is no coincidence! - -pub use self::Failure::*; - -use core::prelude::*; - -use core::cmp; -use core::int; - -use sync::{atomic, Mutex, MutexGuard}; -use comm::mpsc_queue as mpsc; -use comm::blocking::{mod, SignalToken}; -use comm::select::StartResult; -use comm::select::StartResult::*; -use thread::Thread; - -const DISCONNECTED: int = int::MIN; -const FUDGE: int = 1024; -#[cfg(test)] -const MAX_STEALS: int = 5; -#[cfg(not(test))] -const MAX_STEALS: int = 1 << 20; - -pub struct Packet<T> { - queue: mpsc::Queue<T>, - cnt: atomic::AtomicInt, // How many items are on this channel - steals: int, // How many times has a port received without blocking? - to_wake: atomic::AtomicUint, // SignalToken for wake up - - // The number of channels which are currently using this packet. - channels: atomic::AtomicInt, - - // See the discussion in Port::drop and the channel send methods for what - // these are used for - port_dropped: atomic::AtomicBool, - sender_drain: atomic::AtomicInt, - - // this lock protects various portions of this implementation during - // select() - select_lock: Mutex<()>, -} - -pub enum Failure { - Empty, - Disconnected, -} - -impl<T: Send> Packet<T> { - // Creation of a packet *must* be followed by a call to postinit_lock - // and later by inherit_blocker - pub fn new() -> Packet<T> { - let p = Packet { - queue: mpsc::Queue::new(), - cnt: atomic::AtomicInt::new(0), - steals: 0, - to_wake: atomic::AtomicUint::new(0), - channels: atomic::AtomicInt::new(2), - port_dropped: atomic::AtomicBool::new(false), - sender_drain: atomic::AtomicInt::new(0), - select_lock: Mutex::new(()), - }; - return p; - } - - // This function should be used after newly created Packet - // was wrapped with an Arc - // In other case mutex data will be duplicated while cloning - // and that could cause problems on platforms where it is - // represented by opaque data structure - pub fn postinit_lock(&self) -> MutexGuard<()> { - self.select_lock.lock() - } - - // This function is used at the creation of a shared packet to inherit a - // previously blocked task. This is done to prevent spurious wakeups of - // tasks in select(). - // - // This can only be called at channel-creation time - pub fn inherit_blocker(&mut self, - token: Option<SignalToken>, - guard: MutexGuard<()>) { - token.map(|token| { - assert_eq!(self.cnt.load(atomic::SeqCst), 0); - assert_eq!(self.to_wake.load(atomic::SeqCst), 0); - self.to_wake.store(unsafe { token.cast_to_uint() }, atomic::SeqCst); - self.cnt.store(-1, atomic::SeqCst); - - // This store is a little sketchy. What's happening here is that - // we're transferring a blocker from a oneshot or stream channel to - // this shared channel. In doing so, we never spuriously wake them - // up and rather only wake them up at the appropriate time. This - // implementation of shared channels assumes that any blocking - // recv() will undo the increment of steals performed in try_recv() - // once the recv is complete. This thread that we're inheriting, - // however, is not in the middle of recv. Hence, the first time we - // wake them up, they're going to wake up from their old port, move - // on to the upgraded port, and then call the block recv() function. - // - // When calling this function, they'll find there's data immediately - // available, counting it as a steal. This in fact wasn't a steal - // because we appropriately blocked them waiting for data. - // - // To offset this bad increment, we initially set the steal count to - // -1. You'll find some special code in abort_selection() as well to - // ensure that this -1 steal count doesn't escape too far. - self.steals = -1; - }); - - // When the shared packet is constructed, we grabbed this lock. The - // purpose of this lock is to ensure that abort_selection() doesn't - // interfere with this method. After we unlock this lock, we're - // signifying that we're done modifying self.cnt and self.to_wake and - // the port is ready for the world to continue using it. - drop(guard); - } - - pub fn send(&mut self, t: T) -> Result<(), T> { - // See Port::drop for what's going on - if self.port_dropped.load(atomic::SeqCst) { return Err(t) } - - // Note that the multiple sender case is a little trickier - // semantically than the single sender case. The logic for - // incrementing is "add and if disconnected store disconnected". - // This could end up leading some senders to believe that there - // wasn't a disconnect if in fact there was a disconnect. This means - // that while one thread is attempting to re-store the disconnected - // states, other threads could walk through merrily incrementing - // this very-negative disconnected count. To prevent senders from - // spuriously attempting to send when the channels is actually - // disconnected, the count has a ranged check here. - // - // This is also done for another reason. Remember that the return - // value of this function is: - // - // `true` == the data *may* be received, this essentially has no - // meaning - // `false` == the data will *never* be received, this has a lot of - // meaning - // - // In the SPSC case, we have a check of 'queue.is_empty()' to see - // whether the data was actually received, but this same condition - // means nothing in a multi-producer context. As a result, this - // preflight check serves as the definitive "this will never be - // received". Once we get beyond this check, we have permanently - // entered the realm of "this may be received" - if self.cnt.load(atomic::SeqCst) < DISCONNECTED + FUDGE { - return Err(t) - } - - self.queue.push(t); - match self.cnt.fetch_add(1, atomic::SeqCst) { - -1 => { - self.take_to_wake().signal(); - } - - // In this case, we have possibly failed to send our data, and - // we need to consider re-popping the data in order to fully - // destroy it. We must arbitrate among the multiple senders, - // however, because the queues that we're using are - // single-consumer queues. In order to do this, all exiting - // pushers will use an atomic count in order to count those - // flowing through. Pushers who see 0 are required to drain as - // much as possible, and then can only exit when they are the - // only pusher (otherwise they must try again). - n if n < DISCONNECTED + FUDGE => { - // see the comment in 'try' for a shared channel for why this - // window of "not disconnected" is ok. - self.cnt.store(DISCONNECTED, atomic::SeqCst); - - if self.sender_drain.fetch_add(1, atomic::SeqCst) == 0 { - loop { - // drain the queue, for info on the thread yield see the - // discussion in try_recv - loop { - match self.queue.pop() { - mpsc::Data(..) => {} - mpsc::Empty => break, - mpsc::Inconsistent => Thread::yield_now(), - } - } - // maybe we're done, if we're not the last ones - // here, then we need to go try again. - if self.sender_drain.fetch_sub(1, atomic::SeqCst) == 1 { - break - } - } - - // At this point, there may still be data on the queue, - // but only if the count hasn't been incremented and - // some other sender hasn't finished pushing data just - // yet. That sender in question will drain its own data. - } - } - - // Can't make any assumptions about this case like in the SPSC case. - _ => {} - } - - Ok(()) - } - - pub fn recv(&mut self) -> Result<T, Failure> { - // This code is essentially the exact same as that found in the stream - // case (see stream.rs) - match self.try_recv() { - Err(Empty) => {} - data => return data, - } - - let (wait_token, signal_token) = blocking::tokens(); - if self.decrement(signal_token) == Installed { - wait_token.wait() - } - - match self.try_recv() { - data @ Ok(..) => { self.steals -= 1; data } - data => data, - } - } - - // Essentially the exact same thing as the stream decrement function. - // Returns true if blocking should proceed. - fn decrement(&mut self, token: SignalToken) -> StartResult { - assert_eq!(self.to_wake.load(atomic::SeqCst), 0); - let ptr = unsafe { token.cast_to_uint() }; - self.to_wake.store(ptr, atomic::SeqCst); - - let steals = self.steals; - self.steals = 0; - - match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) { - DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); } - // If we factor in our steals and notice that the channel has no - // data, we successfully sleep - n => { - assert!(n >= 0); - if n - steals <= 0 { return Installed } - } - } - - self.to_wake.store(0, atomic::SeqCst); - drop(unsafe { SignalToken::cast_from_uint(ptr) }); - Abort - } - - pub fn try_recv(&mut self) -> Result<T, Failure> { - let ret = match self.queue.pop() { - mpsc::Data(t) => Some(t), - mpsc::Empty => None, - - // This is a bit of an interesting case. The channel is reported as - // having data available, but our pop() has failed due to the queue - // being in an inconsistent state. This means that there is some - // pusher somewhere which has yet to complete, but we are guaranteed - // that a pop will eventually succeed. In this case, we spin in a - // yield loop because the remote sender should finish their enqueue - // operation "very quickly". - // - // Avoiding this yield loop would require a different queue - // abstraction which provides the guarantee that after M pushes have - // succeeded, at least M pops will succeed. The current queues - // guarantee that if there are N active pushes, you can pop N times - // once all N have finished. - mpsc::Inconsistent => { - let data; - loop { - Thread::yield_now(); - match self.queue.pop() { - mpsc::Data(t) => { data = t; break } - mpsc::Empty => panic!("inconsistent => empty"), - mpsc::Inconsistent => {} - } - } - Some(data) - } - }; - match ret { - // See the discussion in the stream implementation for why we - // might decrement steals. - Some(data) => { - if self.steals > MAX_STEALS { - match self.cnt.swap(0, atomic::SeqCst) { - DISCONNECTED => { - self.cnt.store(DISCONNECTED, atomic::SeqCst); - } - n => { - let m = cmp::min(n, self.steals); - self.steals -= m; - self.bump(n - m); - } - } - assert!(self.steals >= 0); - } - self.steals += 1; - Ok(data) - } - - // See the discussion in the stream implementation for why we try - // again. - None => { - match self.cnt.load(atomic::SeqCst) { - n if n != DISCONNECTED => Err(Empty), - _ => { - match self.queue.pop() { - mpsc::Data(t) => Ok(t), - mpsc::Empty => Err(Disconnected), - // with no senders, an inconsistency is impossible. - mpsc::Inconsistent => unreachable!(), - } - } - } - } - } - } - - // Prepares this shared packet for a channel clone, essentially just bumping - // a refcount. - pub fn clone_chan(&mut self) { - self.channels.fetch_add(1, atomic::SeqCst); - } - - // Decrement the reference count on a channel. This is called whenever a - // Chan is dropped and may end up waking up a receiver. It's the receiver's - // responsibility on the other end to figure out that we've disconnected. - pub fn drop_chan(&mut self) { - match self.channels.fetch_sub(1, atomic::SeqCst) { - 1 => {} - n if n > 1 => return, - n => panic!("bad number of channels left {}", n), - } - - match self.cnt.swap(DISCONNECTED, atomic::SeqCst) { - -1 => { self.take_to_wake().signal(); } - DISCONNECTED => {} - n => { assert!(n >= 0); } - } - } - - // See the long discussion inside of stream.rs for why the queue is drained, - // and why it is done in this fashion. - pub fn drop_port(&mut self) { - self.port_dropped.store(true, atomic::SeqCst); - let mut steals = self.steals; - while { - let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, atomic::SeqCst); - cnt != DISCONNECTED && cnt != steals - } { - // See the discussion in 'try_recv' for why we yield - // control of this thread. - loop { - match self.queue.pop() { - mpsc::Data(..) => { steals += 1; } - mpsc::Empty | mpsc::Inconsistent => break, - } - } - } - } - - // Consumes ownership of the 'to_wake' field. - fn take_to_wake(&mut self) -> SignalToken { - let ptr = self.to_wake.load(atomic::SeqCst); - self.to_wake.store(0, atomic::SeqCst); - assert!(ptr != 0); - unsafe { SignalToken::cast_from_uint(ptr) } - } - - //////////////////////////////////////////////////////////////////////////// - // select implementation - //////////////////////////////////////////////////////////////////////////// - - // Helper function for select, tests whether this port can receive without - // blocking (obviously not an atomic decision). - // - // This is different than the stream version because there's no need to peek - // at the queue, we can just look at the local count. - pub fn can_recv(&mut self) -> bool { - let cnt = self.cnt.load(atomic::SeqCst); - cnt == DISCONNECTED || cnt - self.steals > 0 - } - - // increment the count on the channel (used for selection) - fn bump(&mut self, amt: int) -> int { - match self.cnt.fetch_add(amt, atomic::SeqCst) { - DISCONNECTED => { - self.cnt.store(DISCONNECTED, atomic::SeqCst); - DISCONNECTED - } - n => n - } - } - - // Inserts the signal token for selection on this port, returning true if - // blocking should proceed. - // - // The code here is the same as in stream.rs, except that it doesn't need to - // peek at the channel to see if an upgrade is pending. - pub fn start_selection(&mut self, token: SignalToken) -> StartResult { - match self.decrement(token) { - Installed => Installed, - Abort => { - let prev = self.bump(1); - assert!(prev == DISCONNECTED || prev >= 0); - Abort - } - } - } - - // Cancels a previous task waiting on this port, returning whether there's - // data on the port. - // - // This is similar to the stream implementation (hence fewer comments), but - // uses a different value for the "steals" variable. - pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool { - // Before we do anything else, we bounce on this lock. The reason for - // doing this is to ensure that any upgrade-in-progress is gone and - // done with. Without this bounce, we can race with inherit_blocker - // about looking at and dealing with to_wake. Once we have acquired the - // lock, we are guaranteed that inherit_blocker is done. - { - let _guard = self.select_lock.lock(); - } - - // Like the stream implementation, we want to make sure that the count - // on the channel goes non-negative. We don't know how negative the - // stream currently is, so instead of using a steal value of 1, we load - // the channel count and figure out what we should do to make it - // positive. - let steals = { - let cnt = self.cnt.load(atomic::SeqCst); - if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0} - }; - let prev = self.bump(steals + 1); - - if prev == DISCONNECTED { - assert_eq!(self.to_wake.load(atomic::SeqCst), 0); - true - } else { - let cur = prev + steals + 1; - assert!(cur >= 0); - if prev < 0 { - drop(self.take_to_wake()); - } else { - while self.to_wake.load(atomic::SeqCst) != 0 { - Thread::yield_now(); - } - } - // if the number of steals is -1, it was the pre-emptive -1 steal - // count from when we inherited a blocker. This is fine because - // we're just going to overwrite it with a real value. - assert!(self.steals == 0 || self.steals == -1); - self.steals = steals; - prev >= 0 - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Packet<T> { - fn drop(&mut self) { - // Note that this load is not only an assert for correctness about - // disconnection, but also a proper fence before the read of - // `to_wake`, so this assert cannot be removed with also removing - // the `to_wake` assert. - assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED); - assert_eq!(self.to_wake.load(atomic::SeqCst), 0); - assert_eq!(self.channels.load(atomic::SeqCst), 0); - } -} diff --git a/src/libstd/comm/spsc_queue.rs b/src/libstd/comm/spsc_queue.rs deleted file mode 100644 index 1e2f5222d8b..00000000000 --- a/src/libstd/comm/spsc_queue.rs +++ /dev/null @@ -1,343 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue - -//! A single-producer single-consumer concurrent queue -//! -//! This module contains the implementation of an SPSC queue which can be used -//! concurrently between two tasks. This data structure is safe to use and -//! enforces the semantics that there is one pusher and one popper. - -#![experimental] - -use core::prelude::*; - -use alloc::boxed::Box; -use core::mem; -use core::cell::UnsafeCell; - -use sync::atomic::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; - -// Node within the linked list queue of messages to send -struct Node<T> { - // FIXME: this could be an uninitialized T if we're careful enough, and - // that would reduce memory usage (and be a bit faster). - // is it worth it? - value: Option<T>, // nullable for re-use of nodes - next: AtomicPtr<Node<T>>, // next node in the queue -} - -/// The single-producer single-consumer queue. This structure is not cloneable, -/// but it can be safely shared in an Arc if it is guaranteed that there -/// is only one popper and one pusher touching the queue at any one point in -/// time. -pub struct Queue<T> { - // consumer fields - tail: UnsafeCell<*mut Node<T>>, // where to pop from - tail_prev: AtomicPtr<Node<T>>, // where to pop from - - // producer fields - head: UnsafeCell<*mut Node<T>>, // where to push to - first: UnsafeCell<*mut Node<T>>, // where to get new nodes from - tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail - - // Cache maintenance fields. Additions and subtractions are stored - // separately in order to allow them to use nonatomic addition/subtraction. - cache_bound: uint, - cache_additions: AtomicUint, - cache_subtractions: AtomicUint, -} - -unsafe impl<T: Send> Send for Queue<T> { } - -unsafe impl<T: Send> Sync for Queue<T> { } - -impl<T: Send> Node<T> { - fn new() -> *mut Node<T> { - unsafe { - mem::transmute(box Node { - value: None, - next: AtomicPtr::new(0 as *mut Node<T>), - }) - } - } -} - -impl<T: Send> Queue<T> { - /// Creates a new queue. - /// - /// This is unsafe as the type system doesn't enforce a single - /// consumer-producer relationship. It also allows the consumer to `pop` - /// items while there is a `peek` active due to all methods having a - /// non-mutable receiver. - /// - /// # Arguments - /// - /// * `bound` - This queue implementation is implemented with a linked - /// list, and this means that a push is always a malloc. In - /// order to amortize this cost, an internal cache of nodes is - /// maintained to prevent a malloc from always being - /// necessary. This bound is the limit on the size of the - /// cache (if desired). If the value is 0, then the cache has - /// no bound. Otherwise, the cache will never grow larger than - /// `bound` (although the queue itself could be much larger. - pub unsafe fn new(bound: uint) -> Queue<T> { - let n1 = Node::new(); - let n2 = Node::new(); - (*n1).next.store(n2, Relaxed); - Queue { - tail: UnsafeCell::new(n2), - tail_prev: AtomicPtr::new(n1), - head: UnsafeCell::new(n2), - first: UnsafeCell::new(n1), - tail_copy: UnsafeCell::new(n1), - cache_bound: bound, - cache_additions: AtomicUint::new(0), - cache_subtractions: AtomicUint::new(0), - } - } - - /// Pushes a new value onto this queue. Note that to use this function - /// safely, it must be externally guaranteed that there is only one pusher. - pub fn push(&self, t: T) { - unsafe { - // Acquire a node (which either uses a cached one or allocates a new - // one), and then append this to the 'head' node. - let n = self.alloc(); - assert!((*n).value.is_none()); - (*n).value = Some(t); - (*n).next.store(0 as *mut Node<T>, Relaxed); - (**self.head.get()).next.store(n, Release); - *self.head.get() = n; - } - } - - unsafe fn alloc(&self) -> *mut Node<T> { - // First try to see if we can consume the 'first' node for our uses. - // We try to avoid as many atomic instructions as possible here, so - // the addition to cache_subtractions is not atomic (plus we're the - // only one subtracting from the cache). - if *self.first.get() != *self.tail_copy.get() { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Relaxed); - self.cache_subtractions.store(b + 1, Relaxed); - } - let ret = *self.first.get(); - *self.first.get() = (*ret).next.load(Relaxed); - return ret; - } - // If the above fails, then update our copy of the tail and try - // again. - *self.tail_copy.get() = self.tail_prev.load(Acquire); - if *self.first.get() != *self.tail_copy.get() { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Relaxed); - self.cache_subtractions.store(b + 1, Relaxed); - } - let ret = *self.first.get(); - *self.first.get() = (*ret).next.load(Relaxed); - return ret; - } - // If all of that fails, then we have to allocate a new node - // (there's nothing in the node cache). - Node::new() - } - - /// Attempts to pop a value from this queue. Remember that to use this type - /// safely you must ensure that there is only one popper at a time. - pub fn pop(&self) -> Option<T> { - unsafe { - // The `tail` node is not actually a used node, but rather a - // sentinel from where we should start popping from. Hence, look at - // tail's next field and see if we can use it. If we do a pop, then - // the current tail node is a candidate for going into the cache. - let tail = *self.tail.get(); - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - assert!((*next).value.is_some()); - let ret = (*next).value.take(); - - *self.tail.get() = next; - if self.cache_bound == 0 { - self.tail_prev.store(tail, Release); - } else { - // FIXME: this is dubious with overflow. - let additions = self.cache_additions.load(Relaxed); - let subtractions = self.cache_subtractions.load(Relaxed); - let size = additions - subtractions; - - if size < self.cache_bound { - self.tail_prev.store(tail, Release); - self.cache_additions.store(additions + 1, Relaxed); - } else { - (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); - // We have successfully erased all references to 'tail', so - // now we can safely drop it. - let _: Box<Node<T>> = mem::transmute(tail); - } - } - return ret; - } - } - - /// Attempts to peek at the head of the queue, returning `None` if the queue - /// has no data currently - /// - /// # Warning - /// The reference returned is invalid if it is not used before the consumer - /// pops the value off the queue. If the producer then pushes another value - /// onto the queue, it will overwrite the value pointed to by the reference. - pub fn peek<'a>(&'a self) -> Option<&'a mut T> { - // This is essentially the same as above with all the popping bits - // stripped out. - unsafe { - let tail = *self.tail.get(); - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - return (*next).value.as_mut(); - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Queue<T> { - fn drop(&mut self) { - unsafe { - let mut cur = *self.first.get(); - while !cur.is_null() { - let next = (*cur).next.load(Relaxed); - let _n: Box<Node<T>> = mem::transmute(cur); - cur = next; - } - } - } -} - -#[cfg(test)] -mod test { - use prelude::v1::*; - - use sync::Arc; - use super::Queue; - use thread::Thread; - use comm::channel; - - #[test] - fn smoke() { - unsafe { - let queue = Queue::new(0); - queue.push(1i); - queue.push(2); - assert_eq!(queue.pop(), Some(1i)); - assert_eq!(queue.pop(), Some(2)); - assert_eq!(queue.pop(), None); - queue.push(3); - queue.push(4); - assert_eq!(queue.pop(), Some(3)); - assert_eq!(queue.pop(), Some(4)); - assert_eq!(queue.pop(), None); - } - } - - #[test] - fn peek() { - unsafe { - let queue = Queue::new(0); - queue.push(vec![1i]); - - // Ensure the borrowchecker works - match queue.peek() { - Some(vec) => match vec.as_slice() { - // Note that `pop` is not allowed here due to borrow - [1] => {} - _ => return - }, - None => unreachable!() - } - - queue.pop(); - } - } - - #[test] - fn drop_full() { - unsafe { - let q = Queue::new(0); - q.push(box 1i); - q.push(box 2i); - } - } - - #[test] - fn smoke_bound() { - unsafe { - let q = Queue::new(0); - q.push(1i); - q.push(2); - assert_eq!(q.pop(), Some(1)); - assert_eq!(q.pop(), Some(2)); - assert_eq!(q.pop(), None); - q.push(3); - q.push(4); - assert_eq!(q.pop(), Some(3)); - assert_eq!(q.pop(), Some(4)); - assert_eq!(q.pop(), None); - } - } - - #[test] - fn stress() { - unsafe { - stress_bound(0); - stress_bound(1); - } - - unsafe fn stress_bound(bound: uint) { - let q = Arc::new(Queue::new(bound)); - - let (tx, rx) = channel(); - let q2 = q.clone(); - let _t = Thread::spawn(move|| { - for _ in range(0u, 100000) { - loop { - match q2.pop() { - Some(1i) => break, - Some(_) => panic!(), - None => {} - } - } - } - tx.send(()); - }); - for _ in range(0i, 100000) { - q.push(1); - } - rx.recv(); - } - } -} diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs deleted file mode 100644 index b68f626060e..00000000000 --- a/src/libstd/comm/stream.rs +++ /dev/null @@ -1,484 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/// Stream channels -/// -/// This is the flavor of channels which are optimized for one sender and one -/// receiver. The sender will be upgraded to a shared channel if the channel is -/// cloned. -/// -/// High level implementation details can be found in the comment of the parent -/// module. - -pub use self::Failure::*; -pub use self::UpgradeResult::*; -pub use self::SelectionResult::*; -use self::Message::*; - -use core::prelude::*; - -use core::cmp; -use core::int; -use thread::Thread; - -use sync::atomic; -use comm::spsc_queue as spsc; -use comm::Receiver; -use comm::blocking::{mod, SignalToken}; - -const DISCONNECTED: int = int::MIN; -#[cfg(test)] -const MAX_STEALS: int = 5; -#[cfg(not(test))] -const MAX_STEALS: int = 1 << 20; - -pub struct Packet<T> { - queue: spsc::Queue<Message<T>>, // internal queue for all message - - cnt: atomic::AtomicInt, // How many items are on this channel - steals: int, // How many times has a port received without blocking? - to_wake: atomic::AtomicUint, // SignalToken for the blocked thread to wake up - - port_dropped: atomic::AtomicBool, // flag if the channel has been destroyed. -} - -pub enum Failure<T> { - Empty, - Disconnected, - Upgraded(Receiver<T>), -} - -pub enum UpgradeResult { - UpSuccess, - UpDisconnected, - UpWoke(SignalToken), -} - -pub enum SelectionResult<T> { - SelSuccess, - SelCanceled, - SelUpgraded(SignalToken, Receiver<T>), -} - -// Any message could contain an "upgrade request" to a new shared port, so the -// internal queue it's a queue of T, but rather Message<T> -enum Message<T> { - Data(T), - GoUp(Receiver<T>), -} - -impl<T: Send> Packet<T> { - pub fn new() -> Packet<T> { - Packet { - queue: unsafe { spsc::Queue::new(128) }, - - cnt: atomic::AtomicInt::new(0), - steals: 0, - to_wake: atomic::AtomicUint::new(0), - - port_dropped: atomic::AtomicBool::new(false), - } - } - - pub fn send(&mut self, t: T) -> Result<(), T> { - // If the other port has deterministically gone away, then definitely - // must return the data back up the stack. Otherwise, the data is - // considered as being sent. - if self.port_dropped.load(atomic::SeqCst) { return Err(t) } - - match self.do_send(Data(t)) { - UpSuccess | UpDisconnected => {}, - UpWoke(token) => { token.signal(); } - } - Ok(()) - } - - pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult { - // If the port has gone away, then there's no need to proceed any - // further. - if self.port_dropped.load(atomic::SeqCst) { return UpDisconnected } - - self.do_send(GoUp(up)) - } - - fn do_send(&mut self, t: Message<T>) -> UpgradeResult { - self.queue.push(t); - match self.cnt.fetch_add(1, atomic::SeqCst) { - // As described in the mod's doc comment, -1 == wakeup - -1 => UpWoke(self.take_to_wake()), - // As as described before, SPSC queues must be >= -2 - -2 => UpSuccess, - - // Be sure to preserve the disconnected state, and the return value - // in this case is going to be whether our data was received or not. - // This manifests itself on whether we have an empty queue or not. - // - // Primarily, are required to drain the queue here because the port - // will never remove this data. We can only have at most one item to - // drain (the port drains the rest). - DISCONNECTED => { - self.cnt.store(DISCONNECTED, atomic::SeqCst); - let first = self.queue.pop(); - let second = self.queue.pop(); - assert!(second.is_none()); - - match first { - Some(..) => UpSuccess, // we failed to send the data - None => UpDisconnected, // we successfully sent data - } - } - - // Otherwise we just sent some data on a non-waiting queue, so just - // make sure the world is sane and carry on! - n => { assert!(n >= 0); UpSuccess } - } - } - - // Consumes ownership of the 'to_wake' field. - fn take_to_wake(&mut self) -> SignalToken { - let ptr = self.to_wake.load(atomic::SeqCst); - self.to_wake.store(0, atomic::SeqCst); - assert!(ptr != 0); - unsafe { SignalToken::cast_from_uint(ptr) } - } - - // Decrements the count on the channel for a sleeper, returning the sleeper - // back if it shouldn't sleep. Note that this is the location where we take - // steals into account. - fn decrement(&mut self, token: SignalToken) -> Result<(), SignalToken> { - assert_eq!(self.to_wake.load(atomic::SeqCst), 0); - let ptr = unsafe { token.cast_to_uint() }; - self.to_wake.store(ptr, atomic::SeqCst); - - let steals = self.steals; - self.steals = 0; - - match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) { - DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); } - // If we factor in our steals and notice that the channel has no - // data, we successfully sleep - n => { - assert!(n >= 0); - if n - steals <= 0 { return Ok(()) } - } - } - - self.to_wake.store(0, atomic::SeqCst); - Err(unsafe { SignalToken::cast_from_uint(ptr) }) - } - - pub fn recv(&mut self) -> Result<T, Failure<T>> { - // Optimistic preflight check (scheduling is expensive). - match self.try_recv() { - Err(Empty) => {} - data => return data, - } - - // Welp, our channel has no data. Deschedule the current task and - // initiate the blocking protocol. - let (wait_token, signal_token) = blocking::tokens(); - if self.decrement(signal_token).is_ok() { - wait_token.wait() - } - - match self.try_recv() { - // Messages which actually popped from the queue shouldn't count as - // a steal, so offset the decrement here (we already have our - // "steal" factored into the channel count above). - data @ Ok(..) | - data @ Err(Upgraded(..)) => { - self.steals -= 1; - data - } - - data => data, - } - } - - pub fn try_recv(&mut self) -> Result<T, Failure<T>> { - match self.queue.pop() { - // If we stole some data, record to that effect (this will be - // factored into cnt later on). - // - // Note that we don't allow steals to grow without bound in order to - // prevent eventual overflow of either steals or cnt as an overflow - // would have catastrophic results. Sometimes, steals > cnt, but - // other times cnt > steals, so we don't know the relation between - // steals and cnt. This code path is executed only rarely, so we do - // a pretty slow operation, of swapping 0 into cnt, taking steals - // down as much as possible (without going negative), and then - // adding back in whatever we couldn't factor into steals. - Some(data) => { - if self.steals > MAX_STEALS { - match self.cnt.swap(0, atomic::SeqCst) { - DISCONNECTED => { - self.cnt.store(DISCONNECTED, atomic::SeqCst); - } - n => { - let m = cmp::min(n, self.steals); - self.steals -= m; - self.bump(n - m); - } - } - assert!(self.steals >= 0); - } - self.steals += 1; - match data { - Data(t) => Ok(t), - GoUp(up) => Err(Upgraded(up)), - } - } - - None => { - match self.cnt.load(atomic::SeqCst) { - n if n != DISCONNECTED => Err(Empty), - - // This is a little bit of a tricky case. We failed to pop - // data above, and then we have viewed that the channel is - // disconnected. In this window more data could have been - // sent on the channel. It doesn't really make sense to - // return that the channel is disconnected when there's - // actually data on it, so be extra sure there's no data by - // popping one more time. - // - // We can ignore steals because the other end is - // disconnected and we'll never need to really factor in our - // steals again. - _ => { - match self.queue.pop() { - Some(Data(t)) => Ok(t), - Some(GoUp(up)) => Err(Upgraded(up)), - None => Err(Disconnected), - } - } - } - } - } - } - - pub fn drop_chan(&mut self) { - // Dropping a channel is pretty simple, we just flag it as disconnected - // and then wakeup a blocker if there is one. - match self.cnt.swap(DISCONNECTED, atomic::SeqCst) { - -1 => { self.take_to_wake().signal(); } - DISCONNECTED => {} - n => { assert!(n >= 0); } - } - } - - pub fn drop_port(&mut self) { - // Dropping a port seems like a fairly trivial thing. In theory all we - // need to do is flag that we're disconnected and then everything else - // can take over (we don't have anyone to wake up). - // - // The catch for Ports is that we want to drop the entire contents of - // the queue. There are multiple reasons for having this property, the - // largest of which is that if another chan is waiting in this channel - // (but not received yet), then waiting on that port will cause a - // deadlock. - // - // So if we accept that we must now destroy the entire contents of the - // queue, this code may make a bit more sense. The tricky part is that - // we can't let any in-flight sends go un-dropped, we have to make sure - // *everything* is dropped and nothing new will come onto the channel. - - // The first thing we do is set a flag saying that we're done for. All - // sends are gated on this flag, so we're immediately guaranteed that - // there are a bounded number of active sends that we'll have to deal - // with. - self.port_dropped.store(true, atomic::SeqCst); - - // Now that we're guaranteed to deal with a bounded number of senders, - // we need to drain the queue. This draining process happens atomically - // with respect to the "count" of the channel. If the count is nonzero - // (with steals taken into account), then there must be data on the - // channel. In this case we drain everything and then try again. We will - // continue to fail while active senders send data while we're dropping - // data, but eventually we're guaranteed to break out of this loop - // (because there is a bounded number of senders). - let mut steals = self.steals; - while { - let cnt = self.cnt.compare_and_swap( - steals, DISCONNECTED, atomic::SeqCst); - cnt != DISCONNECTED && cnt != steals - } { - loop { - match self.queue.pop() { - Some(..) => { steals += 1; } - None => break - } - } - } - - // At this point in time, we have gated all future senders from sending, - // and we have flagged the channel as being disconnected. The senders - // still have some responsibility, however, because some sends may not - // complete until after we flag the disconnection. There are more - // details in the sending methods that see DISCONNECTED - } - - //////////////////////////////////////////////////////////////////////////// - // select implementation - //////////////////////////////////////////////////////////////////////////// - - // Tests to see whether this port can receive without blocking. If Ok is - // returned, then that's the answer. If Err is returned, then the returned - // port needs to be queried instead (an upgrade happened) - pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> { - // We peek at the queue to see if there's anything on it, and we use - // this return value to determine if we should pop from the queue and - // upgrade this channel immediately. If it looks like we've got an - // upgrade pending, then go through the whole recv rigamarole to update - // the internal state. - match self.queue.peek() { - Some(&GoUp(..)) => { - match self.recv() { - Err(Upgraded(port)) => Err(port), - _ => unreachable!(), - } - } - Some(..) => Ok(true), - None => Ok(false) - } - } - - // increment the count on the channel (used for selection) - fn bump(&mut self, amt: int) -> int { - match self.cnt.fetch_add(amt, atomic::SeqCst) { - DISCONNECTED => { - self.cnt.store(DISCONNECTED, atomic::SeqCst); - DISCONNECTED - } - n => n - } - } - - // Attempts to start selecting on this port. Like a oneshot, this can fail - // immediately because of an upgrade. - pub fn start_selection(&mut self, token: SignalToken) -> SelectionResult<T> { - match self.decrement(token) { - Ok(()) => SelSuccess, - Err(token) => { - let ret = match self.queue.peek() { - Some(&GoUp(..)) => { - match self.queue.pop() { - Some(GoUp(port)) => SelUpgraded(token, port), - _ => unreachable!(), - } - } - Some(..) => SelCanceled, - None => SelCanceled, - }; - // Undo our decrement above, and we should be guaranteed that the - // previous value is positive because we're not going to sleep - let prev = self.bump(1); - assert!(prev == DISCONNECTED || prev >= 0); - return ret; - } - } - } - - // Removes a previous task from being blocked in this port - pub fn abort_selection(&mut self, - was_upgrade: bool) -> Result<bool, Receiver<T>> { - // If we're aborting selection after upgrading from a oneshot, then - // we're guarantee that no one is waiting. The only way that we could - // have seen the upgrade is if data was actually sent on the channel - // half again. For us, this means that there is guaranteed to be data on - // this channel. Furthermore, we're guaranteed that there was no - // start_selection previously, so there's no need to modify `self.cnt` - // at all. - // - // Hence, because of these invariants, we immediately return `Ok(true)`. - // Note that the data may not actually be sent on the channel just yet. - // The other end could have flagged the upgrade but not sent data to - // this end. This is fine because we know it's a small bounded windows - // of time until the data is actually sent. - if was_upgrade { - assert_eq!(self.steals, 0); - assert_eq!(self.to_wake.load(atomic::SeqCst), 0); - return Ok(true) - } - - // We want to make sure that the count on the channel goes non-negative, - // and in the stream case we can have at most one steal, so just assume - // that we had one steal. - let steals = 1; - let prev = self.bump(steals + 1); - - // If we were previously disconnected, then we know for sure that there - // is no task in to_wake, so just keep going - let has_data = if prev == DISCONNECTED { - assert_eq!(self.to_wake.load(atomic::SeqCst), 0); - true // there is data, that data is that we're disconnected - } else { - let cur = prev + steals + 1; - assert!(cur >= 0); - - // If the previous count was negative, then we just made things go - // positive, hence we passed the -1 boundary and we're responsible - // for removing the to_wake() field and trashing it. - // - // If the previous count was positive then we're in a tougher - // situation. A possible race is that a sender just incremented - // through -1 (meaning it's going to try to wake a task up), but it - // hasn't yet read the to_wake. In order to prevent a future recv() - // from waking up too early (this sender picking up the plastered - // over to_wake), we spin loop here waiting for to_wake to be 0. - // Note that this entire select() implementation needs an overhaul, - // and this is *not* the worst part of it, so this is not done as a - // final solution but rather out of necessity for now to get - // something working. - if prev < 0 { - drop(self.take_to_wake()); - } else { - while self.to_wake.load(atomic::SeqCst) != 0 { - Thread::yield_now(); - } - } - assert_eq!(self.steals, 0); - self.steals = steals; - - // if we were previously positive, then there's surely data to - // receive - prev >= 0 - }; - - // Now that we've determined that this queue "has data", we peek at the - // queue to see if the data is an upgrade or not. If it's an upgrade, - // then we need to destroy this port and abort selection on the - // upgraded port. - if has_data { - match self.queue.peek() { - Some(&GoUp(..)) => { - match self.queue.pop() { - Some(GoUp(port)) => Err(port), - _ => unreachable!(), - } - } - _ => Ok(true), - } - } else { - Ok(false) - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Packet<T> { - fn drop(&mut self) { - // Note that this load is not only an assert for correctness about - // disconnection, but also a proper fence before the read of - // `to_wake`, so this assert cannot be removed with also removing - // the `to_wake` assert. - assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED); - assert_eq!(self.to_wake.load(atomic::SeqCst), 0); - } -} diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs deleted file mode 100644 index 88338849965..00000000000 --- a/src/libstd/comm/sync.rs +++ /dev/null @@ -1,483 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/// Synchronous channels/ports -/// -/// This channel implementation differs significantly from the asynchronous -/// implementations found next to it (oneshot/stream/share). This is an -/// implementation of a synchronous, bounded buffer channel. -/// -/// Each channel is created with some amount of backing buffer, and sends will -/// *block* until buffer space becomes available. A buffer size of 0 is valid, -/// which means that every successful send is paired with a successful recv. -/// -/// This flavor of channels defines a new `send_opt` method for channels which -/// is the method by which a message is sent but the task does not panic if it -/// cannot be delivered. -/// -/// Another major difference is that send() will *always* return back the data -/// if it couldn't be sent. This is because it is deterministically known when -/// the data is received and when it is not received. -/// -/// Implementation-wise, it can all be summed up with "use a mutex plus some -/// logic". The mutex used here is an OS native mutex, meaning that no user code -/// is run inside of the mutex (to prevent context switching). This -/// implementation shares almost all code for the buffered and unbuffered cases -/// of a synchronous channel. There are a few branches for the unbuffered case, -/// but they're mostly just relevant to blocking senders. - -use core::prelude::*; - -pub use self::Failure::*; -use self::Blocker::*; - -use vec::Vec; -use core::mem; - -use sync::{atomic, Mutex, MutexGuard}; -use comm::blocking::{mod, WaitToken, SignalToken}; -use comm::select::StartResult::{mod, Installed, Abort}; - -pub struct Packet<T> { - /// Only field outside of the mutex. Just done for kicks, but mainly because - /// the other shared channel already had the code implemented - channels: atomic::AtomicUint, - - lock: Mutex<State<T>>, -} - -unsafe impl<T:Send> Send for Packet<T> { } - -unsafe impl<T:Send> Sync for Packet<T> { } - -struct State<T> { - disconnected: bool, // Is the channel disconnected yet? - queue: Queue, // queue of senders waiting to send data - blocker: Blocker, // currently blocked task on this channel - buf: Buffer<T>, // storage for buffered messages - cap: uint, // capacity of this channel - - /// A curious flag used to indicate whether a sender failed or succeeded in - /// blocking. This is used to transmit information back to the task that it - /// must dequeue its message from the buffer because it was not received. - /// This is only relevant in the 0-buffer case. This obviously cannot be - /// safely constructed, but it's guaranteed to always have a valid pointer - /// value. - canceled: Option<&'static mut bool>, -} - -unsafe impl<T: Send> Send for State<T> {} - -/// Possible flavors of threads who can be blocked on this channel. -enum Blocker { - BlockedSender(SignalToken), - BlockedReceiver(SignalToken), - NoneBlocked -} - -/// Simple queue for threading tasks together. Nodes are stack-allocated, so -/// this structure is not safe at all -struct Queue { - head: *mut Node, - tail: *mut Node, -} - -struct Node { - token: Option<SignalToken>, - next: *mut Node, -} - -unsafe impl Send for Node {} - -/// A simple ring-buffer -struct Buffer<T> { - buf: Vec<Option<T>>, - start: uint, - size: uint, -} - -#[deriving(Show)] -pub enum Failure { - Empty, - Disconnected, -} - -/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock` -/// in the meantime. This re-locks the mutex upon returning. -fn wait<'a, 'b, T: Send>(lock: &'a Mutex<State<T>>, - mut guard: MutexGuard<'b, State<T>>, - f: fn(SignalToken) -> Blocker) - -> MutexGuard<'a, State<T>> -{ - let (wait_token, signal_token) = blocking::tokens(); - match mem::replace(&mut guard.blocker, f(signal_token)) { - NoneBlocked => {} - _ => unreachable!(), - } - drop(guard); // unlock - wait_token.wait(); // block - lock.lock() // relock -} - -/// Wakes up a thread, dropping the lock at the correct time -fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) { - // We need to be careful to wake up the waiting task *outside* of the mutex - // in case it incurs a context switch. - drop(guard); - token.signal(); -} - -impl<T: Send> Packet<T> { - pub fn new(cap: uint) -> Packet<T> { - Packet { - channels: atomic::AtomicUint::new(1), - lock: Mutex::new(State { - disconnected: false, - blocker: NoneBlocked, - cap: cap, - canceled: None, - queue: Queue { - head: 0 as *mut Node, - tail: 0 as *mut Node, - }, - buf: Buffer { - buf: Vec::from_fn(cap + if cap == 0 {1} else {0}, |_| None), - start: 0, - size: 0, - }, - }), - } - } - - // wait until a send slot is available, returning locked access to - // the channel state. - fn acquire_send_slot(&self) -> MutexGuard<State<T>> { - let mut node = Node { token: None, next: 0 as *mut Node }; - loop { - let mut guard = self.lock.lock(); - // are we ready to go? - if guard.disconnected || guard.buf.size() < guard.buf.cap() { - return guard; - } - // no room; actually block - let wait_token = guard.queue.enqueue(&mut node); - drop(guard); - wait_token.wait(); - } - } - - pub fn send(&self, t: T) -> Result<(), T> { - let mut guard = self.acquire_send_slot(); - if guard.disconnected { return Err(t) } - guard.buf.enqueue(t); - - match mem::replace(&mut guard.blocker, NoneBlocked) { - // if our capacity is 0, then we need to wait for a receiver to be - // available to take our data. After waiting, we check again to make - // sure the port didn't go away in the meantime. If it did, we need - // to hand back our data. - NoneBlocked if guard.cap == 0 => { - let mut canceled = false; - assert!(guard.canceled.is_none()); - guard.canceled = Some(unsafe { mem::transmute(&mut canceled) }); - let mut guard = wait(&self.lock, guard, BlockedSender); - if canceled {Err(guard.buf.dequeue())} else {Ok(())} - } - - // success, we buffered some data - NoneBlocked => Ok(()), - - // success, someone's about to receive our buffered data. - BlockedReceiver(token) => { wakeup(token, guard); Ok(()) } - - BlockedSender(..) => panic!("lolwut"), - } - } - - pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> { - let mut guard = self.lock.lock(); - if guard.disconnected { - Err(super::RecvDisconnected(t)) - } else if guard.buf.size() == guard.buf.cap() { - Err(super::Full(t)) - } else if guard.cap == 0 { - // With capacity 0, even though we have buffer space we can't - // transfer the data unless there's a receiver waiting. - match mem::replace(&mut guard.blocker, NoneBlocked) { - NoneBlocked => Err(super::Full(t)), - BlockedSender(..) => unreachable!(), - BlockedReceiver(token) => { - guard.buf.enqueue(t); - wakeup(token, guard); - Ok(()) - } - } - } else { - // If the buffer has some space and the capacity isn't 0, then we - // just enqueue the data for later retrieval, ensuring to wake up - // any blocked receiver if there is one. - assert!(guard.buf.size() < guard.buf.cap()); - guard.buf.enqueue(t); - match mem::replace(&mut guard.blocker, NoneBlocked) { - BlockedReceiver(token) => wakeup(token, guard), - NoneBlocked => {} - BlockedSender(..) => unreachable!(), - } - Ok(()) - } - } - - // Receives a message from this channel - // - // When reading this, remember that there can only ever be one receiver at - // time. - pub fn recv(&self) -> Result<T, ()> { - let mut guard = self.lock.lock(); - - // Wait for the buffer to have something in it. No need for a while loop - // because we're the only receiver. - let mut waited = false; - if !guard.disconnected && guard.buf.size() == 0 { - guard = wait(&self.lock, guard, BlockedReceiver); - waited = true; - } - if guard.disconnected && guard.buf.size() == 0 { return Err(()) } - - // Pick up the data, wake up our neighbors, and carry on - assert!(guard.buf.size() > 0); - let ret = guard.buf.dequeue(); - self.wakeup_senders(waited, guard); - return Ok(ret); - } - - pub fn try_recv(&self) -> Result<T, Failure> { - let mut guard = self.lock.lock(); - - // Easy cases first - if guard.disconnected { return Err(Disconnected) } - if guard.buf.size() == 0 { return Err(Empty) } - - // Be sure to wake up neighbors - let ret = Ok(guard.buf.dequeue()); - self.wakeup_senders(false, guard); - - return ret; - } - - // Wake up pending senders after some data has been received - // - // * `waited` - flag if the receiver blocked to receive some data, or if it - // just picked up some data on the way out - // * `guard` - the lock guard that is held over this channel's lock - fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<State<T>>) { - let pending_sender1: Option<SignalToken> = guard.queue.dequeue(); - - // If this is a no-buffer channel (cap == 0), then if we didn't wait we - // need to ACK the sender. If we waited, then the sender waking us up - // was already the ACK. - let pending_sender2 = if guard.cap == 0 && !waited { - match mem::replace(&mut guard.blocker, NoneBlocked) { - NoneBlocked => None, - BlockedReceiver(..) => unreachable!(), - BlockedSender(token) => { - guard.canceled.take(); - Some(token) - } - } - } else { - None - }; - mem::drop(guard); - - // only outside of the lock do we wake up the pending tasks - pending_sender1.map(|t| t.signal()); - pending_sender2.map(|t| t.signal()); - } - - // Prepares this shared packet for a channel clone, essentially just bumping - // a refcount. - pub fn clone_chan(&self) { - self.channels.fetch_add(1, atomic::SeqCst); - } - - pub fn drop_chan(&self) { - // Only flag the channel as disconnected if we're the last channel - match self.channels.fetch_sub(1, atomic::SeqCst) { - 1 => {} - _ => return - } - - // Not much to do other than wake up a receiver if one's there - let mut guard = self.lock.lock(); - if guard.disconnected { return } - guard.disconnected = true; - match mem::replace(&mut guard.blocker, NoneBlocked) { - NoneBlocked => {} - BlockedSender(..) => unreachable!(), - BlockedReceiver(token) => wakeup(token, guard), - } - } - - pub fn drop_port(&self) { - let mut guard = self.lock.lock(); - - if guard.disconnected { return } - guard.disconnected = true; - - // If the capacity is 0, then the sender may want its data back after - // we're disconnected. Otherwise it's now our responsibility to destroy - // the buffered data. As with many other portions of this code, this - // needs to be careful to destroy the data *outside* of the lock to - // prevent deadlock. - let _data = if guard.cap != 0 { - mem::replace(&mut guard.buf.buf, Vec::new()) - } else { - Vec::new() - }; - let mut queue = mem::replace(&mut guard.queue, Queue { - head: 0 as *mut Node, - tail: 0 as *mut Node, - }); - - let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) { - NoneBlocked => None, - BlockedSender(token) => { - *guard.canceled.take().unwrap() = true; - Some(token) - } - BlockedReceiver(..) => unreachable!(), - }; - mem::drop(guard); - - loop { - match queue.dequeue() { - Some(token) => { token.signal(); } - None => break, - } - } - waiter.map(|t| t.signal()); - } - - //////////////////////////////////////////////////////////////////////////// - // select implementation - //////////////////////////////////////////////////////////////////////////// - - // If Ok, the value is whether this port has data, if Err, then the upgraded - // port needs to be checked instead of this one. - pub fn can_recv(&self) -> bool { - let guard = self.lock.lock(); - guard.disconnected || guard.buf.size() > 0 - } - - // Attempts to start selection on this port. This can either succeed or fail - // because there is data waiting. - pub fn start_selection(&self, token: SignalToken) -> StartResult { - let mut guard = self.lock.lock(); - if guard.disconnected || guard.buf.size() > 0 { - Abort - } else { - match mem::replace(&mut guard.blocker, BlockedReceiver(token)) { - NoneBlocked => {} - BlockedSender(..) => unreachable!(), - BlockedReceiver(..) => unreachable!(), - } - Installed - } - } - - // Remove a previous selecting task from this port. This ensures that the - // blocked task will no longer be visible to any other threads. - // - // The return value indicates whether there's data on this port. - pub fn abort_selection(&self) -> bool { - let mut guard = self.lock.lock(); - match mem::replace(&mut guard.blocker, NoneBlocked) { - NoneBlocked => true, - BlockedSender(token) => { - guard.blocker = BlockedSender(token); - true - } - BlockedReceiver(token) => { drop(token); false } - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Packet<T> { - fn drop(&mut self) { - assert_eq!(self.channels.load(atomic::SeqCst), 0); - let mut guard = self.lock.lock(); - assert!(guard.queue.dequeue().is_none()); - assert!(guard.canceled.is_none()); - } -} - - -//////////////////////////////////////////////////////////////////////////////// -// Buffer, a simple ring buffer backed by Vec<T> -//////////////////////////////////////////////////////////////////////////////// - -impl<T> Buffer<T> { - fn enqueue(&mut self, t: T) { - let pos = (self.start + self.size) % self.buf.len(); - self.size += 1; - let prev = mem::replace(&mut self.buf[pos], Some(t)); - assert!(prev.is_none()); - } - - fn dequeue(&mut self) -> T { - let start = self.start; - self.size -= 1; - self.start = (self.start + 1) % self.buf.len(); - self.buf[start].take().unwrap() - } - - fn size(&self) -> uint { self.size } - fn cap(&self) -> uint { self.buf.len() } -} - -//////////////////////////////////////////////////////////////////////////////// -// Queue, a simple queue to enqueue tasks with (stack-allocated nodes) -//////////////////////////////////////////////////////////////////////////////// - -impl Queue { - fn enqueue(&mut self, node: &mut Node) -> WaitToken { - let (wait_token, signal_token) = blocking::tokens(); - node.token = Some(signal_token); - node.next = 0 as *mut Node; - - if self.tail.is_null() { - self.head = node as *mut Node; - self.tail = node as *mut Node; - } else { - unsafe { - (*self.tail).next = node as *mut Node; - self.tail = node as *mut Node; - } - } - - wait_token - } - - fn dequeue(&mut self) -> Option<SignalToken> { - if self.head.is_null() { - return None - } - let node = self.head; - self.head = unsafe { (*node).next }; - if self.head.is_null() { - self.tail = 0 as *mut Node; - } - unsafe { - (*node).next = 0 as *mut Node; - Some((*node).token.take().unwrap()) - } - } -} |
