diff options
| author | Aaron Turon <aturon@mozilla.com> | 2014-11-23 12:52:37 -0800 |
|---|---|---|
| committer | Aaron Turon <aturon@mozilla.com> | 2014-11-24 10:51:39 -0800 |
| commit | 985acfdb67d550d0259fcdcfbeed0a86ec3da9d0 (patch) | |
| tree | 0c5c9056f11c6f3f602310e1592345e931676c18 /src/libstd/comm | |
| parent | 54c628cb849ad53b66f0d738dc8c83529a9d08d2 (diff) | |
| download | rust-985acfdb67d550d0259fcdcfbeed0a86ec3da9d0.tar.gz rust-985acfdb67d550d0259fcdcfbeed0a86ec3da9d0.zip | |
Merge libsync into libstd
This patch merges the `libsync` crate into `libstd`, undoing part of the facade. This is in preparation for ultimately merging `librustrt`, as well as the upcoming rewrite of `sync`. Because this removes the `libsync` crate, it is a: [breaking-change] However, all uses of `libsync` should be able to reroute through `std::sync` and `std::comm` instead.
Diffstat (limited to 'src/libstd/comm')
| -rw-r--r-- | src/libstd/comm/mod.rs | 2085 | ||||
| -rw-r--r-- | src/libstd/comm/oneshot.rs | 377 | ||||
| -rw-r--r-- | src/libstd/comm/select.rs | 711 | ||||
| -rw-r--r-- | src/libstd/comm/shared.rs | 493 | ||||
| -rw-r--r-- | src/libstd/comm/stream.rs | 486 | ||||
| -rw-r--r-- | src/libstd/comm/sync.rs | 490 |
6 files changed, 4642 insertions, 0 deletions
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs new file mode 100644 index 00000000000..2b66e91c00d --- /dev/null +++ b/src/libstd/comm/mod.rs @@ -0,0 +1,2085 @@ +// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Communication primitives for concurrent tasks +//! +//! Rust makes it very difficult to share data among tasks to prevent race +//! conditions and to improve parallelism, but there is often a need for +//! communication between concurrent tasks. The primitives defined in this +//! module are the building blocks for synchronization in rust. +//! +//! This module provides message-based communication over channels, concretely +//! defined among three types: +//! +//! * `Sender` +//! * `SyncSender` +//! * `Receiver` +//! +//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both +//! senders are clone-able such that many tasks can send simultaneously to one +//! receiver. These channels are *task blocking*, not *thread blocking*. This +//! means that if one task is blocked on a channel, other tasks can continue to +//! make progress. +//! +//! Rust channels come in one of two flavors: +//! +//! 1. An asynchronous, infinitely buffered channel. The `channel()` function +//! will return a `(Sender, Receiver)` tuple where all sends will be +//! **asynchronous** (they never block). The channel conceptually has an +//! infinite buffer. +//! +//! 2. A synchronous, bounded channel. The `sync_channel()` function will return +//! a `(SyncSender, Receiver)` tuple where the storage for pending messages +//! is a pre-allocated buffer of a fixed size. All sends will be +//! **synchronous** by blocking until there is buffer space available. Note +//! that a bound of 0 is allowed, causing the channel to become a +//! "rendezvous" channel where each sender atomically hands off a message to +//! a receiver. +//! +//! ## Panic Propagation +//! +//! In addition to being a core primitive for communicating in rust, channels +//! are the points at which panics are propagated among tasks. Whenever the one +//! half of channel is closed, the other half will have its next operation +//! `panic!`. The purpose of this is to allow propagation of panics among tasks +//! that are linked to one another via channels. +//! +//! There are methods on both of senders and receivers to perform their +//! respective operations without panicking, however. +//! +//! ## Runtime Requirements +//! +//! The channel types defined in this module generally have very few runtime +//! requirements in order to operate. The major requirement they have is for a +//! local rust `Task` to be available if any *blocking* operation is performed. +//! +//! If a local `Task` is not available (for example an FFI callback), then the +//! `send` operation is safe on a `Sender` (as well as a `send_opt`) as well as +//! the `try_send` method on a `SyncSender`, but no other operations are +//! guaranteed to be safe. +//! +//! # Example +//! +//! Simple usage: +//! +//! ``` +//! // Create a simple streaming channel +//! let (tx, rx) = channel(); +//! spawn(proc() { +//! tx.send(10i); +//! }); +//! assert_eq!(rx.recv(), 10i); +//! ``` +//! +//! Shared usage: +//! +//! ``` +//! // Create a shared channel which can be sent along from many tasks +//! // where tx is the sending half (tx for transmission), and rx is the receiving +//! // half (rx for receiving). +//! let (tx, rx) = channel(); +//! for i in range(0i, 10i) { +//! let tx = tx.clone(); +//! spawn(proc() { +//! tx.send(i); +//! }) +//! } +//! +//! for _ in range(0i, 10i) { +//! let j = rx.recv(); +//! assert!(0 <= j && j < 10); +//! } +//! ``` +//! +//! Propagating panics: +//! +//! ```should_fail +//! // The call to recv() will panic!() because the channel has already hung +//! // up (or been deallocated) +//! let (tx, rx) = channel::<int>(); +//! drop(tx); +//! rx.recv(); +//! ``` +//! +//! Synchronous channels: +//! +//! ``` +//! let (tx, rx) = sync_channel::<int>(0); +//! spawn(proc() { +//! // This will wait for the parent task to start receiving +//! tx.send(53); +//! }); +//! rx.recv(); +//! ``` +//! +//! Reading from a channel with a timeout requires to use a Timer together +//! with the channel. You can use the select! macro to select either and +//! handle the timeout case. This first example will break out of the loop +//! after 10 seconds no matter what: +//! +//! ```no_run +//! use std::io::timer::Timer; +//! use std::time::Duration; +//! +//! let (tx, rx) = channel::<int>(); +//! let mut timer = Timer::new().unwrap(); +//! let timeout = timer.oneshot(Duration::seconds(10)); +//! +//! loop { +//! select! { +//! val = rx.recv() => println!("Received {}", val), +//! () = timeout.recv() => { +//! println!("timed out, total time was more than 10 seconds") +//! break; +//! } +//! } +//! } +//! ``` +//! +//! This second example is more costly since it allocates a new timer every +//! time a message is received, but it allows you to timeout after the channel +//! has been inactive for 5 seconds: +//! +//! ```no_run +//! use std::io::timer::Timer; +//! use std::time::Duration; +//! +//! let (tx, rx) = channel::<int>(); +//! let mut timer = Timer::new().unwrap(); +//! +//! loop { +//! let timeout = timer.oneshot(Duration::seconds(5)); +//! +//! select! { +//! val = rx.recv() => println!("Received {}", val), +//! () = timeout.recv() => { +//! println!("timed out, no message received in 5 seconds") +//! break; +//! } +//! } +//! } +//! ``` + +// A description of how Rust's channel implementation works +// +// Channels are supposed to be the basic building block for all other +// concurrent primitives that are used in Rust. As a result, the channel type +// needs to be highly optimized, flexible, and broad enough for use everywhere. +// +// The choice of implementation of all channels is to be built on lock-free data +// structures. The channels themselves are then consequently also lock-free data +// structures. As always with lock-free code, this is a very "here be dragons" +// territory, especially because I'm unaware of any academic papers which have +// gone into great length about channels of these flavors. +// +// ## Flavors of channels +// +// From the perspective of a consumer of this library, there is only one flavor +// of channel. This channel can be used as a stream and cloned to allow multiple +// senders. Under the hood, however, there are actually three flavors of +// channels in play. +// +// * Oneshots - these channels are highly optimized for the one-send use case. +// They contain as few atomics as possible and involve one and +// exactly one allocation. +// * Streams - these channels are optimized for the non-shared use case. They +// use a different concurrent queue which is more tailored for this +// use case. The initial allocation of this flavor of channel is not +// optimized. +// * Shared - this is the most general form of channel that this module offers, +// a channel with multiple senders. This type is as optimized as it +// can be, but the previous two types mentioned are much faster for +// their use-cases. +// +// ## Concurrent queues +// +// The basic idea of Rust's Sender/Receiver types is that send() never blocks, but +// recv() obviously blocks. This means that under the hood there must be some +// shared and concurrent queue holding all of the actual data. +// +// With two flavors of channels, two flavors of queues are also used. We have +// chosen to use queues from a well-known author which are abbreviated as SPSC +// and MPSC (single producer, single consumer and multiple producer, single +// consumer). SPSC queues are used for streams while MPSC queues are used for +// shared channels. +// +// ### SPSC optimizations +// +// The SPSC queue found online is essentially a linked list of nodes where one +// half of the nodes are the "queue of data" and the other half of nodes are a +// cache of unused nodes. The unused nodes are used such that an allocation is +// not required on every push() and a free doesn't need to happen on every +// pop(). +// +// As found online, however, the cache of nodes is of an infinite size. This +// means that if a channel at one point in its life had 50k items in the queue, +// then the queue will always have the capacity for 50k items. I believed that +// this was an unnecessary limitation of the implementation, so I have altered +// the queue to optionally have a bound on the cache size. +// +// By default, streams will have an unbounded SPSC queue with a small-ish cache +// size. The hope is that the cache is still large enough to have very fast +// send() operations while not too large such that millions of channels can +// coexist at once. +// +// ### MPSC optimizations +// +// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses +// a linked list under the hood to earn its unboundedness, but I have not put +// forth much effort into having a cache of nodes similar to the SPSC queue. +// +// For now, I believe that this is "ok" because shared channels are not the most +// common type, but soon we may wish to revisit this queue choice and determine +// another candidate for backend storage of shared channels. +// +// ## Overview of the Implementation +// +// Now that there's a little background on the concurrent queues used, it's +// worth going into much more detail about the channels themselves. The basic +// pseudocode for a send/recv are: +// +// +// send(t) recv() +// queue.push(t) return if queue.pop() +// if increment() == -1 deschedule { +// wakeup() if decrement() > 0 +// cancel_deschedule() +// } +// queue.pop() +// +// As mentioned before, there are no locks in this implementation, only atomic +// instructions are used. +// +// ### The internal atomic counter +// +// Every channel has a shared counter with each half to keep track of the size +// of the queue. This counter is used to abort descheduling by the receiver and +// to know when to wake up on the sending side. +// +// As seen in the pseudocode, senders will increment this count and receivers +// will decrement the count. The theory behind this is that if a sender sees a +// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count, +// then it doesn't need to block. +// +// The recv() method has a beginning call to pop(), and if successful, it needs +// to decrement the count. It is a crucial implementation detail that this +// decrement does *not* happen to the shared counter. If this were the case, +// then it would be possible for the counter to be very negative when there were +// no receivers waiting, in which case the senders would have to determine when +// it was actually appropriate to wake up a receiver. +// +// Instead, the "steal count" is kept track of separately (not atomically +// because it's only used by receivers), and then the decrement() call when +// descheduling will lump in all of the recent steals into one large decrement. +// +// The implication of this is that if a sender sees a -1 count, then there's +// guaranteed to be a waiter waiting! +// +// ## Native Implementation +// +// A major goal of these channels is to work seamlessly on and off the runtime. +// All of the previous race conditions have been worded in terms of +// scheduler-isms (which is obviously not available without the runtime). +// +// For now, native usage of channels (off the runtime) will fall back onto +// mutexes/cond vars for descheduling/atomic decisions. The no-contention path +// is still entirely lock-free, the "deschedule" blocks above are surrounded by +// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a +// condition variable. +// +// ## Select +// +// Being able to support selection over channels has greatly influenced this +// design, and not only does selection need to work inside the runtime, but also +// outside the runtime. +// +// The implementation is fairly straightforward. The goal of select() is not to +// return some data, but only to return which channel can receive data without +// blocking. The implementation is essentially the entire blocking procedure +// followed by an increment as soon as its woken up. The cancellation procedure +// involves an increment and swapping out of to_wake to acquire ownership of the +// task to unblock. +// +// Sadly this current implementation requires multiple allocations, so I have +// seen the throughput of select() be much worse than it should be. I do not +// believe that there is anything fundamental which needs to change about these +// channels, however, in order to support a more efficient select(). +// +// # Conclusion +// +// And now that you've seen all the races that I found and attempted to fix, +// here's the code for you to find some more! + +use core::prelude::*; + +pub use self::TryRecvError::*; +pub use self::TrySendError::*; +use self::Flavor::*; + +use alloc::arc::Arc; +use core::kinds::marker; +use core::mem; +use core::cell::UnsafeCell; +use rustrt::task::BlockedTask; + +pub use comm::select::{Select, Handle}; + +macro_rules! test ( + { fn $name:ident() $b:block $(#[$a:meta])*} => ( + mod $name { + #![allow(unused_imports)] + + extern crate rustrt; + + use prelude::*; + + use comm::*; + use super::*; + use task; + + $(#[$a])* #[test] fn f() { $b } + } + ) +) + +mod oneshot; +mod select; +mod shared; +mod stream; +mod sync; + +/// The receiving-half of Rust's channel type. This half can only be owned by +/// one task +#[unstable] +pub struct Receiver<T> { + inner: UnsafeCell<Flavor<T>>, + // can't share in an arc + _marker: marker::NoSync, +} + +/// An iterator over messages on a receiver, this iterator will block +/// whenever `next` is called, waiting for a new message, and `None` will be +/// returned when the corresponding channel has hung up. +#[unstable] +pub struct Messages<'a, T:'a> { + rx: &'a Receiver<T> +} + +/// The sending-half of Rust's asynchronous channel type. This half can only be +/// owned by one task, but it can be cloned to send to other tasks. +#[unstable] +pub struct Sender<T> { + inner: UnsafeCell<Flavor<T>>, + // can't share in an arc + _marker: marker::NoSync, +} + +/// The sending-half of Rust's synchronous channel type. This half can only be +/// owned by one task, but it can be cloned to send to other tasks. +#[unstable = "this type may be renamed, but it will always exist"] +pub struct SyncSender<T> { + inner: Arc<UnsafeCell<sync::Packet<T>>>, + // can't share in an arc + _marker: marker::NoSync, +} + +/// This enumeration is the list of the possible reasons that try_recv could not +/// return data when called. +#[deriving(PartialEq, Clone, Show)] +#[experimental = "this is likely to be removed in changing try_recv()"] +pub enum TryRecvError { + /// This channel is currently empty, but the sender(s) have not yet + /// disconnected, so data may yet become available. + Empty, + /// This channel's sending half has become disconnected, and there will + /// never be any more data received on this channel + Disconnected, +} + +/// This enumeration is the list of the possible error outcomes for the +/// `SyncSender::try_send` method. +#[deriving(PartialEq, Clone, Show)] +#[experimental = "this is likely to be removed in changing try_send()"] +pub enum TrySendError<T> { + /// The data could not be sent on the channel because it would require that + /// the callee block to send the data. + /// + /// If this is a buffered channel, then the buffer is full at this time. If + /// this is not a buffered channel, then there is no receiver available to + /// acquire the data. + Full(T), + /// This channel's receiving half has disconnected, so the data could not be + /// sent. The data is returned back to the callee in this case. + RecvDisconnected(T), +} + +enum Flavor<T> { + Oneshot(Arc<UnsafeCell<oneshot::Packet<T>>>), + Stream(Arc<UnsafeCell<stream::Packet<T>>>), + Shared(Arc<UnsafeCell<shared::Packet<T>>>), + Sync(Arc<UnsafeCell<sync::Packet<T>>>), +} + +#[doc(hidden)] +trait UnsafeFlavor<T> { + fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>>; + unsafe fn inner_mut<'a>(&'a self) -> &'a mut Flavor<T> { + &mut *self.inner_unsafe().get() + } + unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> { + &*self.inner_unsafe().get() + } +} +impl<T> UnsafeFlavor<T> for Sender<T> { + fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> { + &self.inner + } +} +impl<T> UnsafeFlavor<T> for Receiver<T> { + fn inner_unsafe<'a>(&'a self) -> &'a UnsafeCell<Flavor<T>> { + &self.inner + } +} + +/// Creates a new asynchronous channel, returning the sender/receiver halves. +/// +/// All data sent on the sender will become available on the receiver, and no +/// send will block the calling task (this channel has an "infinite buffer"). +/// +/// # Example +/// +/// ``` +/// // tx is is the sending half (tx for transmission), and rx is the receiving +/// // half (rx for receiving). +/// let (tx, rx) = channel(); +/// +/// // Spawn off an expensive computation +/// spawn(proc() { +/// # fn expensive_computation() {} +/// tx.send(expensive_computation()); +/// }); +/// +/// // Do some useful work for awhile +/// +/// // Let's see what that answer was +/// println!("{}", rx.recv()); +/// ``` +#[unstable] +pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) { + let a = Arc::new(UnsafeCell::new(oneshot::Packet::new())); + (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a))) +} + +/// Creates a new synchronous, bounded channel. +/// +/// Like asynchronous channels, the `Receiver` will block until a message +/// becomes available. These channels differ greatly in the semantics of the +/// sender from asynchronous channels, however. +/// +/// This channel has an internal buffer on which messages will be queued. When +/// the internal buffer becomes full, future sends will *block* waiting for the +/// buffer to open up. Note that a buffer size of 0 is valid, in which case this +/// becomes "rendezvous channel" where each send will not return until a recv +/// is paired with it. +/// +/// As with asynchronous channels, all senders will panic in `send` if the +/// `Receiver` has been destroyed. +/// +/// # Example +/// +/// ``` +/// let (tx, rx) = sync_channel(1); +/// +/// // this returns immediately +/// tx.send(1i); +/// +/// spawn(proc() { +/// // this will block until the previous message has been received +/// tx.send(2i); +/// }); +/// +/// assert_eq!(rx.recv(), 1i); +/// assert_eq!(rx.recv(), 2i); +/// ``` +#[unstable = "this function may be renamed to more accurately reflect the type \ + of channel that is is creating"] +pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) { + let a = Arc::new(UnsafeCell::new(sync::Packet::new(bound))); + (SyncSender::new(a.clone()), Receiver::new(Sync(a))) +} + +//////////////////////////////////////////////////////////////////////////////// +// Sender +//////////////////////////////////////////////////////////////////////////////// + +impl<T: Send> Sender<T> { + fn new(inner: Flavor<T>) -> Sender<T> { + Sender { + inner: UnsafeCell::new(inner), + _marker: marker::NoSync, + } + } + + /// Sends a value along this channel to be received by the corresponding + /// receiver. + /// + /// Rust channels are infinitely buffered so this method will never block. + /// + /// # Panics + /// + /// This function will panic if the other end of the channel has hung up. + /// This means that if the corresponding receiver has fallen out of scope, + /// this function will trigger a panic message saying that a message is + /// being sent on a closed channel. + /// + /// Note that if this function does *not* panic, it does not mean that the + /// data will be successfully received. All sends are placed into a queue, + /// so it is possible for a send to succeed (the other end is alive), but + /// then the other end could immediately disconnect. + /// + /// The purpose of this functionality is to propagate panicks among tasks. + /// If a panic is not desired, then consider using the `send_opt` method + #[experimental = "this function is being considered candidate for removal \ + to adhere to the general guidelines of rust"] + pub fn send(&self, t: T) { + if self.send_opt(t).is_err() { + panic!("sending on a closed channel"); + } + } + + /// Attempts to send a value on this channel, returning it back if it could + /// not be sent. + /// + /// A successful send occurs when it is determined that the other end of + /// the channel has not hung up already. An unsuccessful send would be one + /// where the corresponding receiver has already been deallocated. Note + /// that a return value of `Err` means that the data will never be + /// received, but a return value of `Ok` does *not* mean that the data + /// will be received. It is possible for the corresponding receiver to + /// hang up immediately after this function returns `Ok`. + /// + /// Like `send`, this method will never block. + /// + /// # Panics + /// + /// This method will never panic, it will return the message back to the + /// caller if the other end is disconnected + /// + /// # Example + /// + /// ``` + /// let (tx, rx) = channel(); + /// + /// // This send is always successful + /// assert_eq!(tx.send_opt(1i), Ok(())); + /// + /// // This send will fail because the receiver is gone + /// drop(rx); + /// assert_eq!(tx.send_opt(1i), Err(1)); + /// ``` + #[unstable = "this function may be renamed to send() in the future"] + pub fn send_opt(&self, t: T) -> Result<(), T> { + let (new_inner, ret) = match *unsafe { self.inner() } { + Oneshot(ref p) => { + unsafe { + let p = p.get(); + if !(*p).sent() { + return (*p).send(t); + } else { + let a = Arc::new(UnsafeCell::new(stream::Packet::new())); + match (*p).upgrade(Receiver::new(Stream(a.clone()))) { + oneshot::UpSuccess => { + let ret = (*a.get()).send(t); + (a, ret) + } + oneshot::UpDisconnected => (a, Err(t)), + oneshot::UpWoke(task) => { + // This send cannot panic because the task is + // asleep (we're looking at it), so the receiver + // can't go away. + (*a.get()).send(t).ok().unwrap(); + task.wake().map(|t| t.reawaken()); + (a, Ok(())) + } + } + } + } + } + Stream(ref p) => return unsafe { (*p.get()).send(t) }, + Shared(ref p) => return unsafe { (*p.get()).send(t) }, + Sync(..) => unreachable!(), + }; + + unsafe { + let tmp = Sender::new(Stream(new_inner)); + mem::swap(self.inner_mut(), tmp.inner_mut()); + } + return ret; + } +} + +#[unstable] +impl<T: Send> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { + let (packet, sleeper) = match *unsafe { self.inner() } { + Oneshot(ref p) => { + let a = Arc::new(UnsafeCell::new(shared::Packet::new())); + unsafe { + (*a.get()).postinit_lock(); + match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) { + oneshot::UpSuccess | oneshot::UpDisconnected => (a, None), + oneshot::UpWoke(task) => (a, Some(task)) + } + } + } + Stream(ref p) => { + let a = Arc::new(UnsafeCell::new(shared::Packet::new())); + unsafe { + (*a.get()).postinit_lock(); + match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) { + stream::UpSuccess | stream::UpDisconnected => (a, None), + stream::UpWoke(task) => (a, Some(task)), + } + } + } + Shared(ref p) => { + unsafe { (*p.get()).clone_chan(); } + return Sender::new(Shared(p.clone())); + } + Sync(..) => unreachable!(), + }; + + unsafe { + (*packet.get()).inherit_blocker(sleeper); + + let tmp = Sender::new(Shared(packet.clone())); + mem::swap(self.inner_mut(), tmp.inner_mut()); + } + Sender::new(Shared(packet)) + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for Sender<T> { + fn drop(&mut self) { + match *unsafe { self.inner_mut() } { + Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + Sync(..) => unreachable!(), + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// SyncSender +//////////////////////////////////////////////////////////////////////////////// + +impl<T: Send> SyncSender<T> { + fn new(inner: Arc<UnsafeCell<sync::Packet<T>>>) -> SyncSender<T> { + SyncSender { inner: inner, _marker: marker::NoSync } + } + + /// Sends a value on this synchronous channel. + /// + /// This function will *block* until space in the internal buffer becomes + /// available or a receiver is available to hand off the message to. + /// + /// Note that a successful send does *not* guarantee that the receiver will + /// ever see the data if there is a buffer on this channel. Messages may be + /// enqueued in the internal buffer for the receiver to receive at a later + /// time. If the buffer size is 0, however, it can be guaranteed that the + /// receiver has indeed received the data if this function returns success. + /// + /// # Panics + /// + /// Similarly to `Sender::send`, this function will panic if the + /// corresponding `Receiver` for this channel has disconnected. This + /// behavior is used to propagate panics among tasks. + /// + /// If a panic is not desired, you can achieve the same semantics with the + /// `SyncSender::send_opt` method which will not panic if the receiver + /// disconnects. + #[experimental = "this function is being considered candidate for removal \ + to adhere to the general guidelines of rust"] + pub fn send(&self, t: T) { + if self.send_opt(t).is_err() { + panic!("sending on a closed channel"); + } + } + + /// Send a value on a channel, returning it back if the receiver + /// disconnected + /// + /// This method will *block* to send the value `t` on the channel, but if + /// the value could not be sent due to the receiver disconnecting, the value + /// is returned back to the callee. This function is similar to `try_send`, + /// except that it will block if the channel is currently full. + /// + /// # Panics + /// + /// This function cannot panic. + #[unstable = "this function may be renamed to send() in the future"] + pub fn send_opt(&self, t: T) -> Result<(), T> { + unsafe { (*self.inner.get()).send(t) } + } + + /// Attempts to send a value on this channel without blocking. + /// + /// This method differs from `send_opt` by returning immediately if the + /// channel's buffer is full or no receiver is waiting to acquire some + /// data. Compared with `send_opt`, this function has two failure cases + /// instead of one (one for disconnection, one for a full buffer). + /// + /// See `SyncSender::send` for notes about guarantees of whether the + /// receiver has received the data or not if this function is successful. + /// + /// # Panics + /// + /// This function cannot panic + #[unstable = "the return type of this function is candidate for \ + modification"] + pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { + unsafe { (*self.inner.get()).try_send(t) } + } +} + +#[unstable] +impl<T: Send> Clone for SyncSender<T> { + fn clone(&self) -> SyncSender<T> { + unsafe { (*self.inner.get()).clone_chan(); } + return SyncSender::new(self.inner.clone()); + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for SyncSender<T> { + fn drop(&mut self) { + unsafe { (*self.inner.get()).drop_chan(); } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Receiver +//////////////////////////////////////////////////////////////////////////////// + +impl<T: Send> Receiver<T> { + fn new(inner: Flavor<T>) -> Receiver<T> { + Receiver { inner: UnsafeCell::new(inner), _marker: marker::NoSync } + } + + /// Blocks waiting for a value on this receiver + /// + /// This function will block if necessary to wait for a corresponding send + /// on the channel from its paired `Sender` structure. This receiver will + /// be woken up when data is ready, and the data will be returned. + /// + /// # Panics + /// + /// Similar to channels, this method will trigger a task panic if the + /// other end of the channel has hung up (been deallocated). The purpose of + /// this is to propagate panicks among tasks. + /// + /// If a panic is not desired, then there are two options: + /// + /// * If blocking is still desired, the `recv_opt` method will return `None` + /// when the other end hangs up + /// + /// * If blocking is not desired, then the `try_recv` method will attempt to + /// peek at a value on this receiver. + #[experimental = "this function is being considered candidate for removal \ + to adhere to the general guidelines of rust"] + pub fn recv(&self) -> T { + match self.recv_opt() { + Ok(t) => t, + Err(()) => panic!("receiving on a closed channel"), + } + } + + /// Attempts to return a pending value on this receiver without blocking + /// + /// This method will never block the caller in order to wait for data to + /// become available. Instead, this will always return immediately with a + /// possible option of pending data on the channel. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// block on a receiver. + /// + /// # Panics + /// + /// This function cannot panic. + #[unstable = "the return type of this function may be altered"] + pub fn try_recv(&self) -> Result<T, TryRecvError> { + loop { + let new_port = match *unsafe { self.inner() } { + Oneshot(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Ok(t), + Err(oneshot::Empty) => return Err(Empty), + Err(oneshot::Disconnected) => return Err(Disconnected), + Err(oneshot::Upgraded(rx)) => rx, + } + } + Stream(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Ok(t), + Err(stream::Empty) => return Err(Empty), + Err(stream::Disconnected) => return Err(Disconnected), + Err(stream::Upgraded(rx)) => rx, + } + } + Shared(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Ok(t), + Err(shared::Empty) => return Err(Empty), + Err(shared::Disconnected) => return Err(Disconnected), + } + } + Sync(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Ok(t), + Err(sync::Empty) => return Err(Empty), + Err(sync::Disconnected) => return Err(Disconnected), + } + } + }; + unsafe { + mem::swap(self.inner_mut(), + new_port.inner_mut()); + } + } + } + + /// Attempt to wait for a value on this receiver, but does not panic if the + /// corresponding channel has hung up. + /// + /// This implementation of iterators for ports will always block if there is + /// not data available on the receiver, but it will not panic in the case + /// that the channel has been deallocated. + /// + /// In other words, this function has the same semantics as the `recv` + /// method except for the panic aspect. + /// + /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of + /// the value found on the receiver is returned. + #[unstable = "this function may be renamed to recv()"] + pub fn recv_opt(&self) -> Result<T, ()> { + loop { + let new_port = match *unsafe { self.inner() } { + Oneshot(ref p) => { + match unsafe { (*p.get()).recv() } { + Ok(t) => return Ok(t), + Err(oneshot::Empty) => return unreachable!(), + Err(oneshot::Disconnected) => return Err(()), + Err(oneshot::Upgraded(rx)) => rx, + } + } + Stream(ref p) => { + match unsafe { (*p.get()).recv() } { + Ok(t) => return Ok(t), + Err(stream::Empty) => return unreachable!(), + Err(stream::Disconnected) => return Err(()), + Err(stream::Upgraded(rx)) => rx, + } + } + Shared(ref p) => { + match unsafe { (*p.get()).recv() } { + Ok(t) => return Ok(t), + Err(shared::Empty) => return unreachable!(), + Err(shared::Disconnected) => return Err(()), + } + } + Sync(ref p) => return unsafe { (*p.get()).recv() } + }; + unsafe { + mem::swap(self.inner_mut(), new_port.inner_mut()); + } + } + } + + /// Returns an iterator which will block waiting for messages, but never + /// `panic!`. It will return `None` when the channel has hung up. + #[unstable] + pub fn iter<'a>(&'a self) -> Messages<'a, T> { + Messages { rx: self } + } +} + +impl<T: Send> select::Packet for Receiver<T> { + fn can_recv(&self) -> bool { + loop { + let new_port = match *unsafe { self.inner() } { + Oneshot(ref p) => { + match unsafe { (*p.get()).can_recv() } { + Ok(ret) => return ret, + Err(upgrade) => upgrade, + } + } + Stream(ref p) => { + match unsafe { (*p.get()).can_recv() } { + Ok(ret) => return ret, + Err(upgrade) => upgrade, + } + } + Shared(ref p) => { + return unsafe { (*p.get()).can_recv() }; + } + Sync(ref p) => { + return unsafe { (*p.get()).can_recv() }; + } + }; + unsafe { + mem::swap(self.inner_mut(), + new_port.inner_mut()); + } + } + } + + fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{ + loop { + let (t, new_port) = match *unsafe { self.inner() } { + Oneshot(ref p) => { + match unsafe { (*p.get()).start_selection(task) } { + oneshot::SelSuccess => return Ok(()), + oneshot::SelCanceled(task) => return Err(task), + oneshot::SelUpgraded(t, rx) => (t, rx), + } + } + Stream(ref p) => { + match unsafe { (*p.get()).start_selection(task) } { + stream::SelSuccess => return Ok(()), + stream::SelCanceled(task) => return Err(task), + stream::SelUpgraded(t, rx) => (t, rx), + } + } + Shared(ref p) => { + return unsafe { (*p.get()).start_selection(task) }; + } + Sync(ref p) => { + return unsafe { (*p.get()).start_selection(task) }; + } + }; + task = t; + unsafe { + mem::swap(self.inner_mut(), + new_port.inner_mut()); + } + } + } + + fn abort_selection(&self) -> bool { + let mut was_upgrade = false; + loop { + let result = match *unsafe { self.inner() } { + Oneshot(ref p) => unsafe { (*p.get()).abort_selection() }, + Stream(ref p) => unsafe { + (*p.get()).abort_selection(was_upgrade) + }, + Shared(ref p) => return unsafe { + (*p.get()).abort_selection(was_upgrade) + }, + Sync(ref p) => return unsafe { + (*p.get()).abort_selection() + }, + }; + let new_port = match result { Ok(b) => return b, Err(p) => p }; + was_upgrade = true; + unsafe { + mem::swap(self.inner_mut(), + new_port.inner_mut()); + } + } + } +} + +#[unstable] +impl<'a, T: Send> Iterator<T> for Messages<'a, T> { + fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() } +} + +#[unsafe_destructor] +impl<T: Send> Drop for Receiver<T> { + fn drop(&mut self) { + match *unsafe { self.inner_mut() } { + Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); }, + Stream(ref mut p) => unsafe { (*p.get()).drop_port(); }, + Shared(ref mut p) => unsafe { (*p.get()).drop_port(); }, + Sync(ref mut p) => unsafe { (*p.get()).drop_port(); }, + } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + + use os; + use super::*; + + pub fn stress_factor() -> uint { + match os::getenv("RUST_TEST_STRESS") { + Some(val) => from_str::<uint>(val.as_slice()).unwrap(), + None => 1, + } + } + + test!(fn smoke() { + let (tx, rx) = channel::<int>(); + tx.send(1); + assert_eq!(rx.recv(), 1); + }) + + test!(fn drop_full() { + let (tx, _rx) = channel(); + tx.send(box 1i); + }) + + test!(fn drop_full_shared() { + let (tx, _rx) = channel(); + drop(tx.clone()); + drop(tx.clone()); + tx.send(box 1i); + }) + + test!(fn smoke_shared() { + let (tx, rx) = channel::<int>(); + tx.send(1); + assert_eq!(rx.recv(), 1); + let tx = tx.clone(); + tx.send(1); + assert_eq!(rx.recv(), 1); + }) + + test!(fn smoke_threads() { + let (tx, rx) = channel::<int>(); + spawn(proc() { + tx.send(1); + }); + assert_eq!(rx.recv(), 1); + }) + + test!(fn smoke_port_gone() { + let (tx, rx) = channel::<int>(); + drop(rx); + tx.send(1); + } #[should_fail]) + + test!(fn smoke_shared_port_gone() { + let (tx, rx) = channel::<int>(); + drop(rx); + tx.send(1); + } #[should_fail]) + + test!(fn smoke_shared_port_gone2() { + let (tx, rx) = channel::<int>(); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + tx2.send(1); + } #[should_fail]) + + test!(fn port_gone_concurrent() { + let (tx, rx) = channel::<int>(); + spawn(proc() { + rx.recv(); + }); + loop { tx.send(1) } + } #[should_fail]) + + test!(fn port_gone_concurrent_shared() { + let (tx, rx) = channel::<int>(); + let tx2 = tx.clone(); + spawn(proc() { + rx.recv(); + }); + loop { + tx.send(1); + tx2.send(1); + } + } #[should_fail]) + + test!(fn smoke_chan_gone() { + let (tx, rx) = channel::<int>(); + drop(tx); + rx.recv(); + } #[should_fail]) + + test!(fn smoke_chan_gone_shared() { + let (tx, rx) = channel::<()>(); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + rx.recv(); + } #[should_fail]) + + test!(fn chan_gone_concurrent() { + let (tx, rx) = channel::<int>(); + spawn(proc() { + tx.send(1); + tx.send(1); + }); + loop { rx.recv(); } + } #[should_fail]) + + test!(fn stress() { + let (tx, rx) = channel::<int>(); + spawn(proc() { + for _ in range(0u, 10000) { tx.send(1i); } + }); + for _ in range(0u, 10000) { + assert_eq!(rx.recv(), 1); + } + }) + + test!(fn stress_shared() { + static AMT: uint = 10000; + static NTHREADS: uint = 8; + let (tx, rx) = channel::<int>(); + let (dtx, drx) = channel::<()>(); + + spawn(proc() { + for _ in range(0, AMT * NTHREADS) { + assert_eq!(rx.recv(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + dtx.send(()); + }); + + for _ in range(0, NTHREADS) { + let tx = tx.clone(); + spawn(proc() { + for _ in range(0, AMT) { tx.send(1); } + }); + } + drop(tx); + drx.recv(); + }) + + #[test] + fn send_from_outside_runtime() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<()>(); + let tx4 = tx3.clone(); + spawn(proc() { + tx1.send(()); + for _ in range(0i, 40) { + assert_eq!(rx2.recv(), 1); + } + tx3.send(()); + }); + rx1.recv(); + spawn(proc() { + for _ in range(0i, 40) { + tx2.send(1); + } + tx4.send(()); + }); + rx3.recv(); + rx3.recv(); + } + + #[test] + fn recv_from_outside_runtime() { + let (tx, rx) = channel::<int>(); + let (dtx, drx) = channel(); + spawn(proc() { + for _ in range(0i, 40) { + assert_eq!(rx.recv(), 1); + } + dtx.send(()); + }); + for _ in range(0u, 40) { + tx.send(1); + } + drx.recv(); + } + + #[test] + fn no_runtime() { + let (tx1, rx1) = channel::<int>(); + let (tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<()>(); + let tx4 = tx3.clone(); + spawn(proc() { + assert_eq!(rx1.recv(), 1); + tx2.send(2); + tx4.send(()); + }); + spawn(proc() { + tx1.send(1); + assert_eq!(rx2.recv(), 2); + tx3.send(()); + }); + rx3.recv(); + rx3.recv(); + } + + test!(fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = channel::<int>(); + drop(rx); + }) + + test!(fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = channel::<int>(); + drop(tx); + }) + + test!(fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = channel::<Box<int>>(); + drop(rx); + tx.send(box 0); + } #[should_fail]) + + test!(fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will panic + let res = task::try(proc() { + let (tx, rx) = channel::<int>(); + drop(tx); + rx.recv(); + }); + // What is our res? + assert!(res.is_err()); + }) + + test!(fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = channel::<Box<int>>(); + tx.send(box 10); + assert!(rx.recv() == box 10); + }) + + test!(fn oneshot_single_thread_try_send_open() { + let (tx, rx) = channel::<int>(); + assert!(tx.send_opt(10).is_ok()); + assert!(rx.recv() == 10); + }) + + test!(fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = channel::<int>(); + drop(rx); + assert!(tx.send_opt(10).is_err()); + }) + + test!(fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = channel::<int>(); + tx.send(10); + assert!(rx.recv_opt() == Ok(10)); + }) + + test!(fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = channel::<int>(); + drop(tx); + assert!(rx.recv_opt() == Err(())); + }) + + test!(fn oneshot_single_thread_peek_data() { + let (tx, rx) = channel::<int>(); + assert_eq!(rx.try_recv(), Err(Empty)) + tx.send(10); + assert_eq!(rx.try_recv(), Ok(10)); + }) + + test!(fn oneshot_single_thread_peek_close() { + let (tx, rx) = channel::<int>(); + drop(tx); + assert_eq!(rx.try_recv(), Err(Disconnected)); + assert_eq!(rx.try_recv(), Err(Disconnected)); + }) + + test!(fn oneshot_single_thread_peek_open() { + let (_tx, rx) = channel::<int>(); + assert_eq!(rx.try_recv(), Err(Empty)); + }) + + test!(fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = channel::<Box<int>>(); + spawn(proc() { + assert!(rx.recv() == box 10); + }); + + tx.send(box 10); + }) + + test!(fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = channel::<Box<int>>(); + spawn(proc() { + drop(tx); + }); + let res = task::try(proc() { + assert!(rx.recv() == box 10); + }); + assert!(res.is_err()); + }) + + test!(fn oneshot_multi_thread_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = channel::<int>(); + spawn(proc() { + drop(rx); + }); + drop(tx); + } + }) + + test!(fn oneshot_multi_thread_send_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = channel::<int>(); + spawn(proc() { + drop(rx); + }); + let _ = task::try(proc() { + tx.send(1); + }); + } + }) + + test!(fn oneshot_multi_thread_recv_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = channel::<int>(); + spawn(proc() { + let res = task::try(proc() { + rx.recv(); + }); + assert!(res.is_err()); + }); + spawn(proc() { + spawn(proc() { + drop(tx); + }); + }); + } + }) + + test!(fn oneshot_multi_thread_send_recv_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = channel(); + spawn(proc() { + tx.send(box 10i); + }); + spawn(proc() { + assert!(rx.recv() == box 10i); + }); + } + }) + + test!(fn stream_send_recv_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = channel(); + + send(tx, 0); + recv(rx, 0); + + fn send(tx: Sender<Box<int>>, i: int) { + if i == 10 { return } + + spawn(proc() { + tx.send(box i); + send(tx, i + 1); + }); + } + + fn recv(rx: Receiver<Box<int>>, i: int) { + if i == 10 { return } + + spawn(proc() { + assert!(rx.recv() == box i); + recv(rx, i + 1); + }); + } + } + }) + + test!(fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = channel(); + for _ in range(0i, 10000) { tx.send(()); } + for _ in range(0i, 10000) { rx.recv(); } + }) + + test!(fn shared_chan_stress() { + let (tx, rx) = channel(); + let total = stress_factor() + 100; + for _ in range(0, total) { + let tx = tx.clone(); + spawn(proc() { + tx.send(()); + }); + } + + for _ in range(0, total) { + rx.recv(); + } + }) + + test!(fn test_nested_recv_iter() { + let (tx, rx) = channel::<int>(); + let (total_tx, total_rx) = channel::<int>(); + + spawn(proc() { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc); + }); + + tx.send(3); + tx.send(1); + tx.send(2); + drop(tx); + assert_eq!(total_rx.recv(), 6); + }) + + test!(fn test_recv_iter_break() { + let (tx, rx) = channel::<int>(); + let (count_tx, count_rx) = channel(); + + spawn(proc() { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count); + }); + + tx.send(2); + tx.send(2); + tx.send(2); + let _ = tx.send_opt(2); + drop(tx); + assert_eq!(count_rx.recv(), 4); + }) + + test!(fn try_recv_states() { + let (tx1, rx1) = channel::<int>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + spawn(proc() { + rx2.recv(); + tx1.send(1); + tx3.send(()); + rx2.recv(); + drop(tx1); + tx3.send(()); + }); + + assert_eq!(rx1.try_recv(), Err(Empty)); + tx2.send(()); + rx3.recv(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(Empty)); + tx2.send(()); + rx3.recv(); + assert_eq!(rx1.try_recv(), Err(Disconnected)); + }) + + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted + test!(fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + spawn(proc() { + rx.recv(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()); + }); + // make sure the other task has gone to sleep + for _ in range(0u, 5000) { task::deschedule(); } + + // upgrade to a shared chan and send a message + let t = tx.clone(); + drop(tx); + t.send(()); + + // wait for the child task to exit before we exit + rx2.recv(); + }) + + test!(fn sends_off_the_runtime() { + use rustrt::thread::Thread; + + let (tx, rx) = channel(); + let t = Thread::start(proc() { + for _ in range(0u, 1000) { + tx.send(()); + } + }); + for _ in range(0u, 1000) { + rx.recv(); + } + t.join(); + }) + + test!(fn try_recvs_off_the_runtime() { + use rustrt::thread::Thread; + + let (tx, rx) = channel(); + let (cdone, pdone) = channel(); + let t = Thread::start(proc() { + let mut hits = 0u; + while hits < 10 { + match rx.try_recv() { + Ok(()) => { hits += 1; } + Err(Empty) => { Thread::yield_now(); } + Err(Disconnected) => return, + } + } + cdone.send(()); + }); + for _ in range(0u, 10) { + tx.send(()); + } + t.join(); + pdone.recv(); + }) +} + +#[cfg(test)] +mod sync_tests { + use prelude::*; + use os; + + pub fn stress_factor() -> uint { + match os::getenv("RUST_TEST_STRESS") { + Some(val) => from_str::<uint>(val.as_slice()).unwrap(), + None => 1, + } + } + + test!(fn smoke() { + let (tx, rx) = sync_channel::<int>(1); + tx.send(1); + assert_eq!(rx.recv(), 1); + }) + + test!(fn drop_full() { + let (tx, _rx) = sync_channel(1); + tx.send(box 1i); + }) + + test!(fn smoke_shared() { + let (tx, rx) = sync_channel::<int>(1); + tx.send(1); + assert_eq!(rx.recv(), 1); + let tx = tx.clone(); + tx.send(1); + assert_eq!(rx.recv(), 1); + }) + + test!(fn smoke_threads() { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { + tx.send(1); + }); + assert_eq!(rx.recv(), 1); + }) + + test!(fn smoke_port_gone() { + let (tx, rx) = sync_channel::<int>(0); + drop(rx); + tx.send(1); + } #[should_fail]) + + test!(fn smoke_shared_port_gone2() { + let (tx, rx) = sync_channel::<int>(0); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + tx2.send(1); + } #[should_fail]) + + test!(fn port_gone_concurrent() { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { + rx.recv(); + }); + loop { tx.send(1) } + } #[should_fail]) + + test!(fn port_gone_concurrent_shared() { + let (tx, rx) = sync_channel::<int>(0); + let tx2 = tx.clone(); + spawn(proc() { + rx.recv(); + }); + loop { + tx.send(1); + tx2.send(1); + } + } #[should_fail]) + + test!(fn smoke_chan_gone() { + let (tx, rx) = sync_channel::<int>(0); + drop(tx); + rx.recv(); + } #[should_fail]) + + test!(fn smoke_chan_gone_shared() { + let (tx, rx) = sync_channel::<()>(0); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + rx.recv(); + } #[should_fail]) + + test!(fn chan_gone_concurrent() { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { + tx.send(1); + tx.send(1); + }); + loop { rx.recv(); } + } #[should_fail]) + + test!(fn stress() { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { + for _ in range(0u, 10000) { tx.send(1); } + }); + for _ in range(0u, 10000) { + assert_eq!(rx.recv(), 1); + } + }) + + test!(fn stress_shared() { + static AMT: uint = 1000; + static NTHREADS: uint = 8; + let (tx, rx) = sync_channel::<int>(0); + let (dtx, drx) = sync_channel::<()>(0); + + spawn(proc() { + for _ in range(0, AMT * NTHREADS) { + assert_eq!(rx.recv(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + dtx.send(()); + }); + + for _ in range(0, NTHREADS) { + let tx = tx.clone(); + spawn(proc() { + for _ in range(0, AMT) { tx.send(1); } + }); + } + drop(tx); + drx.recv(); + }) + + test!(fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = sync_channel::<int>(0); + drop(rx); + }) + + test!(fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = sync_channel::<int>(0); + drop(tx); + }) + + test!(fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = sync_channel::<Box<int>>(0); + drop(rx); + tx.send(box 0); + } #[should_fail]) + + test!(fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will panic + let res = task::try(proc() { + let (tx, rx) = sync_channel::<int>(0); + drop(tx); + rx.recv(); + }); + // What is our res? + assert!(res.is_err()); + }) + + test!(fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = sync_channel::<Box<int>>(1); + tx.send(box 10); + assert!(rx.recv() == box 10); + }) + + test!(fn oneshot_single_thread_try_send_open() { + let (tx, rx) = sync_channel::<int>(1); + assert_eq!(tx.try_send(10), Ok(())); + assert!(rx.recv() == 10); + }) + + test!(fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = sync_channel::<int>(0); + drop(rx); + assert_eq!(tx.try_send(10), Err(RecvDisconnected(10))); + }) + + test!(fn oneshot_single_thread_try_send_closed2() { + let (tx, _rx) = sync_channel::<int>(0); + assert_eq!(tx.try_send(10), Err(Full(10))); + }) + + test!(fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = sync_channel::<int>(1); + tx.send(10); + assert!(rx.recv_opt() == Ok(10)); + }) + + test!(fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = sync_channel::<int>(0); + drop(tx); + assert!(rx.recv_opt() == Err(())); + }) + + test!(fn oneshot_single_thread_peek_data() { + let (tx, rx) = sync_channel::<int>(1); + assert_eq!(rx.try_recv(), Err(Empty)) + tx.send(10); + assert_eq!(rx.try_recv(), Ok(10)); + }) + + test!(fn oneshot_single_thread_peek_close() { + let (tx, rx) = sync_channel::<int>(0); + drop(tx); + assert_eq!(rx.try_recv(), Err(Disconnected)); + assert_eq!(rx.try_recv(), Err(Disconnected)); + }) + + test!(fn oneshot_single_thread_peek_open() { + let (_tx, rx) = sync_channel::<int>(0); + assert_eq!(rx.try_recv(), Err(Empty)); + }) + + test!(fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = sync_channel::<Box<int>>(0); + spawn(proc() { + assert!(rx.recv() == box 10); + }); + + tx.send(box 10); + }) + + test!(fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = sync_channel::<Box<int>>(0); + spawn(proc() { + drop(tx); + }); + let res = task::try(proc() { + assert!(rx.recv() == box 10); + }); + assert!(res.is_err()); + }) + + test!(fn oneshot_multi_thread_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { + drop(rx); + }); + drop(tx); + } + }) + + test!(fn oneshot_multi_thread_send_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { + drop(rx); + }); + let _ = task::try(proc() { + tx.send(1); + }); + } + }) + + test!(fn oneshot_multi_thread_recv_close_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { + let res = task::try(proc() { + rx.recv(); + }); + assert!(res.is_err()); + }); + spawn(proc() { + spawn(proc() { + drop(tx); + }); + }); + } + }) + + test!(fn oneshot_multi_thread_send_recv_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::<Box<int>>(0); + spawn(proc() { + tx.send(box 10i); + }); + spawn(proc() { + assert!(rx.recv() == box 10i); + }); + } + }) + + test!(fn stream_send_recv_stress() { + for _ in range(0, stress_factor()) { + let (tx, rx) = sync_channel::<Box<int>>(0); + + send(tx, 0); + recv(rx, 0); + + fn send(tx: SyncSender<Box<int>>, i: int) { + if i == 10 { return } + + spawn(proc() { + tx.send(box i); + send(tx, i + 1); + }); + } + + fn recv(rx: Receiver<Box<int>>, i: int) { + if i == 10 { return } + + spawn(proc() { + assert!(rx.recv() == box i); + recv(rx, i + 1); + }); + } + } + }) + + test!(fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = sync_channel(10000); + for _ in range(0u, 10000) { tx.send(()); } + for _ in range(0u, 10000) { rx.recv(); } + }) + + test!(fn shared_chan_stress() { + let (tx, rx) = sync_channel(0); + let total = stress_factor() + 100; + for _ in range(0, total) { + let tx = tx.clone(); + spawn(proc() { + tx.send(()); + }); + } + + for _ in range(0, total) { + rx.recv(); + } + }) + + test!(fn test_nested_recv_iter() { + let (tx, rx) = sync_channel::<int>(0); + let (total_tx, total_rx) = sync_channel::<int>(0); + + spawn(proc() { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc); + }); + + tx.send(3); + tx.send(1); + tx.send(2); + drop(tx); + assert_eq!(total_rx.recv(), 6); + }) + + test!(fn test_recv_iter_break() { + let (tx, rx) = sync_channel::<int>(0); + let (count_tx, count_rx) = sync_channel(0); + + spawn(proc() { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count); + }); + + tx.send(2); + tx.send(2); + tx.send(2); + let _ = tx.try_send(2); + drop(tx); + assert_eq!(count_rx.recv(), 4); + }) + + test!(fn try_recv_states() { + let (tx1, rx1) = sync_channel::<int>(1); + let (tx2, rx2) = sync_channel::<()>(1); + let (tx3, rx3) = sync_channel::<()>(1); + spawn(proc() { + rx2.recv(); + tx1.send(1); + tx3.send(()); + rx2.recv(); + drop(tx1); + tx3.send(()); + }); + + assert_eq!(rx1.try_recv(), Err(Empty)); + tx2.send(()); + rx3.recv(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(Empty)); + tx2.send(()); + rx3.recv(); + assert_eq!(rx1.try_recv(), Err(Disconnected)); + }) + + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted + test!(fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = sync_channel::<()>(0); + let (tx2, rx2) = sync_channel::<()>(0); + spawn(proc() { + rx.recv(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()); + }); + // make sure the other task has gone to sleep + for _ in range(0u, 5000) { task::deschedule(); } + + // upgrade to a shared chan and send a message + let t = tx.clone(); + drop(tx); + t.send(()); + + // wait for the child task to exit before we exit + rx2.recv(); + }) + + test!(fn try_recvs_off_the_runtime() { + use rustrt::thread::Thread; + + let (tx, rx) = sync_channel::<()>(0); + let (cdone, pdone) = channel(); + let t = Thread::start(proc() { + let mut hits = 0u; + while hits < 10 { + match rx.try_recv() { + Ok(()) => { hits += 1; } + Err(Empty) => { Thread::yield_now(); } + Err(Disconnected) => return, + } + } + cdone.send(()); + }); + for _ in range(0u, 10) { + tx.send(()); + } + t.join(); + pdone.recv(); + }) + + test!(fn send_opt1() { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { rx.recv(); }); + assert_eq!(tx.send_opt(1), Ok(())); + }) + + test!(fn send_opt2() { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { drop(rx); }); + assert_eq!(tx.send_opt(1), Err(1)); + }) + + test!(fn send_opt3() { + let (tx, rx) = sync_channel::<int>(1); + assert_eq!(tx.send_opt(1), Ok(())); + spawn(proc() { drop(rx); }); + assert_eq!(tx.send_opt(1), Err(1)); + }) + + test!(fn send_opt4() { + let (tx, rx) = sync_channel::<int>(0); + let tx2 = tx.clone(); + let (done, donerx) = channel(); + let done2 = done.clone(); + spawn(proc() { + assert_eq!(tx.send_opt(1), Err(1)); + done.send(()); + }); + spawn(proc() { + assert_eq!(tx2.send_opt(2), Err(2)); + done2.send(()); + }); + drop(rx); + donerx.recv(); + donerx.recv(); + }) + + test!(fn try_send1() { + let (tx, _rx) = sync_channel::<int>(0); + assert_eq!(tx.try_send(1), Err(Full(1))); + }) + + test!(fn try_send2() { + let (tx, _rx) = sync_channel::<int>(1); + assert_eq!(tx.try_send(1), Ok(())); + assert_eq!(tx.try_send(1), Err(Full(1))); + }) + + test!(fn try_send3() { + let (tx, rx) = sync_channel::<int>(1); + assert_eq!(tx.try_send(1), Ok(())); + drop(rx); + assert_eq!(tx.try_send(1), Err(RecvDisconnected(1))); + }) + + test!(fn try_send4() { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { + for _ in range(0u, 1000) { task::deschedule(); } + assert_eq!(tx.try_send(1), Ok(())); + }); + assert_eq!(rx.recv(), 1); + } #[ignore(reason = "flaky on libnative")]) + + test!(fn issue_15761() { + fn repro() { + let (tx1, rx1) = sync_channel::<()>(3); + let (tx2, rx2) = sync_channel::<()>(3); + + spawn(proc() { + rx1.recv(); + tx2.try_send(()).unwrap(); + }); + + tx1.try_send(()).unwrap(); + rx2.recv(); + } + + for _ in range(0u, 100) { + repro() + } + }) +} diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs new file mode 100644 index 00000000000..bc34c3e8c52 --- /dev/null +++ b/src/libstd/comm/oneshot.rs @@ -0,0 +1,377 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Oneshot channels/ports +/// +/// This is the initial flavor of channels/ports used for comm module. This is +/// an optimization for the one-use case of a channel. The major optimization of +/// this type is to have one and exactly one allocation when the chan/port pair +/// is created. +/// +/// Another possible optimization would be to not use an Arc box because +/// in theory we know when the shared packet can be deallocated (no real need +/// for the atomic reference counting), but I was having trouble how to destroy +/// the data early in a drop of a Port. +/// +/// # Implementation +/// +/// Oneshots are implemented around one atomic uint variable. This variable +/// indicates both the state of the port/chan but also contains any tasks +/// blocked on the port. All atomic operations happen on this one word. +/// +/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect +/// on behalf of the channel side of things (it can be mentally thought of as +/// consuming the port). This upgrade is then also stored in the shared packet. +/// The one caveat to consider is that when a port sees a disconnected channel +/// it must check for data because there is no "data plus upgrade" state. + +pub use self::Failure::*; +pub use self::UpgradeResult::*; +pub use self::SelectionResult::*; +use self::MyUpgrade::*; + +use core::prelude::*; + +use alloc::boxed::Box; +use core::mem; +use rustrt::local::Local; +use rustrt::task::{Task, BlockedTask}; + +use sync::atomic; +use comm::Receiver; + +// Various states you can find a port in. +const EMPTY: uint = 0; +const DATA: uint = 1; +const DISCONNECTED: uint = 2; + +pub struct Packet<T> { + // Internal state of the chan/port pair (stores the blocked task as well) + state: atomic::AtomicUint, + // One-shot data slot location + data: Option<T>, + // when used for the second time, a oneshot channel must be upgraded, and + // this contains the slot for the upgrade + upgrade: MyUpgrade<T>, +} + +pub enum Failure<T> { + Empty, + Disconnected, + Upgraded(Receiver<T>), +} + +pub enum UpgradeResult { + UpSuccess, + UpDisconnected, + UpWoke(BlockedTask), +} + +pub enum SelectionResult<T> { + SelCanceled(BlockedTask), + SelUpgraded(BlockedTask, Receiver<T>), + SelSuccess, +} + +enum MyUpgrade<T> { + NothingSent, + SendUsed, + GoUp(Receiver<T>), +} + +impl<T: Send> Packet<T> { + pub fn new() -> Packet<T> { + Packet { + data: None, + upgrade: NothingSent, + state: atomic::AtomicUint::new(EMPTY), + } + } + + pub fn send(&mut self, t: T) -> Result<(), T> { + // Sanity check + match self.upgrade { + NothingSent => {} + _ => panic!("sending on a oneshot that's already sent on "), + } + assert!(self.data.is_none()); + self.data = Some(t); + self.upgrade = SendUsed; + + match self.state.swap(DATA, atomic::SeqCst) { + // Sent the data, no one was waiting + EMPTY => Ok(()), + + // Couldn't send the data, the port hung up first. Return the data + // back up the stack. + DISCONNECTED => { + Err(self.data.take().unwrap()) + } + + // Not possible, these are one-use channels + DATA => unreachable!(), + + // Anything else means that there was a task waiting on the other + // end. We leave the 'DATA' state inside so it'll pick it up on the + // other end. + n => unsafe { + let t = BlockedTask::cast_from_uint(n); + t.wake().map(|t| t.reawaken()); + Ok(()) + } + } + } + + // Just tests whether this channel has been sent on or not, this is only + // safe to use from the sender. + pub fn sent(&self) -> bool { + match self.upgrade { + NothingSent => false, + _ => true, + } + } + + pub fn recv(&mut self) -> Result<T, Failure<T>> { + // Attempt to not block the task (it's a little expensive). If it looks + // like we're not empty, then immediately go through to `try_recv`. + if self.state.load(atomic::SeqCst) == EMPTY { + let t: Box<Task> = Local::take(); + t.deschedule(1, |task| { + let n = unsafe { task.cast_to_uint() }; + match self.state.compare_and_swap(EMPTY, n, atomic::SeqCst) { + // Nothing on the channel, we legitimately block + EMPTY => Ok(()), + + // If there's data or it's a disconnected channel, then we + // failed the cmpxchg, so we just wake ourselves back up + DATA | DISCONNECTED => { + unsafe { Err(BlockedTask::cast_from_uint(n)) } + } + + // Only one thread is allowed to sleep on this port + _ => unreachable!() + } + }); + } + + self.try_recv() + } + + pub fn try_recv(&mut self) -> Result<T, Failure<T>> { + match self.state.load(atomic::SeqCst) { + EMPTY => Err(Empty), + + // We saw some data on the channel, but the channel can be used + // again to send us an upgrade. As a result, we need to re-insert + // into the channel that there's no data available (otherwise we'll + // just see DATA next time). This is done as a cmpxchg because if + // the state changes under our feet we'd rather just see that state + // change. + DATA => { + self.state.compare_and_swap(DATA, EMPTY, atomic::SeqCst); + match self.data.take() { + Some(data) => Ok(data), + None => unreachable!(), + } + } + + // There's no guarantee that we receive before an upgrade happens, + // and an upgrade flags the channel as disconnected, so when we see + // this we first need to check if there's data available and *then* + // we go through and process the upgrade. + DISCONNECTED => { + match self.data.take() { + Some(data) => Ok(data), + None => { + match mem::replace(&mut self.upgrade, SendUsed) { + SendUsed | NothingSent => Err(Disconnected), + GoUp(upgrade) => Err(Upgraded(upgrade)) + } + } + } + } + _ => unreachable!() + } + } + + // Returns whether the upgrade was completed. If the upgrade wasn't + // completed, then the port couldn't get sent to the other half (it will + // never receive it). + pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult { + let prev = match self.upgrade { + NothingSent => NothingSent, + SendUsed => SendUsed, + _ => panic!("upgrading again"), + }; + self.upgrade = GoUp(up); + + match self.state.swap(DISCONNECTED, atomic::SeqCst) { + // If the channel is empty or has data on it, then we're good to go. + // Senders will check the data before the upgrade (in case we + // plastered over the DATA state). + DATA | EMPTY => UpSuccess, + + // If the other end is already disconnected, then we failed the + // upgrade. Be sure to trash the port we were given. + DISCONNECTED => { self.upgrade = prev; UpDisconnected } + + // If someone's waiting, we gotta wake them up + n => UpWoke(unsafe { BlockedTask::cast_from_uint(n) }) + } + } + + pub fn drop_chan(&mut self) { + match self.state.swap(DISCONNECTED, atomic::SeqCst) { + DATA | DISCONNECTED | EMPTY => {} + + // If someone's waiting, we gotta wake them up + n => unsafe { + let t = BlockedTask::cast_from_uint(n); + t.wake().map(|t| t.reawaken()); + } + } + } + + pub fn drop_port(&mut self) { + match self.state.swap(DISCONNECTED, atomic::SeqCst) { + // An empty channel has nothing to do, and a remotely disconnected + // channel also has nothing to do b/c we're about to run the drop + // glue + DISCONNECTED | EMPTY => {} + + // There's data on the channel, so make sure we destroy it promptly. + // This is why not using an arc is a little difficult (need the box + // to stay valid while we take the data). + DATA => { self.data.take().unwrap(); } + + // We're the only ones that can block on this port + _ => unreachable!() + } + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // If Ok, the value is whether this port has data, if Err, then the upgraded + // port needs to be checked instead of this one. + pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> { + match self.state.load(atomic::SeqCst) { + EMPTY => Ok(false), // Welp, we tried + DATA => Ok(true), // we have some un-acquired data + DISCONNECTED if self.data.is_some() => Ok(true), // we have data + DISCONNECTED => { + match mem::replace(&mut self.upgrade, SendUsed) { + // The other end sent us an upgrade, so we need to + // propagate upwards whether the upgrade can receive + // data + GoUp(upgrade) => Err(upgrade), + + // If the other end disconnected without sending an + // upgrade, then we have data to receive (the channel is + // disconnected). + up => { self.upgrade = up; Ok(true) } + } + } + _ => unreachable!(), // we're the "one blocker" + } + } + + // Attempts to start selection on this port. This can either succeed, fail + // because there is data, or fail because there is an upgrade pending. + pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> { + let n = unsafe { task.cast_to_uint() }; + match self.state.compare_and_swap(EMPTY, n, atomic::SeqCst) { + EMPTY => SelSuccess, + DATA => SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }), + DISCONNECTED if self.data.is_some() => { + SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }) + } + DISCONNECTED => { + match mem::replace(&mut self.upgrade, SendUsed) { + // The other end sent us an upgrade, so we need to + // propagate upwards whether the upgrade can receive + // data + GoUp(upgrade) => { + SelUpgraded(unsafe { BlockedTask::cast_from_uint(n) }, + upgrade) + } + + // If the other end disconnected without sending an + // upgrade, then we have data to receive (the channel is + // disconnected). + up => { + self.upgrade = up; + SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }) + } + } + } + _ => unreachable!(), // we're the "one blocker" + } + } + + // Remove a previous selecting task from this port. This ensures that the + // blocked task will no longer be visible to any other threads. + // + // The return value indicates whether there's data on this port. + pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> { + let state = match self.state.load(atomic::SeqCst) { + // Each of these states means that no further activity will happen + // with regard to abortion selection + s @ EMPTY | + s @ DATA | + s @ DISCONNECTED => s, + + // If we've got a blocked task, then use an atomic to gain ownership + // of it (may fail) + n => self.state.compare_and_swap(n, EMPTY, atomic::SeqCst) + }; + + // Now that we've got ownership of our state, figure out what to do + // about it. + match state { + EMPTY => unreachable!(), + // our task used for select was stolen + DATA => Ok(true), + + // If the other end has hung up, then we have complete ownership + // of the port. First, check if there was data waiting for us. This + // is possible if the other end sent something and then hung up. + // + // We then need to check to see if there was an upgrade requested, + // and if so, the upgraded port needs to have its selection aborted. + DISCONNECTED => { + if self.data.is_some() { + Ok(true) + } else { + match mem::replace(&mut self.upgrade, SendUsed) { + GoUp(port) => Err(port), + _ => Ok(true), + } + } + } + + // We woke ourselves up from select. Assert that the task should be + // trashed and returned that we don't have any data. + n => { + let t = unsafe { BlockedTask::cast_from_uint(n) }; + t.trash(); + Ok(false) + } + } + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for Packet<T> { + fn drop(&mut self) { + assert_eq!(self.state.load(atomic::SeqCst), DISCONNECTED); + } +} diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs new file mode 100644 index 00000000000..621556f75ce --- /dev/null +++ b/src/libstd/comm/select.rs @@ -0,0 +1,711 @@ +// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Selection over an array of receivers +//! +//! This module contains the implementation machinery necessary for selecting +//! over a number of receivers. One large goal of this module is to provide an +//! efficient interface to selecting over any receiver of any type. +//! +//! This is achieved through an architecture of a "receiver set" in which +//! receivers are added to a set and then the entire set is waited on at once. +//! The set can be waited on multiple times to prevent re-adding each receiver +//! to the set. +//! +//! Usage of this module is currently encouraged to go through the use of the +//! `select!` macro. This macro allows naturally binding of variables to the +//! received values of receivers in a much more natural syntax then usage of the +//! `Select` structure directly. +//! +//! # Example +//! +//! ```rust +//! let (tx1, rx1) = channel(); +//! let (tx2, rx2) = channel(); +//! +//! tx1.send(1i); +//! tx2.send(2i); +//! +//! select! { +//! val = rx1.recv() => { +//! assert_eq!(val, 1i); +//! }, +//! val = rx2.recv() => { +//! assert_eq!(val, 2i); +//! } +//! } +//! ``` + +#![allow(dead_code)] +#![experimental = "This implementation, while likely sufficient, is unsafe and \ + likely to be error prone. At some point in the future this \ + module will likely be replaced, and it is currently \ + unknown how much API breakage that will cause. The ability \ + to select over a number of channels will remain forever, \ + but no guarantees beyond this are being made"] + + +use core::prelude::*; + +use alloc::boxed::Box; +use core::cell::Cell; +use core::kinds::marker; +use core::mem; +use core::uint; +use rustrt::local::Local; +use rustrt::task::{Task, BlockedTask}; + +use comm::Receiver; + +/// The "receiver set" of the select interface. This structure is used to manage +/// a set of receivers which are being selected over. +pub struct Select { + head: *mut Handle<'static, ()>, + tail: *mut Handle<'static, ()>, + next_id: Cell<uint>, + marker1: marker::NoSend, +} + +/// A handle to a receiver which is currently a member of a `Select` set of +/// receivers. This handle is used to keep the receiver in the set as well as +/// interact with the underlying receiver. +pub struct Handle<'rx, T:'rx> { + /// The ID of this handle, used to compare against the return value of + /// `Select::wait()` + id: uint, + selector: &'rx Select, + next: *mut Handle<'static, ()>, + prev: *mut Handle<'static, ()>, + added: bool, + packet: &'rx Packet+'rx, + + // due to our fun transmutes, we be sure to place this at the end. (nothing + // previous relies on T) + rx: &'rx Receiver<T>, +} + +struct Packets { cur: *mut Handle<'static, ()> } + +#[doc(hidden)] +pub trait Packet { + fn can_recv(&self) -> bool; + fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>; + fn abort_selection(&self) -> bool; +} + +impl Select { + /// Creates a new selection structure. This set is initially empty and + /// `wait` will panic!() if called. + /// + /// Usage of this struct directly can sometimes be burdensome, and usage is + /// rather much easier through the `select!` macro. + pub fn new() -> Select { + Select { + marker1: marker::NoSend, + head: 0 as *mut Handle<'static, ()>, + tail: 0 as *mut Handle<'static, ()>, + next_id: Cell::new(1), + } + } + + /// Creates a new handle into this receiver set for a new receiver. Note + /// that this does *not* add the receiver to the receiver set, for that you + /// must call the `add` method on the handle itself. + pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> { + let id = self.next_id.get(); + self.next_id.set(id + 1); + Handle { + id: id, + selector: self, + next: 0 as *mut Handle<'static, ()>, + prev: 0 as *mut Handle<'static, ()>, + added: false, + rx: rx, + packet: rx, + } + } + + /// Waits for an event on this receiver set. The returned value is *not* an + /// index, but rather an id. This id can be queried against any active + /// `Handle` structures (each one has an `id` method). The handle with + /// the matching `id` will have some sort of event available on it. The + /// event could either be that data is available or the corresponding + /// channel has been closed. + pub fn wait(&self) -> uint { + self.wait2(true) + } + + /// Helper method for skipping the preflight checks during testing + fn wait2(&self, do_preflight_checks: bool) -> uint { + // Note that this is currently an inefficient implementation. We in + // theory have knowledge about all receivers in the set ahead of time, + // so this method shouldn't really have to iterate over all of them yet + // again. The idea with this "receiver set" interface is to get the + // interface right this time around, and later this implementation can + // be optimized. + // + // This implementation can be summarized by: + // + // fn select(receivers) { + // if any receiver ready { return ready index } + // deschedule { + // block on all receivers + // } + // unblock on all receivers + // return ready index + // } + // + // Most notably, the iterations over all of the receivers shouldn't be + // necessary. + unsafe { + let mut amt = 0; + for p in self.iter() { + amt += 1; + if do_preflight_checks && (*p).packet.can_recv() { + return (*p).id; + } + } + assert!(amt > 0); + + let mut ready_index = amt; + let mut ready_id = uint::MAX; + let mut iter = self.iter().enumerate(); + + // Acquire a number of blocking contexts, and block on each one + // sequentially until one fails. If one fails, then abort + // immediately so we can go unblock on all the other receivers. + let task: Box<Task> = Local::take(); + task.deschedule(amt, |task| { + // Prepare for the block + let (i, handle) = iter.next().unwrap(); + match (*handle).packet.start_selection(task) { + Ok(()) => Ok(()), + Err(task) => { + ready_index = i; + ready_id = (*handle).id; + Err(task) + } + } + }); + + // Abort the selection process on each receiver. If the abort + // process returns `true`, then that means that the receiver is + // ready to receive some data. Note that this also means that the + // receiver may have yet to have fully read the `to_wake` field and + // woken us up (although the wakeup is guaranteed to fail). + // + // This situation happens in the window of where a sender invokes + // increment(), sees -1, and then decides to wake up the task. After + // all this is done, the sending thread will set `selecting` to + // `false`. Until this is done, we cannot return. If we were to + // return, then a sender could wake up a receiver which has gone + // back to sleep after this call to `select`. + // + // Note that it is a "fairly small window" in which an increment() + // views that it should wake a thread up until the `selecting` bit + // is set to false. For now, the implementation currently just spins + // in a yield loop. This is very distasteful, but this + // implementation is already nowhere near what it should ideally be. + // A rewrite should focus on avoiding a yield loop, and for now this + // implementation is tying us over to a more efficient "don't + // iterate over everything every time" implementation. + for handle in self.iter().take(ready_index) { + if (*handle).packet.abort_selection() { + ready_id = (*handle).id; + } + } + + assert!(ready_id != uint::MAX); + return ready_id; + } + } + + fn iter(&self) -> Packets { Packets { cur: self.head } } +} + +impl<'rx, T: Send> Handle<'rx, T> { + /// Retrieve the id of this handle. + #[inline] + pub fn id(&self) -> uint { self.id } + + /// Receive a value on the underlying receiver. Has the same semantics as + /// `Receiver.recv` + pub fn recv(&mut self) -> T { self.rx.recv() } + /// Block to receive a value on the underlying receiver, returning `Some` on + /// success or `None` if the channel disconnects. This function has the same + /// semantics as `Receiver.recv_opt` + pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() } + + /// Adds this handle to the receiver set that the handle was created from. This + /// method can be called multiple times, but it has no effect if `add` was + /// called previously. + /// + /// This method is unsafe because it requires that the `Handle` is not moved + /// while it is added to the `Select` set. + pub unsafe fn add(&mut self) { + if self.added { return } + let selector: &mut Select = mem::transmute(&*self.selector); + let me: *mut Handle<'static, ()> = mem::transmute(&*self); + + if selector.head.is_null() { + selector.head = me; + selector.tail = me; + } else { + (*me).prev = selector.tail; + assert!((*me).next.is_null()); + (*selector.tail).next = me; + selector.tail = me; + } + self.added = true; + } + + /// Removes this handle from the `Select` set. This method is unsafe because + /// it has no guarantee that the `Handle` was not moved since `add` was + /// called. + pub unsafe fn remove(&mut self) { + if !self.added { return } + + let selector: &mut Select = mem::transmute(&*self.selector); + let me: *mut Handle<'static, ()> = mem::transmute(&*self); + + if self.prev.is_null() { + assert_eq!(selector.head, me); + selector.head = self.next; + } else { + (*self.prev).next = self.next; + } + if self.next.is_null() { + assert_eq!(selector.tail, me); + selector.tail = self.prev; + } else { + (*self.next).prev = self.prev; + } + + self.next = 0 as *mut Handle<'static, ()>; + self.prev = 0 as *mut Handle<'static, ()>; + + self.added = false; + } +} + +#[unsafe_destructor] +impl Drop for Select { + fn drop(&mut self) { + assert!(self.head.is_null()); + assert!(self.tail.is_null()); + } +} + +#[unsafe_destructor] +impl<'rx, T: Send> Drop for Handle<'rx, T> { + fn drop(&mut self) { + unsafe { self.remove() } + } +} + +impl Iterator<*mut Handle<'static, ()>> for Packets { + fn next(&mut self) -> Option<*mut Handle<'static, ()>> { + if self.cur.is_null() { + None + } else { + let ret = Some(self.cur); + unsafe { self.cur = (*self.cur).next; } + ret + } + } +} + +#[cfg(test)] +#[allow(unused_imports)] +mod test { + use prelude::*; + + use super::*; + + // Don't use the libstd version so we can pull in the right Select structure + // (std::comm points at the wrong one) + macro_rules! select { + ( + $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ + ) => ({ + use comm::Select; + let sel = Select::new(); + $( let mut $rx = sel.handle(&$rx); )+ + unsafe { + $( $rx.add(); )+ + } + let ret = sel.wait(); + $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+ + { unreachable!() } + }) + } + + test!(fn smoke() { + let (tx1, rx1) = channel::<int>(); + let (tx2, rx2) = channel::<int>(); + tx1.send(1); + select! ( + foo = rx1.recv() => { assert_eq!(foo, 1); }, + _bar = rx2.recv() => { panic!() } + ) + tx2.send(2); + select! ( + _foo = rx1.recv() => { panic!() }, + bar = rx2.recv() => { assert_eq!(bar, 2) } + ) + drop(tx1); + select! ( + foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); }, + _bar = rx2.recv() => { panic!() } + ) + drop(tx2); + select! ( + bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); } + ) + }) + + test!(fn smoke2() { + let (_tx1, rx1) = channel::<int>(); + let (_tx2, rx2) = channel::<int>(); + let (_tx3, rx3) = channel::<int>(); + let (_tx4, rx4) = channel::<int>(); + let (tx5, rx5) = channel::<int>(); + tx5.send(4); + select! ( + _foo = rx1.recv() => { panic!("1") }, + _foo = rx2.recv() => { panic!("2") }, + _foo = rx3.recv() => { panic!("3") }, + _foo = rx4.recv() => { panic!("4") }, + foo = rx5.recv() => { assert_eq!(foo, 4); } + ) + }) + + test!(fn closed() { + let (_tx1, rx1) = channel::<int>(); + let (tx2, rx2) = channel::<int>(); + drop(tx2); + + select! ( + _a1 = rx1.recv_opt() => { panic!() }, + a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); } + ) + }) + + test!(fn unblocks() { + let (tx1, rx1) = channel::<int>(); + let (_tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<int>(); + + spawn(proc() { + for _ in range(0u, 20) { task::deschedule(); } + tx1.send(1); + rx3.recv(); + for _ in range(0u, 20) { task::deschedule(); } + }); + + select! ( + a = rx1.recv() => { assert_eq!(a, 1); }, + _b = rx2.recv() => { panic!() } + ) + tx3.send(1); + select! ( + a = rx1.recv_opt() => { assert_eq!(a, Err(())); }, + _b = rx2.recv() => { panic!() } + ) + }) + + test!(fn both_ready() { + let (tx1, rx1) = channel::<int>(); + let (tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<()>(); + + spawn(proc() { + for _ in range(0u, 20) { task::deschedule(); } + tx1.send(1); + tx2.send(2); + rx3.recv(); + }); + + select! ( + a = rx1.recv() => { assert_eq!(a, 1); }, + a = rx2.recv() => { assert_eq!(a, 2); } + ) + select! ( + a = rx1.recv() => { assert_eq!(a, 1); }, + a = rx2.recv() => { assert_eq!(a, 2); } + ) + assert_eq!(rx1.try_recv(), Err(Empty)); + assert_eq!(rx2.try_recv(), Err(Empty)); + tx3.send(()); + }) + + test!(fn stress() { + static AMT: int = 10000; + let (tx1, rx1) = channel::<int>(); + let (tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<()>(); + + spawn(proc() { + for i in range(0, AMT) { + if i % 2 == 0 { + tx1.send(i); + } else { + tx2.send(i); + } + rx3.recv(); + } + }); + + for i in range(0, AMT) { + select! ( + i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); }, + i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); } + ) + tx3.send(()); + } + }) + + test!(fn cloning() { + let (tx1, rx1) = channel::<int>(); + let (_tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<()>(); + + spawn(proc() { + rx3.recv(); + tx1.clone(); + assert_eq!(rx3.try_recv(), Err(Empty)); + tx1.send(2); + rx3.recv(); + }); + + tx3.send(()); + select!( + _i1 = rx1.recv() => {}, + _i2 = rx2.recv() => panic!() + ) + tx3.send(()); + }) + + test!(fn cloning2() { + let (tx1, rx1) = channel::<int>(); + let (_tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<()>(); + + spawn(proc() { + rx3.recv(); + tx1.clone(); + assert_eq!(rx3.try_recv(), Err(Empty)); + tx1.send(2); + rx3.recv(); + }); + + tx3.send(()); + select!( + _i1 = rx1.recv() => {}, + _i2 = rx2.recv() => panic!() + ) + tx3.send(()); + }) + + test!(fn cloning3() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + spawn(proc() { + let s = Select::new(); + let mut h1 = s.handle(&rx1); + let mut h2 = s.handle(&rx2); + unsafe { h2.add(); } + unsafe { h1.add(); } + assert_eq!(s.wait(), h2.id); + tx3.send(()); + }); + + for _ in range(0u, 1000) { task::deschedule(); } + drop(tx1.clone()); + tx2.send(()); + rx3.recv(); + }) + + test!(fn preflight1() { + let (tx, rx) = channel(); + tx.send(()); + select!( + () = rx.recv() => {} + ) + }) + + test!(fn preflight2() { + let (tx, rx) = channel(); + tx.send(()); + tx.send(()); + select!( + () = rx.recv() => {} + ) + }) + + test!(fn preflight3() { + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()); + select!( + () = rx.recv() => {} + ) + }) + + test!(fn preflight4() { + let (tx, rx) = channel(); + tx.send(()); + let s = Select::new(); + let mut h = s.handle(&rx); + unsafe { h.add(); } + assert_eq!(s.wait2(false), h.id); + }) + + test!(fn preflight5() { + let (tx, rx) = channel(); + tx.send(()); + tx.send(()); + let s = Select::new(); + let mut h = s.handle(&rx); + unsafe { h.add(); } + assert_eq!(s.wait2(false), h.id); + }) + + test!(fn preflight6() { + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()); + let s = Select::new(); + let mut h = s.handle(&rx); + unsafe { h.add(); } + assert_eq!(s.wait2(false), h.id); + }) + + test!(fn preflight7() { + let (tx, rx) = channel::<()>(); + drop(tx); + let s = Select::new(); + let mut h = s.handle(&rx); + unsafe { h.add(); } + assert_eq!(s.wait2(false), h.id); + }) + + test!(fn preflight8() { + let (tx, rx) = channel(); + tx.send(()); + drop(tx); + rx.recv(); + let s = Select::new(); + let mut h = s.handle(&rx); + unsafe { h.add(); } + assert_eq!(s.wait2(false), h.id); + }) + + test!(fn preflight9() { + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()); + drop(tx); + rx.recv(); + let s = Select::new(); + let mut h = s.handle(&rx); + unsafe { h.add(); } + assert_eq!(s.wait2(false), h.id); + }) + + test!(fn oneshot_data_waiting() { + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + spawn(proc() { + select! { + () = rx1.recv() => {} + } + tx2.send(()); + }); + + for _ in range(0u, 100) { task::deschedule() } + tx1.send(()); + rx2.recv(); + }) + + test!(fn stream_data_waiting() { + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + tx1.send(()); + tx1.send(()); + rx1.recv(); + rx1.recv(); + spawn(proc() { + select! { + () = rx1.recv() => {} + } + tx2.send(()); + }); + + for _ in range(0u, 100) { task::deschedule() } + tx1.send(()); + rx2.recv(); + }) + + test!(fn shared_data_waiting() { + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + drop(tx1.clone()); + tx1.send(()); + rx1.recv(); + spawn(proc() { + select! { + () = rx1.recv() => {} + } + tx2.send(()); + }); + + for _ in range(0u, 100) { task::deschedule() } + tx1.send(()); + rx2.recv(); + }) + + test!(fn sync1() { + let (tx, rx) = sync_channel::<int>(1); + tx.send(1); + select! { + n = rx.recv() => { assert_eq!(n, 1); } + } + }) + + test!(fn sync2() { + let (tx, rx) = sync_channel::<int>(0); + spawn(proc() { + for _ in range(0u, 100) { task::deschedule() } + tx.send(1); + }); + select! { + n = rx.recv() => { assert_eq!(n, 1); } + } + }) + + test!(fn sync3() { + let (tx1, rx1) = sync_channel::<int>(0); + let (tx2, rx2): (Sender<int>, Receiver<int>) = channel(); + spawn(proc() { tx1.send(1); }); + spawn(proc() { tx2.send(2); }); + select! { + n = rx1.recv() => { + assert_eq!(n, 1); + assert_eq!(rx2.recv(), 2); + }, + n = rx2.recv() => { + assert_eq!(n, 2); + assert_eq!(rx1.recv(), 1); + } + } + }) +} diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs new file mode 100644 index 00000000000..6396edbdbd1 --- /dev/null +++ b/src/libstd/comm/shared.rs @@ -0,0 +1,493 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Shared channels +/// +/// This is the flavor of channels which are not necessarily optimized for any +/// particular use case, but are the most general in how they are used. Shared +/// channels are cloneable allowing for multiple senders. +/// +/// High level implementation details can be found in the comment of the parent +/// module. You'll also note that the implementation of the shared and stream +/// channels are quite similar, and this is no coincidence! + +pub use self::Failure::*; + +use core::prelude::*; + +use alloc::boxed::Box; +use core::cmp; +use core::int; +use rustrt::local::Local; +use rustrt::mutex::NativeMutex; +use rustrt::task::{Task, BlockedTask}; +use rustrt::thread::Thread; + +use sync::atomic; +use sync::mpsc_queue as mpsc; + +const DISCONNECTED: int = int::MIN; +const FUDGE: int = 1024; +#[cfg(test)] +const MAX_STEALS: int = 5; +#[cfg(not(test))] +const MAX_STEALS: int = 1 << 20; + +pub struct Packet<T> { + queue: mpsc::Queue<T>, + cnt: atomic::AtomicInt, // How many items are on this channel + steals: int, // How many times has a port received without blocking? + to_wake: atomic::AtomicUint, // Task to wake up + + // The number of channels which are currently using this packet. + channels: atomic::AtomicInt, + + // See the discussion in Port::drop and the channel send methods for what + // these are used for + port_dropped: atomic::AtomicBool, + sender_drain: atomic::AtomicInt, + + // this lock protects various portions of this implementation during + // select() + select_lock: NativeMutex, +} + +pub enum Failure { + Empty, + Disconnected, +} + +impl<T: Send> Packet<T> { + // Creation of a packet *must* be followed by a call to postinit_lock + // and later by inherit_blocker + pub fn new() -> Packet<T> { + let p = Packet { + queue: mpsc::Queue::new(), + cnt: atomic::AtomicInt::new(0), + steals: 0, + to_wake: atomic::AtomicUint::new(0), + channels: atomic::AtomicInt::new(2), + port_dropped: atomic::AtomicBool::new(false), + sender_drain: atomic::AtomicInt::new(0), + select_lock: unsafe { NativeMutex::new() }, + }; + return p; + } + + // This function should be used after newly created Packet + // was wrapped with an Arc + // In other case mutex data will be duplicated while cloning + // and that could cause problems on platforms where it is + // represented by opaque data structure + pub fn postinit_lock(&mut self) { + unsafe { self.select_lock.lock_noguard() } + } + + // This function is used at the creation of a shared packet to inherit a + // previously blocked task. This is done to prevent spurious wakeups of + // tasks in select(). + // + // This can only be called at channel-creation time + pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) { + match task { + Some(task) => { + assert_eq!(self.cnt.load(atomic::SeqCst), 0); + assert_eq!(self.to_wake.load(atomic::SeqCst), 0); + self.to_wake.store(unsafe { task.cast_to_uint() }, + atomic::SeqCst); + self.cnt.store(-1, atomic::SeqCst); + + // This store is a little sketchy. What's happening here is + // that we're transferring a blocker from a oneshot or stream + // channel to this shared channel. In doing so, we never + // spuriously wake them up and rather only wake them up at the + // appropriate time. This implementation of shared channels + // assumes that any blocking recv() will undo the increment of + // steals performed in try_recv() once the recv is complete. + // This thread that we're inheriting, however, is not in the + // middle of recv. Hence, the first time we wake them up, + // they're going to wake up from their old port, move on to the + // upgraded port, and then call the block recv() function. + // + // When calling this function, they'll find there's data + // immediately available, counting it as a steal. This in fact + // wasn't a steal because we appropriately blocked them waiting + // for data. + // + // To offset this bad increment, we initially set the steal + // count to -1. You'll find some special code in + // abort_selection() as well to ensure that this -1 steal count + // doesn't escape too far. + self.steals = -1; + } + None => {} + } + + // When the shared packet is constructed, we grabbed this lock. The + // purpose of this lock is to ensure that abort_selection() doesn't + // interfere with this method. After we unlock this lock, we're + // signifying that we're done modifying self.cnt and self.to_wake and + // the port is ready for the world to continue using it. + unsafe { self.select_lock.unlock_noguard() } + } + + pub fn send(&mut self, t: T) -> Result<(), T> { + // See Port::drop for what's going on + if self.port_dropped.load(atomic::SeqCst) { return Err(t) } + + // Note that the multiple sender case is a little trickier + // semantically than the single sender case. The logic for + // incrementing is "add and if disconnected store disconnected". + // This could end up leading some senders to believe that there + // wasn't a disconnect if in fact there was a disconnect. This means + // that while one thread is attempting to re-store the disconnected + // states, other threads could walk through merrily incrementing + // this very-negative disconnected count. To prevent senders from + // spuriously attempting to send when the channels is actually + // disconnected, the count has a ranged check here. + // + // This is also done for another reason. Remember that the return + // value of this function is: + // + // `true` == the data *may* be received, this essentially has no + // meaning + // `false` == the data will *never* be received, this has a lot of + // meaning + // + // In the SPSC case, we have a check of 'queue.is_empty()' to see + // whether the data was actually received, but this same condition + // means nothing in a multi-producer context. As a result, this + // preflight check serves as the definitive "this will never be + // received". Once we get beyond this check, we have permanently + // entered the realm of "this may be received" + if self.cnt.load(atomic::SeqCst) < DISCONNECTED + FUDGE { + return Err(t) + } + + self.queue.push(t); + match self.cnt.fetch_add(1, atomic::SeqCst) { + -1 => { + self.take_to_wake().wake().map(|t| t.reawaken()); + } + + // In this case, we have possibly failed to send our data, and + // we need to consider re-popping the data in order to fully + // destroy it. We must arbitrate among the multiple senders, + // however, because the queues that we're using are + // single-consumer queues. In order to do this, all exiting + // pushers will use an atomic count in order to count those + // flowing through. Pushers who see 0 are required to drain as + // much as possible, and then can only exit when they are the + // only pusher (otherwise they must try again). + n if n < DISCONNECTED + FUDGE => { + // see the comment in 'try' for a shared channel for why this + // window of "not disconnected" is ok. + self.cnt.store(DISCONNECTED, atomic::SeqCst); + + if self.sender_drain.fetch_add(1, atomic::SeqCst) == 0 { + loop { + // drain the queue, for info on the thread yield see the + // discussion in try_recv + loop { + match self.queue.pop() { + mpsc::Data(..) => {} + mpsc::Empty => break, + mpsc::Inconsistent => Thread::yield_now(), + } + } + // maybe we're done, if we're not the last ones + // here, then we need to go try again. + if self.sender_drain.fetch_sub(1, atomic::SeqCst) == 1 { + break + } + } + + // At this point, there may still be data on the queue, + // but only if the count hasn't been incremented and + // some other sender hasn't finished pushing data just + // yet. That sender in question will drain its own data. + } + } + + // Can't make any assumptions about this case like in the SPSC case. + _ => {} + } + + Ok(()) + } + + pub fn recv(&mut self) -> Result<T, Failure> { + // This code is essentially the exact same as that found in the stream + // case (see stream.rs) + match self.try_recv() { + Err(Empty) => {} + data => return data, + } + + let task: Box<Task> = Local::take(); + task.deschedule(1, |task| { + self.decrement(task) + }); + + match self.try_recv() { + data @ Ok(..) => { self.steals -= 1; data } + data => data, + } + } + + // Essentially the exact same thing as the stream decrement function. + fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> { + assert_eq!(self.to_wake.load(atomic::SeqCst), 0); + let n = unsafe { task.cast_to_uint() }; + self.to_wake.store(n, atomic::SeqCst); + + let steals = self.steals; + self.steals = 0; + + match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) { + DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); } + // If we factor in our steals and notice that the channel has no + // data, we successfully sleep + n => { + assert!(n >= 0); + if n - steals <= 0 { return Ok(()) } + } + } + + self.to_wake.store(0, atomic::SeqCst); + Err(unsafe { BlockedTask::cast_from_uint(n) }) + } + + pub fn try_recv(&mut self) -> Result<T, Failure> { + let ret = match self.queue.pop() { + mpsc::Data(t) => Some(t), + mpsc::Empty => None, + + // This is a bit of an interesting case. The channel is + // reported as having data available, but our pop() has + // failed due to the queue being in an inconsistent state. + // This means that there is some pusher somewhere which has + // yet to complete, but we are guaranteed that a pop will + // eventually succeed. In this case, we spin in a yield loop + // because the remote sender should finish their enqueue + // operation "very quickly". + // + // Avoiding this yield loop would require a different queue + // abstraction which provides the guarantee that after M + // pushes have succeeded, at least M pops will succeed. The + // current queues guarantee that if there are N active + // pushes, you can pop N times once all N have finished. + mpsc::Inconsistent => { + let data; + loop { + Thread::yield_now(); + match self.queue.pop() { + mpsc::Data(t) => { data = t; break } + mpsc::Empty => panic!("inconsistent => empty"), + mpsc::Inconsistent => {} + } + } + Some(data) + } + }; + match ret { + // See the discussion in the stream implementation for why we + // might decrement steals. + Some(data) => { + if self.steals > MAX_STEALS { + match self.cnt.swap(0, atomic::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomic::SeqCst); + } + n => { + let m = cmp::min(n, self.steals); + self.steals -= m; + self.bump(n - m); + } + } + assert!(self.steals >= 0); + } + self.steals += 1; + Ok(data) + } + + // See the discussion in the stream implementation for why we try + // again. + None => { + match self.cnt.load(atomic::SeqCst) { + n if n != DISCONNECTED => Err(Empty), + _ => { + match self.queue.pop() { + mpsc::Data(t) => Ok(t), + mpsc::Empty => Err(Disconnected), + // with no senders, an inconsistency is impossible. + mpsc::Inconsistent => unreachable!(), + } + } + } + } + } + } + + // Prepares this shared packet for a channel clone, essentially just bumping + // a refcount. + pub fn clone_chan(&mut self) { + self.channels.fetch_add(1, atomic::SeqCst); + } + + // Decrement the reference count on a channel. This is called whenever a + // Chan is dropped and may end up waking up a receiver. It's the receiver's + // responsibility on the other end to figure out that we've disconnected. + pub fn drop_chan(&mut self) { + match self.channels.fetch_sub(1, atomic::SeqCst) { + 1 => {} + n if n > 1 => return, + n => panic!("bad number of channels left {}", n), + } + + match self.cnt.swap(DISCONNECTED, atomic::SeqCst) { + -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); } + DISCONNECTED => {} + n => { assert!(n >= 0); } + } + } + + // See the long discussion inside of stream.rs for why the queue is drained, + // and why it is done in this fashion. + pub fn drop_port(&mut self) { + self.port_dropped.store(true, atomic::SeqCst); + let mut steals = self.steals; + while { + let cnt = self.cnt.compare_and_swap( + steals, DISCONNECTED, atomic::SeqCst); + cnt != DISCONNECTED && cnt != steals + } { + // See the discussion in 'try_recv' for why we yield + // control of this thread. + loop { + match self.queue.pop() { + mpsc::Data(..) => { steals += 1; } + mpsc::Empty | mpsc::Inconsistent => break, + } + } + } + } + + // Consumes ownership of the 'to_wake' field. + fn take_to_wake(&mut self) -> BlockedTask { + let task = self.to_wake.load(atomic::SeqCst); + self.to_wake.store(0, atomic::SeqCst); + assert!(task != 0); + unsafe { BlockedTask::cast_from_uint(task) } + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // Helper function for select, tests whether this port can receive without + // blocking (obviously not an atomic decision). + // + // This is different than the stream version because there's no need to peek + // at the queue, we can just look at the local count. + pub fn can_recv(&mut self) -> bool { + let cnt = self.cnt.load(atomic::SeqCst); + cnt == DISCONNECTED || cnt - self.steals > 0 + } + + // increment the count on the channel (used for selection) + fn bump(&mut self, amt: int) -> int { + match self.cnt.fetch_add(amt, atomic::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomic::SeqCst); + DISCONNECTED + } + n => n + } + } + + // Inserts the blocked task for selection on this port, returning it back if + // the port already has data on it. + // + // The code here is the same as in stream.rs, except that it doesn't need to + // peek at the channel to see if an upgrade is pending. + pub fn start_selection(&mut self, + task: BlockedTask) -> Result<(), BlockedTask> { + match self.decrement(task) { + Ok(()) => Ok(()), + Err(task) => { + let prev = self.bump(1); + assert!(prev == DISCONNECTED || prev >= 0); + return Err(task); + } + } + } + + // Cancels a previous task waiting on this port, returning whether there's + // data on the port. + // + // This is similar to the stream implementation (hence fewer comments), but + // uses a different value for the "steals" variable. + pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool { + // Before we do anything else, we bounce on this lock. The reason for + // doing this is to ensure that any upgrade-in-progress is gone and + // done with. Without this bounce, we can race with inherit_blocker + // about looking at and dealing with to_wake. Once we have acquired the + // lock, we are guaranteed that inherit_blocker is done. + unsafe { + let _guard = self.select_lock.lock(); + } + + // Like the stream implementation, we want to make sure that the count + // on the channel goes non-negative. We don't know how negative the + // stream currently is, so instead of using a steal value of 1, we load + // the channel count and figure out what we should do to make it + // positive. + let steals = { + let cnt = self.cnt.load(atomic::SeqCst); + if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0} + }; + let prev = self.bump(steals + 1); + + if prev == DISCONNECTED { + assert_eq!(self.to_wake.load(atomic::SeqCst), 0); + true + } else { + let cur = prev + steals + 1; + assert!(cur >= 0); + if prev < 0 { + self.take_to_wake().trash(); + } else { + while self.to_wake.load(atomic::SeqCst) != 0 { + Thread::yield_now(); + } + } + // if the number of steals is -1, it was the pre-emptive -1 steal + // count from when we inherited a blocker. This is fine because + // we're just going to overwrite it with a real value. + assert!(self.steals == 0 || self.steals == -1); + self.steals = steals; + prev >= 0 + } + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for Packet<T> { + fn drop(&mut self) { + // Note that this load is not only an assert for correctness about + // disconnection, but also a proper fence before the read of + // `to_wake`, so this assert cannot be removed with also removing + // the `to_wake` assert. + assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED); + assert_eq!(self.to_wake.load(atomic::SeqCst), 0); + assert_eq!(self.channels.load(atomic::SeqCst), 0); + } +} diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs new file mode 100644 index 00000000000..23d042960b1 --- /dev/null +++ b/src/libstd/comm/stream.rs @@ -0,0 +1,486 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Stream channels +/// +/// This is the flavor of channels which are optimized for one sender and one +/// receiver. The sender will be upgraded to a shared channel if the channel is +/// cloned. +/// +/// High level implementation details can be found in the comment of the parent +/// module. + +pub use self::Failure::*; +pub use self::UpgradeResult::*; +pub use self::SelectionResult::*; +use self::Message::*; + +use core::prelude::*; + +use alloc::boxed::Box; +use core::cmp; +use core::int; +use rustrt::local::Local; +use rustrt::task::{Task, BlockedTask}; +use rustrt::thread::Thread; + +use sync::atomic; +use sync::spsc_queue as spsc; +use comm::Receiver; + +const DISCONNECTED: int = int::MIN; +#[cfg(test)] +const MAX_STEALS: int = 5; +#[cfg(not(test))] +const MAX_STEALS: int = 1 << 20; + +pub struct Packet<T> { + queue: spsc::Queue<Message<T>>, // internal queue for all message + + cnt: atomic::AtomicInt, // How many items are on this channel + steals: int, // How many times has a port received without blocking? + to_wake: atomic::AtomicUint, // Task to wake up + + port_dropped: atomic::AtomicBool, // flag if the channel has been destroyed. +} + +pub enum Failure<T> { + Empty, + Disconnected, + Upgraded(Receiver<T>), +} + +pub enum UpgradeResult { + UpSuccess, + UpDisconnected, + UpWoke(BlockedTask), +} + +pub enum SelectionResult<T> { + SelSuccess, + SelCanceled(BlockedTask), + SelUpgraded(BlockedTask, Receiver<T>), +} + +// Any message could contain an "upgrade request" to a new shared port, so the +// internal queue it's a queue of T, but rather Message<T> +enum Message<T> { + Data(T), + GoUp(Receiver<T>), +} + +impl<T: Send> Packet<T> { + pub fn new() -> Packet<T> { + Packet { + queue: unsafe { spsc::Queue::new(128) }, + + cnt: atomic::AtomicInt::new(0), + steals: 0, + to_wake: atomic::AtomicUint::new(0), + + port_dropped: atomic::AtomicBool::new(false), + } + } + + + pub fn send(&mut self, t: T) -> Result<(), T> { + // If the other port has deterministically gone away, then definitely + // must return the data back up the stack. Otherwise, the data is + // considered as being sent. + if self.port_dropped.load(atomic::SeqCst) { return Err(t) } + + match self.do_send(Data(t)) { + UpSuccess | UpDisconnected => {}, + UpWoke(task) => { task.wake().map(|t| t.reawaken()); } + } + Ok(()) + } + pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult { + // If the port has gone away, then there's no need to proceed any + // further. + if self.port_dropped.load(atomic::SeqCst) { return UpDisconnected } + + self.do_send(GoUp(up)) + } + + fn do_send(&mut self, t: Message<T>) -> UpgradeResult { + self.queue.push(t); + match self.cnt.fetch_add(1, atomic::SeqCst) { + // As described in the mod's doc comment, -1 == wakeup + -1 => UpWoke(self.take_to_wake()), + // As as described before, SPSC queues must be >= -2 + -2 => UpSuccess, + + // Be sure to preserve the disconnected state, and the return value + // in this case is going to be whether our data was received or not. + // This manifests itself on whether we have an empty queue or not. + // + // Primarily, are required to drain the queue here because the port + // will never remove this data. We can only have at most one item to + // drain (the port drains the rest). + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomic::SeqCst); + let first = self.queue.pop(); + let second = self.queue.pop(); + assert!(second.is_none()); + + match first { + Some(..) => UpSuccess, // we failed to send the data + None => UpDisconnected, // we successfully sent data + } + } + + // Otherwise we just sent some data on a non-waiting queue, so just + // make sure the world is sane and carry on! + n => { assert!(n >= 0); UpSuccess } + } + } + + // Consumes ownership of the 'to_wake' field. + fn take_to_wake(&mut self) -> BlockedTask { + let task = self.to_wake.load(atomic::SeqCst); + self.to_wake.store(0, atomic::SeqCst); + assert!(task != 0); + unsafe { BlockedTask::cast_from_uint(task) } + } + + // Decrements the count on the channel for a sleeper, returning the sleeper + // back if it shouldn't sleep. Note that this is the location where we take + // steals into account. + fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> { + assert_eq!(self.to_wake.load(atomic::SeqCst), 0); + let n = unsafe { task.cast_to_uint() }; + self.to_wake.store(n, atomic::SeqCst); + + let steals = self.steals; + self.steals = 0; + + match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) { + DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); } + // If we factor in our steals and notice that the channel has no + // data, we successfully sleep + n => { + assert!(n >= 0); + if n - steals <= 0 { return Ok(()) } + } + } + + self.to_wake.store(0, atomic::SeqCst); + Err(unsafe { BlockedTask::cast_from_uint(n) }) + } + + pub fn recv(&mut self) -> Result<T, Failure<T>> { + // Optimistic preflight check (scheduling is expensive). + match self.try_recv() { + Err(Empty) => {} + data => return data, + } + + // Welp, our channel has no data. Deschedule the current task and + // initiate the blocking protocol. + let task: Box<Task> = Local::take(); + task.deschedule(1, |task| { + self.decrement(task) + }); + + match self.try_recv() { + // Messages which actually popped from the queue shouldn't count as + // a steal, so offset the decrement here (we already have our + // "steal" factored into the channel count above). + data @ Ok(..) | + data @ Err(Upgraded(..)) => { + self.steals -= 1; + data + } + + data => data, + } + } + + pub fn try_recv(&mut self) -> Result<T, Failure<T>> { + match self.queue.pop() { + // If we stole some data, record to that effect (this will be + // factored into cnt later on). + // + // Note that we don't allow steals to grow without bound in order to + // prevent eventual overflow of either steals or cnt as an overflow + // would have catastrophic results. Sometimes, steals > cnt, but + // other times cnt > steals, so we don't know the relation between + // steals and cnt. This code path is executed only rarely, so we do + // a pretty slow operation, of swapping 0 into cnt, taking steals + // down as much as possible (without going negative), and then + // adding back in whatever we couldn't factor into steals. + Some(data) => { + if self.steals > MAX_STEALS { + match self.cnt.swap(0, atomic::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomic::SeqCst); + } + n => { + let m = cmp::min(n, self.steals); + self.steals -= m; + self.bump(n - m); + } + } + assert!(self.steals >= 0); + } + self.steals += 1; + match data { + Data(t) => Ok(t), + GoUp(up) => Err(Upgraded(up)), + } + } + + None => { + match self.cnt.load(atomic::SeqCst) { + n if n != DISCONNECTED => Err(Empty), + + // This is a little bit of a tricky case. We failed to pop + // data above, and then we have viewed that the channel is + // disconnected. In this window more data could have been + // sent on the channel. It doesn't really make sense to + // return that the channel is disconnected when there's + // actually data on it, so be extra sure there's no data by + // popping one more time. + // + // We can ignore steals because the other end is + // disconnected and we'll never need to really factor in our + // steals again. + _ => { + match self.queue.pop() { + Some(Data(t)) => Ok(t), + Some(GoUp(up)) => Err(Upgraded(up)), + None => Err(Disconnected), + } + } + } + } + } + } + + pub fn drop_chan(&mut self) { + // Dropping a channel is pretty simple, we just flag it as disconnected + // and then wakeup a blocker if there is one. + match self.cnt.swap(DISCONNECTED, atomic::SeqCst) { + -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); } + DISCONNECTED => {} + n => { assert!(n >= 0); } + } + } + + pub fn drop_port(&mut self) { + // Dropping a port seems like a fairly trivial thing. In theory all we + // need to do is flag that we're disconnected and then everything else + // can take over (we don't have anyone to wake up). + // + // The catch for Ports is that we want to drop the entire contents of + // the queue. There are multiple reasons for having this property, the + // largest of which is that if another chan is waiting in this channel + // (but not received yet), then waiting on that port will cause a + // deadlock. + // + // So if we accept that we must now destroy the entire contents of the + // queue, this code may make a bit more sense. The tricky part is that + // we can't let any in-flight sends go un-dropped, we have to make sure + // *everything* is dropped and nothing new will come onto the channel. + + // The first thing we do is set a flag saying that we're done for. All + // sends are gated on this flag, so we're immediately guaranteed that + // there are a bounded number of active sends that we'll have to deal + // with. + self.port_dropped.store(true, atomic::SeqCst); + + // Now that we're guaranteed to deal with a bounded number of senders, + // we need to drain the queue. This draining process happens atomically + // with respect to the "count" of the channel. If the count is nonzero + // (with steals taken into account), then there must be data on the + // channel. In this case we drain everything and then try again. We will + // continue to fail while active senders send data while we're dropping + // data, but eventually we're guaranteed to break out of this loop + // (because there is a bounded number of senders). + let mut steals = self.steals; + while { + let cnt = self.cnt.compare_and_swap( + steals, DISCONNECTED, atomic::SeqCst); + cnt != DISCONNECTED && cnt != steals + } { + loop { + match self.queue.pop() { + Some(..) => { steals += 1; } + None => break + } + } + } + + // At this point in time, we have gated all future senders from sending, + // and we have flagged the channel as being disconnected. The senders + // still have some responsibility, however, because some sends may not + // complete until after we flag the disconnection. There are more + // details in the sending methods that see DISCONNECTED + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // Tests to see whether this port can receive without blocking. If Ok is + // returned, then that's the answer. If Err is returned, then the returned + // port needs to be queried instead (an upgrade happened) + pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> { + // We peek at the queue to see if there's anything on it, and we use + // this return value to determine if we should pop from the queue and + // upgrade this channel immediately. If it looks like we've got an + // upgrade pending, then go through the whole recv rigamarole to update + // the internal state. + match self.queue.peek() { + Some(&GoUp(..)) => { + match self.recv() { + Err(Upgraded(port)) => Err(port), + _ => unreachable!(), + } + } + Some(..) => Ok(true), + None => Ok(false) + } + } + + // increment the count on the channel (used for selection) + fn bump(&mut self, amt: int) -> int { + match self.cnt.fetch_add(amt, atomic::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomic::SeqCst); + DISCONNECTED + } + n => n + } + } + + // Attempts to start selecting on this port. Like a oneshot, this can fail + // immediately because of an upgrade. + pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> { + match self.decrement(task) { + Ok(()) => SelSuccess, + Err(task) => { + let ret = match self.queue.peek() { + Some(&GoUp(..)) => { + match self.queue.pop() { + Some(GoUp(port)) => SelUpgraded(task, port), + _ => unreachable!(), + } + } + Some(..) => SelCanceled(task), + None => SelCanceled(task), + }; + // Undo our decrement above, and we should be guaranteed that the + // previous value is positive because we're not going to sleep + let prev = self.bump(1); + assert!(prev == DISCONNECTED || prev >= 0); + return ret; + } + } + } + + // Removes a previous task from being blocked in this port + pub fn abort_selection(&mut self, + was_upgrade: bool) -> Result<bool, Receiver<T>> { + // If we're aborting selection after upgrading from a oneshot, then + // we're guarantee that no one is waiting. The only way that we could + // have seen the upgrade is if data was actually sent on the channel + // half again. For us, this means that there is guaranteed to be data on + // this channel. Furthermore, we're guaranteed that there was no + // start_selection previously, so there's no need to modify `self.cnt` + // at all. + // + // Hence, because of these invariants, we immediately return `Ok(true)`. + // Note that the data may not actually be sent on the channel just yet. + // The other end could have flagged the upgrade but not sent data to + // this end. This is fine because we know it's a small bounded windows + // of time until the data is actually sent. + if was_upgrade { + assert_eq!(self.steals, 0); + assert_eq!(self.to_wake.load(atomic::SeqCst), 0); + return Ok(true) + } + + // We want to make sure that the count on the channel goes non-negative, + // and in the stream case we can have at most one steal, so just assume + // that we had one steal. + let steals = 1; + let prev = self.bump(steals + 1); + + // If we were previously disconnected, then we know for sure that there + // is no task in to_wake, so just keep going + let has_data = if prev == DISCONNECTED { + assert_eq!(self.to_wake.load(atomic::SeqCst), 0); + true // there is data, that data is that we're disconnected + } else { + let cur = prev + steals + 1; + assert!(cur >= 0); + + // If the previous count was negative, then we just made things go + // positive, hence we passed the -1 boundary and we're responsible + // for removing the to_wake() field and trashing it. + // + // If the previous count was positive then we're in a tougher + // situation. A possible race is that a sender just incremented + // through -1 (meaning it's going to try to wake a task up), but it + // hasn't yet read the to_wake. In order to prevent a future recv() + // from waking up too early (this sender picking up the plastered + // over to_wake), we spin loop here waiting for to_wake to be 0. + // Note that this entire select() implementation needs an overhaul, + // and this is *not* the worst part of it, so this is not done as a + // final solution but rather out of necessity for now to get + // something working. + if prev < 0 { + self.take_to_wake().trash(); + } else { + while self.to_wake.load(atomic::SeqCst) != 0 { + Thread::yield_now(); + } + } + assert_eq!(self.steals, 0); + self.steals = steals; + + // if we were previously positive, then there's surely data to + // receive + prev >= 0 + }; + + // Now that we've determined that this queue "has data", we peek at the + // queue to see if the data is an upgrade or not. If it's an upgrade, + // then we need to destroy this port and abort selection on the + // upgraded port. + if has_data { + match self.queue.peek() { + Some(&GoUp(..)) => { + match self.queue.pop() { + Some(GoUp(port)) => Err(port), + _ => unreachable!(), + } + } + _ => Ok(true), + } + } else { + Ok(false) + } + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for Packet<T> { + fn drop(&mut self) { + // Note that this load is not only an assert for correctness about + // disconnection, but also a proper fence before the read of + // `to_wake`, so this assert cannot be removed with also removing + // the `to_wake` assert. + assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED); + assert_eq!(self.to_wake.load(atomic::SeqCst), 0); + } +} diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs new file mode 100644 index 00000000000..a2e839e134c --- /dev/null +++ b/src/libstd/comm/sync.rs @@ -0,0 +1,490 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Synchronous channels/ports +/// +/// This channel implementation differs significantly from the asynchronous +/// implementations found next to it (oneshot/stream/share). This is an +/// implementation of a synchronous, bounded buffer channel. +/// +/// Each channel is created with some amount of backing buffer, and sends will +/// *block* until buffer space becomes available. A buffer size of 0 is valid, +/// which means that every successful send is paired with a successful recv. +/// +/// This flavor of channels defines a new `send_opt` method for channels which +/// is the method by which a message is sent but the task does not panic if it +/// cannot be delivered. +/// +/// Another major difference is that send() will *always* return back the data +/// if it couldn't be sent. This is because it is deterministically known when +/// the data is received and when it is not received. +/// +/// Implementation-wise, it can all be summed up with "use a mutex plus some +/// logic". The mutex used here is an OS native mutex, meaning that no user code +/// is run inside of the mutex (to prevent context switching). This +/// implementation shares almost all code for the buffered and unbuffered cases +/// of a synchronous channel. There are a few branches for the unbuffered case, +/// but they're mostly just relevant to blocking senders. + +use core::prelude::*; + +pub use self::Failure::*; +use self::Blocker::*; + +use alloc::boxed::Box; +use vec::Vec; +use core::mem; +use core::cell::UnsafeCell; +use rustrt::local::Local; +use rustrt::mutex::{NativeMutex, LockGuard}; +use rustrt::task::{Task, BlockedTask}; + +use sync::atomic; + +pub struct Packet<T> { + /// Only field outside of the mutex. Just done for kicks, but mainly because + /// the other shared channel already had the code implemented + channels: atomic::AtomicUint, + + /// The state field is protected by this mutex + lock: NativeMutex, + state: UnsafeCell<State<T>>, +} + +struct State<T> { + disconnected: bool, // Is the channel disconnected yet? + queue: Queue, // queue of senders waiting to send data + blocker: Blocker, // currently blocked task on this channel + buf: Buffer<T>, // storage for buffered messages + cap: uint, // capacity of this channel + + /// A curious flag used to indicate whether a sender failed or succeeded in + /// blocking. This is used to transmit information back to the task that it + /// must dequeue its message from the buffer because it was not received. + /// This is only relevant in the 0-buffer case. This obviously cannot be + /// safely constructed, but it's guaranteed to always have a valid pointer + /// value. + canceled: Option<&'static mut bool>, +} + +/// Possible flavors of tasks who can be blocked on this channel. +enum Blocker { + BlockedSender(BlockedTask), + BlockedReceiver(BlockedTask), + NoneBlocked +} + +/// Simple queue for threading tasks together. Nodes are stack-allocated, so +/// this structure is not safe at all +struct Queue { + head: *mut Node, + tail: *mut Node, +} + +struct Node { + task: Option<BlockedTask>, + next: *mut Node, +} + +/// A simple ring-buffer +struct Buffer<T> { + buf: Vec<Option<T>>, + start: uint, + size: uint, +} + +#[deriving(Show)] +pub enum Failure { + Empty, + Disconnected, +} + +/// Atomically blocks the current task, placing it into `slot`, unlocking `lock` +/// in the meantime. This re-locks the mutex upon returning. +fn wait(slot: &mut Blocker, f: fn(BlockedTask) -> Blocker, + lock: &NativeMutex) { + let me: Box<Task> = Local::take(); + me.deschedule(1, |task| { + match mem::replace(slot, f(task)) { + NoneBlocked => {} + _ => unreachable!(), + } + unsafe { lock.unlock_noguard(); } + Ok(()) + }); + unsafe { lock.lock_noguard(); } +} + +/// Wakes up a task, dropping the lock at the correct time +fn wakeup(task: BlockedTask, guard: LockGuard) { + // We need to be careful to wake up the waiting task *outside* of the mutex + // in case it incurs a context switch. + mem::drop(guard); + task.wake().map(|t| t.reawaken()); +} + +impl<T: Send> Packet<T> { + pub fn new(cap: uint) -> Packet<T> { + Packet { + channels: atomic::AtomicUint::new(1), + lock: unsafe { NativeMutex::new() }, + state: UnsafeCell::new(State { + disconnected: false, + blocker: NoneBlocked, + cap: cap, + canceled: None, + queue: Queue { + head: 0 as *mut Node, + tail: 0 as *mut Node, + }, + buf: Buffer { + buf: Vec::from_fn(cap + if cap == 0 {1} else {0}, |_| None), + start: 0, + size: 0, + }, + }), + } + } + + // Locks this channel, returning a guard for the state and the mutable state + // itself. Care should be taken to ensure that the state does not escape the + // guard! + // + // Note that we're ok promoting an & reference to an &mut reference because + // the lock ensures that we're the only ones in the world with a pointer to + // the state. + fn lock<'a>(&'a self) -> (LockGuard<'a>, &'a mut State<T>) { + unsafe { + let guard = self.lock.lock(); + (guard, &mut *self.state.get()) + } + } + + pub fn send(&self, t: T) -> Result<(), T> { + let (guard, state) = self.lock(); + + // wait for a slot to become available, and enqueue the data + while !state.disconnected && state.buf.size() == state.buf.cap() { + state.queue.enqueue(&self.lock); + } + if state.disconnected { return Err(t) } + state.buf.enqueue(t); + + match mem::replace(&mut state.blocker, NoneBlocked) { + // if our capacity is 0, then we need to wait for a receiver to be + // available to take our data. After waiting, we check again to make + // sure the port didn't go away in the meantime. If it did, we need + // to hand back our data. + NoneBlocked if state.cap == 0 => { + let mut canceled = false; + assert!(state.canceled.is_none()); + state.canceled = Some(unsafe { mem::transmute(&mut canceled) }); + wait(&mut state.blocker, BlockedSender, &self.lock); + if canceled {Err(state.buf.dequeue())} else {Ok(())} + } + + // success, we buffered some data + NoneBlocked => Ok(()), + + // success, someone's about to receive our buffered data. + BlockedReceiver(task) => { wakeup(task, guard); Ok(()) } + + BlockedSender(..) => panic!("lolwut"), + } + } + + pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> { + let (guard, state) = self.lock(); + if state.disconnected { + Err(super::RecvDisconnected(t)) + } else if state.buf.size() == state.buf.cap() { + Err(super::Full(t)) + } else if state.cap == 0 { + // With capacity 0, even though we have buffer space we can't + // transfer the data unless there's a receiver waiting. + match mem::replace(&mut state.blocker, NoneBlocked) { + NoneBlocked => Err(super::Full(t)), + BlockedSender(..) => unreachable!(), + BlockedReceiver(task) => { + state.buf.enqueue(t); + wakeup(task, guard); + Ok(()) + } + } + } else { + // If the buffer has some space and the capacity isn't 0, then we + // just enqueue the data for later retrieval, ensuring to wake up + // any blocked receiver if there is one. + assert!(state.buf.size() < state.buf.cap()); + state.buf.enqueue(t); + match mem::replace(&mut state.blocker, NoneBlocked) { + BlockedReceiver(task) => wakeup(task, guard), + NoneBlocked => {} + BlockedSender(..) => unreachable!(), + } + Ok(()) + } + } + + // Receives a message from this channel + // + // When reading this, remember that there can only ever be one receiver at + // time. + pub fn recv(&self) -> Result<T, ()> { + let (guard, state) = self.lock(); + + // Wait for the buffer to have something in it. No need for a while loop + // because we're the only receiver. + let mut waited = false; + if !state.disconnected && state.buf.size() == 0 { + wait(&mut state.blocker, BlockedReceiver, &self.lock); + waited = true; + } + if state.disconnected && state.buf.size() == 0 { return Err(()) } + + // Pick up the data, wake up our neighbors, and carry on + assert!(state.buf.size() > 0); + let ret = state.buf.dequeue(); + self.wakeup_senders(waited, guard, state); + return Ok(ret); + } + + pub fn try_recv(&self) -> Result<T, Failure> { + let (guard, state) = self.lock(); + + // Easy cases first + if state.disconnected { return Err(Disconnected) } + if state.buf.size() == 0 { return Err(Empty) } + + // Be sure to wake up neighbors + let ret = Ok(state.buf.dequeue()); + self.wakeup_senders(false, guard, state); + + return ret; + } + + // Wake up pending senders after some data has been received + // + // * `waited` - flag if the receiver blocked to receive some data, or if it + // just picked up some data on the way out + // * `guard` - the lock guard that is held over this channel's lock + fn wakeup_senders(&self, waited: bool, + guard: LockGuard, + state: &mut State<T>) { + let pending_sender1: Option<BlockedTask> = state.queue.dequeue(); + + // If this is a no-buffer channel (cap == 0), then if we didn't wait we + // need to ACK the sender. If we waited, then the sender waking us up + // was already the ACK. + let pending_sender2 = if state.cap == 0 && !waited { + match mem::replace(&mut state.blocker, NoneBlocked) { + NoneBlocked => None, + BlockedReceiver(..) => unreachable!(), + BlockedSender(task) => { + state.canceled.take(); + Some(task) + } + } + } else { + None + }; + mem::drop((state, guard)); + + // only outside of the lock do we wake up the pending tasks + pending_sender1.map(|t| t.wake().map(|t| t.reawaken())); + pending_sender2.map(|t| t.wake().map(|t| t.reawaken())); + } + + // Prepares this shared packet for a channel clone, essentially just bumping + // a refcount. + pub fn clone_chan(&self) { + self.channels.fetch_add(1, atomic::SeqCst); + } + + pub fn drop_chan(&self) { + // Only flag the channel as disconnected if we're the last channel + match self.channels.fetch_sub(1, atomic::SeqCst) { + 1 => {} + _ => return + } + + // Not much to do other than wake up a receiver if one's there + let (guard, state) = self.lock(); + if state.disconnected { return } + state.disconnected = true; + match mem::replace(&mut state.blocker, NoneBlocked) { + NoneBlocked => {} + BlockedSender(..) => unreachable!(), + BlockedReceiver(task) => wakeup(task, guard), + } + } + + pub fn drop_port(&self) { + let (guard, state) = self.lock(); + + if state.disconnected { return } + state.disconnected = true; + + // If the capacity is 0, then the sender may want its data back after + // we're disconnected. Otherwise it's now our responsibility to destroy + // the buffered data. As with many other portions of this code, this + // needs to be careful to destroy the data *outside* of the lock to + // prevent deadlock. + let _data = if state.cap != 0 { + mem::replace(&mut state.buf.buf, Vec::new()) + } else { + Vec::new() + }; + let mut queue = mem::replace(&mut state.queue, Queue { + head: 0 as *mut Node, + tail: 0 as *mut Node, + }); + + let waiter = match mem::replace(&mut state.blocker, NoneBlocked) { + NoneBlocked => None, + BlockedSender(task) => { + *state.canceled.take().unwrap() = true; + Some(task) + } + BlockedReceiver(..) => unreachable!(), + }; + mem::drop((state, guard)); + + loop { + match queue.dequeue() { + Some(task) => { task.wake().map(|t| t.reawaken()); } + None => break, + } + } + waiter.map(|t| t.wake().map(|t| t.reawaken())); + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // If Ok, the value is whether this port has data, if Err, then the upgraded + // port needs to be checked instead of this one. + pub fn can_recv(&self) -> bool { + let (_g, state) = self.lock(); + state.disconnected || state.buf.size() > 0 + } + + // Attempts to start selection on this port. This can either succeed or fail + // because there is data waiting. + pub fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>{ + let (_g, state) = self.lock(); + if state.disconnected || state.buf.size() > 0 { + Err(task) + } else { + match mem::replace(&mut state.blocker, BlockedReceiver(task)) { + NoneBlocked => {} + BlockedSender(..) => unreachable!(), + BlockedReceiver(..) => unreachable!(), + } + Ok(()) + } + } + + // Remove a previous selecting task from this port. This ensures that the + // blocked task will no longer be visible to any other threads. + // + // The return value indicates whether there's data on this port. + pub fn abort_selection(&self) -> bool { + let (_g, state) = self.lock(); + match mem::replace(&mut state.blocker, NoneBlocked) { + NoneBlocked => true, + BlockedSender(task) => { + state.blocker = BlockedSender(task); + true + } + BlockedReceiver(task) => { task.trash(); false } + } + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for Packet<T> { + fn drop(&mut self) { + assert_eq!(self.channels.load(atomic::SeqCst), 0); + let (_g, state) = self.lock(); + assert!(state.queue.dequeue().is_none()); + assert!(state.canceled.is_none()); + } +} + + +//////////////////////////////////////////////////////////////////////////////// +// Buffer, a simple ring buffer backed by Vec<T> +//////////////////////////////////////////////////////////////////////////////// + +impl<T> Buffer<T> { + fn enqueue(&mut self, t: T) { + let pos = (self.start + self.size) % self.buf.len(); + self.size += 1; + let prev = mem::replace(&mut self.buf[pos], Some(t)); + assert!(prev.is_none()); + } + + fn dequeue(&mut self) -> T { + let start = self.start; + self.size -= 1; + self.start = (self.start + 1) % self.buf.len(); + self.buf[start].take().unwrap() + } + + fn size(&self) -> uint { self.size } + fn cap(&self) -> uint { self.buf.len() } +} + +//////////////////////////////////////////////////////////////////////////////// +// Queue, a simple queue to enqueue tasks with (stack-allocated nodes) +//////////////////////////////////////////////////////////////////////////////// + +impl Queue { + fn enqueue(&mut self, lock: &NativeMutex) { + let task: Box<Task> = Local::take(); + let mut node = Node { + task: None, + next: 0 as *mut Node, + }; + task.deschedule(1, |task| { + node.task = Some(task); + if self.tail.is_null() { + self.head = &mut node as *mut Node; + self.tail = &mut node as *mut Node; + } else { + unsafe { + (*self.tail).next = &mut node as *mut Node; + self.tail = &mut node as *mut Node; + } + } + unsafe { lock.unlock_noguard(); } + Ok(()) + }); + unsafe { lock.lock_noguard(); } + assert!(node.next.is_null()); + } + + fn dequeue(&mut self) -> Option<BlockedTask> { + if self.head.is_null() { + return None + } + let node = self.head; + self.head = unsafe { (*node).next }; + if self.head.is_null() { + self.tail = 0 as *mut Node; + } + unsafe { + (*node).next = 0 as *mut Node; + Some((*node).task.take().unwrap()) + } + } +} |
