diff options
| author | bors <bors@rust-lang.org> | 2014-06-11 11:47:04 -0700 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2014-06-11 11:47:04 -0700 |
| commit | f9260d41d6e37653bf71b08a041be0310098716a (patch) | |
| tree | 4f94a53405884d3c29224585d2eac1d5242eb7d1 /src/libstd | |
| parent | f0f9095f1daa3814c9589f38b574e51d394d1bbc (diff) | |
| parent | b1c9ce9c6f0eb7d4a7df1aad6b6799f4b548181c (diff) | |
| download | rust-f9260d41d6e37653bf71b08a041be0310098716a.tar.gz rust-f9260d41d6e37653bf71b08a041be0310098716a.zip | |
auto merge of #14746 : alexcrichton/rust/libsync, r=brson
This commit is the final step in the libstd facade, #13851. The purpose of this commit is to move libsync underneath the standard library, behind the facade. This will allow core primitives like channels, queues, and atomics to all live in the same location. There were a few notable changes and a few breaking changes as part of this movement: * The `Vec` and `String` types are reexported at the top level of libcollections * The `unreachable!()` macro was copied to libcore * The `std::rt::thread` module was moved to librustrt, but it is still reexported at the same location. * The `std::comm` module was moved to libsync * The `sync::comm` module was moved under `sync::comm`, and renamed to `duplex`. It is now a private module with types/functions being reexported under `sync::comm`. This is a breaking change for any existing users of duplex streams. * All concurrent queues/deques were moved directly under libsync. They are also all marked with #![experimental] for now if they are public. * The `task_pool` and `future` modules no longer live in libsync, but rather live under `std::sync`. They will forever live at this location, but they may move to libsync if the `std::task` module moves as well. [breaking-change]
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/comm/mod.rs | 2033 | ||||
| -rw-r--r-- | src/libstd/comm/oneshot.rs | 373 | ||||
| -rw-r--r-- | src/libstd/comm/select.rs | 688 | ||||
| -rw-r--r-- | src/libstd/comm/shared.rs | 505 | ||||
| -rw-r--r-- | src/libstd/comm/stream.rs | 483 | ||||
| -rw-r--r-- | src/libstd/comm/sync.rs | 485 | ||||
| -rw-r--r-- | src/libstd/lib.rs | 5 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 5 | ||||
| -rw-r--r-- | src/libstd/rt/thread.rs | 348 | ||||
| -rw-r--r-- | src/libstd/sync/atomics.rs | 234 | ||||
| -rw-r--r-- | src/libstd/sync/deque.rs | 666 | ||||
| -rw-r--r-- | src/libstd/sync/future.rs | 209 | ||||
| -rw-r--r-- | src/libstd/sync/mod.rs | 16 | ||||
| -rw-r--r-- | src/libstd/sync/mpmc_bounded_queue.rs | 220 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc_queue.rs | 208 | ||||
| -rw-r--r-- | src/libstd/sync/spsc_queue.rs | 300 | ||||
| -rw-r--r-- | src/libstd/sync/task_pool.rs | 98 |
17 files changed, 322 insertions, 6554 deletions
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs deleted file mode 100644 index ffb2a0dbed8..00000000000 --- a/src/libstd/comm/mod.rs +++ /dev/null @@ -1,2033 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Communication primitives for concurrent tasks -//! -//! Rust makes it very difficult to share data among tasks to prevent race -//! conditions and to improve parallelism, but there is often a need for -//! communication between concurrent tasks. The primitives defined in this -//! module are the building blocks for synchronization in rust. -//! -//! This module provides message-based communication over channels, concretely -//! defined among three types: -//! -//! * `Sender` -//! * `SyncSender` -//! * `Receiver` -//! -//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both -//! senders are clone-able such that many tasks can send simultaneously to one -//! receiver. These channels are *task blocking*, not *thread blocking*. This -//! means that if one task is blocked on a channel, other tasks can continue to -//! make progress. -//! -//! Rust channels come in one of two flavors: -//! -//! 1. An asynchronous, infinitely buffered channel. The `channel()` function -//! will return a `(Sender, Receiver)` tuple where all sends will be -//! **asynchronous** (they never block). The channel conceptually has an -//! infinite buffer. -//! -//! 2. A synchronous, bounded channel. The `sync_channel()` function will return -//! a `(SyncSender, Receiver)` tuple where the storage for pending messages -//! is a pre-allocated buffer of a fixed size. All sends will be -//! **synchronous** by blocking until there is buffer space available. Note -//! that a bound of 0 is allowed, causing the channel to become a -//! "rendezvous" channel where each sender atomically hands off a message to -//! a receiver. -//! -//! ## Failure Propagation -//! -//! In addition to being a core primitive for communicating in rust, channels -//! are the points at which failure is propagated among tasks. Whenever the one -//! half of channel is closed, the other half will have its next operation -//! `fail!`. The purpose of this is to allow propagation of failure among tasks -//! that are linked to one another via channels. -//! -//! There are methods on both of senders and receivers to perform their -//! respective operations without failing, however. -//! -//! ## Runtime Requirements -//! -//! The channel types defined in this module generally have very few runtime -//! requirements in order to operate. The major requirement they have is for a -//! local rust `Task` to be available if any *blocking* operation is performed. -//! -//! If a local `Task` is not available (for example an FFI callback), then the -//! `send` operation is safe on a `Sender` (as well as a `send_opt`) as well as -//! the `try_send` method on a `SyncSender`, but no other operations are -//! guaranteed to be safe. -//! -//! Additionally, channels can interoperate between runtimes. If one task in a -//! program is running on libnative and another is running on libgreen, they can -//! still communicate with one another using channels. -//! -//! # Example -//! -//! Simple usage: -//! -//! ``` -//! // Create a simple streaming channel -//! let (tx, rx) = channel(); -//! spawn(proc() { -//! tx.send(10); -//! }); -//! assert_eq!(rx.recv(), 10); -//! ``` -//! -//! Shared usage: -//! -//! ``` -//! // Create a shared channel which can be sent along from many tasks -//! let (tx, rx) = channel(); -//! for i in range(0, 10) { -//! let tx = tx.clone(); -//! spawn(proc() { -//! tx.send(i); -//! }) -//! } -//! -//! for _ in range(0, 10) { -//! let j = rx.recv(); -//! assert!(0 <= j && j < 10); -//! } -//! ``` -//! -//! Propagating failure: -//! -//! ```should_fail -//! // The call to recv() will fail!() because the channel has already hung -//! // up (or been deallocated) -//! let (tx, rx) = channel::<int>(); -//! drop(tx); -//! rx.recv(); -//! ``` -//! -//! Synchronous channels: -//! -//! ``` -//! let (tx, rx) = sync_channel(0); -//! spawn(proc() { -//! // This will wait for the parent task to start receiving -//! tx.send(53); -//! }); -//! rx.recv(); -//! ``` - -// A description of how Rust's channel implementation works -// -// Channels are supposed to be the basic building block for all other -// concurrent primitives that are used in Rust. As a result, the channel type -// needs to be highly optimized, flexible, and broad enough for use everywhere. -// -// The choice of implementation of all channels is to be built on lock-free data -// structures. The channels themselves are then consequently also lock-free data -// structures. As always with lock-free code, this is a very "here be dragons" -// territory, especially because I'm unaware of any academic papers which have -// gone into great length about channels of these flavors. -// -// ## Flavors of channels -// -// From the perspective of a consumer of this library, there is only one flavor -// of channel. This channel can be used as a stream and cloned to allow multiple -// senders. Under the hood, however, there are actually three flavors of -// channels in play. -// -// * Oneshots - these channels are highly optimized for the one-send use case. -// They contain as few atomics as possible and involve one and -// exactly one allocation. -// * Streams - these channels are optimized for the non-shared use case. They -// use a different concurrent queue which is more tailored for this -// use case. The initial allocation of this flavor of channel is not -// optimized. -// * Shared - this is the most general form of channel that this module offers, -// a channel with multiple senders. This type is as optimized as it -// can be, but the previous two types mentioned are much faster for -// their use-cases. -// -// ## Concurrent queues -// -// The basic idea of Rust's Sender/Receiver types is that send() never blocks, but -// recv() obviously blocks. This means that under the hood there must be some -// shared and concurrent queue holding all of the actual data. -// -// With two flavors of channels, two flavors of queues are also used. We have -// chosen to use queues from a well-known author which are abbreviated as SPSC -// and MPSC (single producer, single consumer and multiple producer, single -// consumer). SPSC queues are used for streams while MPSC queues are used for -// shared channels. -// -// ### SPSC optimizations -// -// The SPSC queue found online is essentially a linked list of nodes where one -// half of the nodes are the "queue of data" and the other half of nodes are a -// cache of unused nodes. The unused nodes are used such that an allocation is -// not required on every push() and a free doesn't need to happen on every -// pop(). -// -// As found online, however, the cache of nodes is of an infinite size. This -// means that if a channel at one point in its life had 50k items in the queue, -// then the queue will always have the capacity for 50k items. I believed that -// this was an unnecessary limitation of the implementation, so I have altered -// the queue to optionally have a bound on the cache size. -// -// By default, streams will have an unbounded SPSC queue with a small-ish cache -// size. The hope is that the cache is still large enough to have very fast -// send() operations while not too large such that millions of channels can -// coexist at once. -// -// ### MPSC optimizations -// -// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses -// a linked list under the hood to earn its unboundedness, but I have not put -// forth much effort into having a cache of nodes similar to the SPSC queue. -// -// For now, I believe that this is "ok" because shared channels are not the most -// common type, but soon we may wish to revisit this queue choice and determine -// another candidate for backend storage of shared channels. -// -// ## Overview of the Implementation -// -// Now that there's a little background on the concurrent queues used, it's -// worth going into much more detail about the channels themselves. The basic -// pseudocode for a send/recv are: -// -// -// send(t) recv() -// queue.push(t) return if queue.pop() -// if increment() == -1 deschedule { -// wakeup() if decrement() > 0 -// cancel_deschedule() -// } -// queue.pop() -// -// As mentioned before, there are no locks in this implementation, only atomic -// instructions are used. -// -// ### The internal atomic counter -// -// Every channel has a shared counter with each half to keep track of the size -// of the queue. This counter is used to abort descheduling by the receiver and -// to know when to wake up on the sending side. -// -// As seen in the pseudocode, senders will increment this count and receivers -// will decrement the count. The theory behind this is that if a sender sees a -// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count, -// then it doesn't need to block. -// -// The recv() method has a beginning call to pop(), and if successful, it needs -// to decrement the count. It is a crucial implementation detail that this -// decrement does *not* happen to the shared counter. If this were the case, -// then it would be possible for the counter to be very negative when there were -// no receivers waiting, in which case the senders would have to determine when -// it was actually appropriate to wake up a receiver. -// -// Instead, the "steal count" is kept track of separately (not atomically -// because it's only used by receivers), and then the decrement() call when -// descheduling will lump in all of the recent steals into one large decrement. -// -// The implication of this is that if a sender sees a -1 count, then there's -// guaranteed to be a waiter waiting! -// -// ## Native Implementation -// -// A major goal of these channels is to work seamlessly on and off the runtime. -// All of the previous race conditions have been worded in terms of -// scheduler-isms (which is obviously not available without the runtime). -// -// For now, native usage of channels (off the runtime) will fall back onto -// mutexes/cond vars for descheduling/atomic decisions. The no-contention path -// is still entirely lock-free, the "deschedule" blocks above are surrounded by -// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a -// condition variable. -// -// ## Select -// -// Being able to support selection over channels has greatly influenced this -// design, and not only does selection need to work inside the runtime, but also -// outside the runtime. -// -// The implementation is fairly straightforward. The goal of select() is not to -// return some data, but only to return which channel can receive data without -// blocking. The implementation is essentially the entire blocking procedure -// followed by an increment as soon as its woken up. The cancellation procedure -// involves an increment and swapping out of to_wake to acquire ownership of the -// task to unblock. -// -// Sadly this current implementation requires multiple allocations, so I have -// seen the throughput of select() be much worse than it should be. I do not -// believe that there is anything fundamental which needs to change about these -// channels, however, in order to support a more efficient select(). -// -// # Conclusion -// -// And now that you've seen all the races that I found and attempted to fix, -// here's the code for you to find some more! - -use alloc::arc::Arc; - -use cell::Cell; -use clone::Clone; -use iter::Iterator; -use kinds::Send; -use kinds::marker; -use mem; -use ops::Drop; -use option::{Some, None, Option}; -use owned::Box; -use result::{Ok, Err, Result}; -use rt::local::Local; -use rt::task::{Task, BlockedTask}; -use ty::Unsafe; - -pub use comm::select::{Select, Handle}; - -macro_rules! test ( - { fn $name:ident() $b:block $(#[$a:meta])*} => ( - mod $name { - #![allow(unused_imports)] - - use native; - use comm::*; - use prelude::*; - use super::*; - use super::super::*; - use owned::Box; - use task; - - fn f() $b - - $(#[$a])* #[test] fn uv() { f() } - $(#[$a])* #[test] fn native() { - use native; - let (tx, rx) = channel(); - native::task::spawn(proc() { tx.send(f()) }); - rx.recv(); - } - } - ) -) - -mod select; -mod oneshot; -mod stream; -mod shared; -mod sync; - -// Use a power of 2 to allow LLVM to optimize to something that's not a -// division, this is hit pretty regularly. -static RESCHED_FREQ: int = 256; - -/// The receiving-half of Rust's channel type. This half can only be owned by -/// one task -pub struct Receiver<T> { - inner: Unsafe<Flavor<T>>, - receives: Cell<uint>, - // can't share in an arc - marker: marker::NoShare, -} - -/// An iterator over messages on a receiver, this iterator will block -/// whenever `next` is called, waiting for a new message, and `None` will be -/// returned when the corresponding channel has hung up. -pub struct Messages<'a, T> { - rx: &'a Receiver<T> -} - -/// The sending-half of Rust's asynchronous channel type. This half can only be -/// owned by one task, but it can be cloned to send to other tasks. -pub struct Sender<T> { - inner: Unsafe<Flavor<T>>, - sends: Cell<uint>, - // can't share in an arc - marker: marker::NoShare, -} - -/// The sending-half of Rust's synchronous channel type. This half can only be -/// owned by one task, but it can be cloned to send to other tasks. -pub struct SyncSender<T> { - inner: Arc<Unsafe<sync::Packet<T>>>, - // can't share in an arc - marker: marker::NoShare, -} - -/// This enumeration is the list of the possible reasons that try_recv could not -/// return data when called. -#[deriving(PartialEq, Clone, Show)] -pub enum TryRecvError { - /// This channel is currently empty, but the sender(s) have not yet - /// disconnected, so data may yet become available. - Empty, - /// This channel's sending half has become disconnected, and there will - /// never be any more data received on this channel - Disconnected, -} - -/// This enumeration is the list of the possible error outcomes for the -/// `SyncSender::try_send` method. -#[deriving(PartialEq, Clone, Show)] -pub enum TrySendError<T> { - /// The data could not be sent on the channel because it would require that - /// the callee block to send the data. - /// - /// If this is a buffered channel, then the buffer is full at this time. If - /// this is not a buffered channel, then there is no receiver available to - /// acquire the data. - Full(T), - /// This channel's receiving half has disconnected, so the data could not be - /// sent. The data is returned back to the callee in this case. - RecvDisconnected(T), -} - -enum Flavor<T> { - Oneshot(Arc<Unsafe<oneshot::Packet<T>>>), - Stream(Arc<Unsafe<stream::Packet<T>>>), - Shared(Arc<Unsafe<shared::Packet<T>>>), - Sync(Arc<Unsafe<sync::Packet<T>>>), -} - -#[doc(hidden)] -trait UnsafeFlavor<T> { - fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>>; - unsafe fn mut_inner<'a>(&'a self) -> &'a mut Flavor<T> { - &mut *self.inner_unsafe().get() - } - unsafe fn inner<'a>(&'a self) -> &'a Flavor<T> { - &*self.inner_unsafe().get() - } -} -impl<T> UnsafeFlavor<T> for Sender<T> { - fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> { - &self.inner - } -} -impl<T> UnsafeFlavor<T> for Receiver<T> { - fn inner_unsafe<'a>(&'a self) -> &'a Unsafe<Flavor<T>> { - &self.inner - } -} - -/// Creates a new asynchronous channel, returning the sender/receiver halves. -/// -/// All data sent on the sender will become available on the receiver, and no -/// send will block the calling task (this channel has an "infinite buffer"). -/// -/// # Example -/// -/// ``` -/// let (tx, rx) = channel(); -/// -/// // Spawn off an expensive computation -/// spawn(proc() { -/// # fn expensive_computation() {} -/// tx.send(expensive_computation()); -/// }); -/// -/// // Do some useful work for awhile -/// -/// // Let's see what that answer was -/// println!("{}", rx.recv()); -/// ``` -pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) { - let a = Arc::new(Unsafe::new(oneshot::Packet::new())); - (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a))) -} - -/// Creates a new synchronous, bounded channel. -/// -/// Like asynchronous channels, the `Receiver` will block until a message -/// becomes available. These channels differ greatly in the semantics of the -/// sender from asynchronous channels, however. -/// -/// This channel has an internal buffer on which messages will be queued. When -/// the internal buffer becomes full, future sends will *block* waiting for the -/// buffer to open up. Note that a buffer size of 0 is valid, in which case this -/// becomes "rendezvous channel" where each send will not return until a recv -/// is paired with it. -/// -/// As with asynchronous channels, all senders will fail in `send` if the -/// `Receiver` has been destroyed. -/// -/// # Example -/// -/// ``` -/// let (tx, rx) = sync_channel(1); -/// -/// // this returns immediately -/// tx.send(1); -/// -/// spawn(proc() { -/// // this will block until the previous message has been received -/// tx.send(2); -/// }); -/// -/// assert_eq!(rx.recv(), 1); -/// assert_eq!(rx.recv(), 2); -/// ``` -pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) { - let a = Arc::new(Unsafe::new(sync::Packet::new(bound))); - (SyncSender::new(a.clone()), Receiver::new(Sync(a))) -} - -//////////////////////////////////////////////////////////////////////////////// -// Sender -//////////////////////////////////////////////////////////////////////////////// - -impl<T: Send> Sender<T> { - fn new(inner: Flavor<T>) -> Sender<T> { - Sender { inner: Unsafe::new(inner), sends: Cell::new(0), marker: marker::NoShare } - } - - /// Sends a value along this channel to be received by the corresponding - /// receiver. - /// - /// Rust channels are infinitely buffered so this method will never block. - /// - /// # Failure - /// - /// This function will fail if the other end of the channel has hung up. - /// This means that if the corresponding receiver has fallen out of scope, - /// this function will trigger a fail message saying that a message is - /// being sent on a closed channel. - /// - /// Note that if this function does *not* fail, it does not mean that the - /// data will be successfully received. All sends are placed into a queue, - /// so it is possible for a send to succeed (the other end is alive), but - /// then the other end could immediately disconnect. - /// - /// The purpose of this functionality is to propagate failure among tasks. - /// If failure is not desired, then consider using the `send_opt` method - pub fn send(&self, t: T) { - if self.send_opt(t).is_err() { - fail!("sending on a closed channel"); - } - } - - /// Attempts to send a value on this channel, returning it back if it could - /// not be sent. - /// - /// A successful send occurs when it is determined that the other end of - /// the channel has not hung up already. An unsuccessful send would be one - /// where the corresponding receiver has already been deallocated. Note - /// that a return value of `Err` means that the data will never be - /// received, but a return value of `Ok` does *not* mean that the data - /// will be received. It is possible for the corresponding receiver to - /// hang up immediately after this function returns `Ok`. - /// - /// Like `send`, this method will never block. - /// - /// # Failure - /// - /// This method will never fail, it will return the message back to the - /// caller if the other end is disconnected - /// - /// # Example - /// - /// ``` - /// let (tx, rx) = channel(); - /// - /// // This send is always successful - /// assert_eq!(tx.send_opt(1), Ok(())); - /// - /// // This send will fail because the receiver is gone - /// drop(rx); - /// assert_eq!(tx.send_opt(1), Err(1)); - /// ``` - pub fn send_opt(&self, t: T) -> Result<(), T> { - // In order to prevent starvation of other tasks in situations where - // a task sends repeatedly without ever receiving, we occasionally - // yield instead of doing a send immediately. - // - // Don't unconditionally attempt to yield because the TLS overhead can - // be a bit much, and also use `try_take` instead of `take` because - // there's no reason that this send shouldn't be usable off the - // runtime. - let cnt = self.sends.get() + 1; - self.sends.set(cnt); - if cnt % (RESCHED_FREQ as uint) == 0 { - let task: Option<Box<Task>> = Local::try_take(); - task.map(|t| t.maybe_yield()); - } - - let (new_inner, ret) = match *unsafe { self.inner() } { - Oneshot(ref p) => { - unsafe { - let p = p.get(); - if !(*p).sent() { - return (*p).send(t); - } else { - let a = Arc::new(Unsafe::new(stream::Packet::new())); - match (*p).upgrade(Receiver::new(Stream(a.clone()))) { - oneshot::UpSuccess => { - let ret = (*a.get()).send(t); - (a, ret) - } - oneshot::UpDisconnected => (a, Err(t)), - oneshot::UpWoke(task) => { - // This send cannot fail because the task is - // asleep (we're looking at it), so the receiver - // can't go away. - (*a.get()).send(t).ok().unwrap(); - task.wake().map(|t| t.reawaken()); - (a, Ok(())) - } - } - } - } - } - Stream(ref p) => return unsafe { (*p.get()).send(t) }, - Shared(ref p) => return unsafe { (*p.get()).send(t) }, - Sync(..) => unreachable!(), - }; - - unsafe { - let tmp = Sender::new(Stream(new_inner)); - mem::swap(self.mut_inner(), tmp.mut_inner()); - } - return ret; - } -} - -impl<T: Send> Clone for Sender<T> { - fn clone(&self) -> Sender<T> { - let (packet, sleeper) = match *unsafe { self.inner() } { - Oneshot(ref p) => { - let a = Arc::new(Unsafe::new(shared::Packet::new())); - unsafe { - (*a.get()).postinit_lock(); - match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) { - oneshot::UpSuccess | oneshot::UpDisconnected => (a, None), - oneshot::UpWoke(task) => (a, Some(task)) - } - } - } - Stream(ref p) => { - let a = Arc::new(Unsafe::new(shared::Packet::new())); - unsafe { - (*a.get()).postinit_lock(); - match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) { - stream::UpSuccess | stream::UpDisconnected => (a, None), - stream::UpWoke(task) => (a, Some(task)), - } - } - } - Shared(ref p) => { - unsafe { (*p.get()).clone_chan(); } - return Sender::new(Shared(p.clone())); - } - Sync(..) => unreachable!(), - }; - - unsafe { - (*packet.get()).inherit_blocker(sleeper); - - let tmp = Sender::new(Shared(packet.clone())); - mem::swap(self.mut_inner(), tmp.mut_inner()); - } - Sender::new(Shared(packet)) - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Sender<T> { - fn drop(&mut self) { - match *unsafe { self.mut_inner() } { - Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); }, - Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); }, - Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); }, - Sync(..) => unreachable!(), - } - } -} - -//////////////////////////////////////////////////////////////////////////////// -// SyncSender -//////////////////////////////////////////////////////////////////////////////// - -impl<T: Send> SyncSender<T> { - fn new(inner: Arc<Unsafe<sync::Packet<T>>>) -> SyncSender<T> { - SyncSender { inner: inner, marker: marker::NoShare } - } - - /// Sends a value on this synchronous channel. - /// - /// This function will *block* until space in the internal buffer becomes - /// available or a receiver is available to hand off the message to. - /// - /// Note that a successful send does *not* guarantee that the receiver will - /// ever see the data if there is a buffer on this channel. Messages may be - /// enqueued in the internal buffer for the receiver to receive at a later - /// time. If the buffer size is 0, however, it can be guaranteed that the - /// receiver has indeed received the data if this function returns success. - /// - /// # Failure - /// - /// Similarly to `Sender::send`, this function will fail if the - /// corresponding `Receiver` for this channel has disconnected. This - /// behavior is used to propagate failure among tasks. - /// - /// If failure is not desired, you can achieve the same semantics with the - /// `SyncSender::send_opt` method which will not fail if the receiver - /// disconnects. - pub fn send(&self, t: T) { - if self.send_opt(t).is_err() { - fail!("sending on a closed channel"); - } - } - - /// Send a value on a channel, returning it back if the receiver - /// disconnected - /// - /// This method will *block* to send the value `t` on the channel, but if - /// the value could not be sent due to the receiver disconnecting, the value - /// is returned back to the callee. This function is similar to `try_send`, - /// except that it will block if the channel is currently full. - /// - /// # Failure - /// - /// This function cannot fail. - pub fn send_opt(&self, t: T) -> Result<(), T> { - unsafe { (*self.inner.get()).send(t) } - } - - /// Attempts to send a value on this channel without blocking. - /// - /// This method differs from `send_opt` by returning immediately if the - /// channel's buffer is full or no receiver is waiting to acquire some - /// data. Compared with `send_opt`, this function has two failure cases - /// instead of one (one for disconnection, one for a full buffer). - /// - /// See `SyncSender::send` for notes about guarantees of whether the - /// receiver has received the data or not if this function is successful. - /// - /// # Failure - /// - /// This function cannot fail - pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { - unsafe { (*self.inner.get()).try_send(t) } - } -} - -impl<T: Send> Clone for SyncSender<T> { - fn clone(&self) -> SyncSender<T> { - unsafe { (*self.inner.get()).clone_chan(); } - return SyncSender::new(self.inner.clone()); - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for SyncSender<T> { - fn drop(&mut self) { - unsafe { (*self.inner.get()).drop_chan(); } - } -} - -//////////////////////////////////////////////////////////////////////////////// -// Receiver -//////////////////////////////////////////////////////////////////////////////// - -impl<T: Send> Receiver<T> { - fn new(inner: Flavor<T>) -> Receiver<T> { - Receiver { inner: Unsafe::new(inner), receives: Cell::new(0), marker: marker::NoShare } - } - - /// Blocks waiting for a value on this receiver - /// - /// This function will block if necessary to wait for a corresponding send - /// on the channel from its paired `Sender` structure. This receiver will - /// be woken up when data is ready, and the data will be returned. - /// - /// # Failure - /// - /// Similar to channels, this method will trigger a task failure if the - /// other end of the channel has hung up (been deallocated). The purpose of - /// this is to propagate failure among tasks. - /// - /// If failure is not desired, then there are two options: - /// - /// * If blocking is still desired, the `recv_opt` method will return `None` - /// when the other end hangs up - /// - /// * If blocking is not desired, then the `try_recv` method will attempt to - /// peek at a value on this receiver. - pub fn recv(&self) -> T { - match self.recv_opt() { - Ok(t) => t, - Err(()) => fail!("receiving on a closed channel"), - } - } - - /// Attempts to return a pending value on this receiver without blocking - /// - /// This method will never block the caller in order to wait for data to - /// become available. Instead, this will always return immediately with a - /// possible option of pending data on the channel. - /// - /// This is useful for a flavor of "optimistic check" before deciding to - /// block on a receiver. - /// - /// This function cannot fail. - pub fn try_recv(&self) -> Result<T, TryRecvError> { - // If a thread is spinning in try_recv, we should take the opportunity - // to reschedule things occasionally. See notes above in scheduling on - // sends for why this doesn't always hit TLS, and also for why this uses - // `try_take` instead of `take`. - let cnt = self.receives.get() + 1; - self.receives.set(cnt); - if cnt % (RESCHED_FREQ as uint) == 0 { - let task: Option<Box<Task>> = Local::try_take(); - task.map(|t| t.maybe_yield()); - } - - loop { - let new_port = match *unsafe { self.inner() } { - Oneshot(ref p) => { - match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Ok(t), - Err(oneshot::Empty) => return Err(Empty), - Err(oneshot::Disconnected) => return Err(Disconnected), - Err(oneshot::Upgraded(rx)) => rx, - } - } - Stream(ref p) => { - match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Ok(t), - Err(stream::Empty) => return Err(Empty), - Err(stream::Disconnected) => return Err(Disconnected), - Err(stream::Upgraded(rx)) => rx, - } - } - Shared(ref p) => { - match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Ok(t), - Err(shared::Empty) => return Err(Empty), - Err(shared::Disconnected) => return Err(Disconnected), - } - } - Sync(ref p) => { - match unsafe { (*p.get()).try_recv() } { - Ok(t) => return Ok(t), - Err(sync::Empty) => return Err(Empty), - Err(sync::Disconnected) => return Err(Disconnected), - } - } - }; - unsafe { - mem::swap(self.mut_inner(), - new_port.mut_inner()); - } - } - } - - /// Attempt to wait for a value on this receiver, but does not fail if the - /// corresponding channel has hung up. - /// - /// This implementation of iterators for ports will always block if there is - /// not data available on the receiver, but it will not fail in the case - /// that the channel has been deallocated. - /// - /// In other words, this function has the same semantics as the `recv` - /// method except for the failure aspect. - /// - /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of - /// the value found on the receiver is returned. - pub fn recv_opt(&self) -> Result<T, ()> { - loop { - let new_port = match *unsafe { self.inner() } { - Oneshot(ref p) => { - match unsafe { (*p.get()).recv() } { - Ok(t) => return Ok(t), - Err(oneshot::Empty) => return unreachable!(), - Err(oneshot::Disconnected) => return Err(()), - Err(oneshot::Upgraded(rx)) => rx, - } - } - Stream(ref p) => { - match unsafe { (*p.get()).recv() } { - Ok(t) => return Ok(t), - Err(stream::Empty) => return unreachable!(), - Err(stream::Disconnected) => return Err(()), - Err(stream::Upgraded(rx)) => rx, - } - } - Shared(ref p) => { - match unsafe { (*p.get()).recv() } { - Ok(t) => return Ok(t), - Err(shared::Empty) => return unreachable!(), - Err(shared::Disconnected) => return Err(()), - } - } - Sync(ref p) => return unsafe { (*p.get()).recv() } - }; - unsafe { - mem::swap(self.mut_inner(), new_port.mut_inner()); - } - } - } - - /// Returns an iterator which will block waiting for messages, but never - /// `fail!`. It will return `None` when the channel has hung up. - pub fn iter<'a>(&'a self) -> Messages<'a, T> { - Messages { rx: self } - } -} - -impl<T: Send> select::Packet for Receiver<T> { - fn can_recv(&self) -> bool { - loop { - let new_port = match *unsafe { self.inner() } { - Oneshot(ref p) => { - match unsafe { (*p.get()).can_recv() } { - Ok(ret) => return ret, - Err(upgrade) => upgrade, - } - } - Stream(ref p) => { - match unsafe { (*p.get()).can_recv() } { - Ok(ret) => return ret, - Err(upgrade) => upgrade, - } - } - Shared(ref p) => { - return unsafe { (*p.get()).can_recv() }; - } - Sync(ref p) => { - return unsafe { (*p.get()).can_recv() }; - } - }; - unsafe { - mem::swap(self.mut_inner(), - new_port.mut_inner()); - } - } - } - - fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{ - loop { - let (t, new_port) = match *unsafe { self.inner() } { - Oneshot(ref p) => { - match unsafe { (*p.get()).start_selection(task) } { - oneshot::SelSuccess => return Ok(()), - oneshot::SelCanceled(task) => return Err(task), - oneshot::SelUpgraded(t, rx) => (t, rx), - } - } - Stream(ref p) => { - match unsafe { (*p.get()).start_selection(task) } { - stream::SelSuccess => return Ok(()), - stream::SelCanceled(task) => return Err(task), - stream::SelUpgraded(t, rx) => (t, rx), - } - } - Shared(ref p) => { - return unsafe { (*p.get()).start_selection(task) }; - } - Sync(ref p) => { - return unsafe { (*p.get()).start_selection(task) }; - } - }; - task = t; - unsafe { - mem::swap(self.mut_inner(), - new_port.mut_inner()); - } - } - } - - fn abort_selection(&self) -> bool { - let mut was_upgrade = false; - loop { - let result = match *unsafe { self.inner() } { - Oneshot(ref p) => unsafe { (*p.get()).abort_selection() }, - Stream(ref p) => unsafe { - (*p.get()).abort_selection(was_upgrade) - }, - Shared(ref p) => return unsafe { - (*p.get()).abort_selection(was_upgrade) - }, - Sync(ref p) => return unsafe { - (*p.get()).abort_selection() - }, - }; - let new_port = match result { Ok(b) => return b, Err(p) => p }; - was_upgrade = true; - unsafe { - mem::swap(self.mut_inner(), - new_port.mut_inner()); - } - } - } -} - -impl<'a, T: Send> Iterator<T> for Messages<'a, T> { - fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Receiver<T> { - fn drop(&mut self) { - match *unsafe { self.mut_inner() } { - Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Stream(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Shared(ref mut p) => unsafe { (*p.get()).drop_port(); }, - Sync(ref mut p) => unsafe { (*p.get()).drop_port(); }, - } - } -} - -#[cfg(test)] -mod test { - use prelude::*; - - use native; - use os; - use super::*; - - pub fn stress_factor() -> uint { - match os::getenv("RUST_TEST_STRESS") { - Some(val) => from_str::<uint>(val.as_slice()).unwrap(), - None => 1, - } - } - - test!(fn smoke() { - let (tx, rx) = channel(); - tx.send(1); - assert_eq!(rx.recv(), 1); - }) - - test!(fn drop_full() { - let (tx, _rx) = channel(); - tx.send(box 1); - }) - - test!(fn drop_full_shared() { - let (tx, _rx) = channel(); - drop(tx.clone()); - drop(tx.clone()); - tx.send(box 1); - }) - - test!(fn smoke_shared() { - let (tx, rx) = channel(); - tx.send(1); - assert_eq!(rx.recv(), 1); - let tx = tx.clone(); - tx.send(1); - assert_eq!(rx.recv(), 1); - }) - - test!(fn smoke_threads() { - let (tx, rx) = channel(); - spawn(proc() { - tx.send(1); - }); - assert_eq!(rx.recv(), 1); - }) - - test!(fn smoke_port_gone() { - let (tx, rx) = channel(); - drop(rx); - tx.send(1); - } #[should_fail]) - - test!(fn smoke_shared_port_gone() { - let (tx, rx) = channel(); - drop(rx); - tx.send(1); - } #[should_fail]) - - test!(fn smoke_shared_port_gone2() { - let (tx, rx) = channel(); - drop(rx); - let tx2 = tx.clone(); - drop(tx); - tx2.send(1); - } #[should_fail]) - - test!(fn port_gone_concurrent() { - let (tx, rx) = channel(); - spawn(proc() { - rx.recv(); - }); - loop { tx.send(1) } - } #[should_fail]) - - test!(fn port_gone_concurrent_shared() { - let (tx, rx) = channel(); - let tx2 = tx.clone(); - spawn(proc() { - rx.recv(); - }); - loop { - tx.send(1); - tx2.send(1); - } - } #[should_fail]) - - test!(fn smoke_chan_gone() { - let (tx, rx) = channel::<int>(); - drop(tx); - rx.recv(); - } #[should_fail]) - - test!(fn smoke_chan_gone_shared() { - let (tx, rx) = channel::<()>(); - let tx2 = tx.clone(); - drop(tx); - drop(tx2); - rx.recv(); - } #[should_fail]) - - test!(fn chan_gone_concurrent() { - let (tx, rx) = channel(); - spawn(proc() { - tx.send(1); - tx.send(1); - }); - loop { rx.recv(); } - } #[should_fail]) - - test!(fn stress() { - let (tx, rx) = channel(); - spawn(proc() { - for _ in range(0, 10000) { tx.send(1); } - }); - for _ in range(0, 10000) { - assert_eq!(rx.recv(), 1); - } - }) - - test!(fn stress_shared() { - static AMT: uint = 10000; - static NTHREADS: uint = 8; - let (tx, rx) = channel::<int>(); - let (dtx, drx) = channel::<()>(); - - spawn(proc() { - for _ in range(0, AMT * NTHREADS) { - assert_eq!(rx.recv(), 1); - } - match rx.try_recv() { - Ok(..) => fail!(), - _ => {} - } - dtx.send(()); - }); - - for _ in range(0, NTHREADS) { - let tx = tx.clone(); - spawn(proc() { - for _ in range(0, AMT) { tx.send(1); } - }); - } - drop(tx); - drx.recv(); - }) - - #[test] - fn send_from_outside_runtime() { - let (tx1, rx1) = channel::<()>(); - let (tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<()>(); - let tx4 = tx3.clone(); - spawn(proc() { - tx1.send(()); - for _ in range(0, 40) { - assert_eq!(rx2.recv(), 1); - } - tx3.send(()); - }); - rx1.recv(); - native::task::spawn(proc() { - for _ in range(0, 40) { - tx2.send(1); - } - tx4.send(()); - }); - rx3.recv(); - rx3.recv(); - } - - #[test] - fn recv_from_outside_runtime() { - let (tx, rx) = channel::<int>(); - let (dtx, drx) = channel(); - native::task::spawn(proc() { - for _ in range(0, 40) { - assert_eq!(rx.recv(), 1); - } - dtx.send(()); - }); - for _ in range(0, 40) { - tx.send(1); - } - drx.recv(); - } - - #[test] - fn no_runtime() { - let (tx1, rx1) = channel::<int>(); - let (tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<()>(); - let tx4 = tx3.clone(); - native::task::spawn(proc() { - assert_eq!(rx1.recv(), 1); - tx2.send(2); - tx4.send(()); - }); - native::task::spawn(proc() { - tx1.send(1); - assert_eq!(rx2.recv(), 2); - tx3.send(()); - }); - rx3.recv(); - rx3.recv(); - } - - test!(fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - let (_tx, rx) = channel::<int>(); - drop(rx); - }) - - test!(fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - let (tx, _rx) = channel::<int>(); - drop(tx); - }) - - test!(fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - let (tx, rx) = channel::<Box<int>>(); - drop(rx); - tx.send(box 0); - } #[should_fail]) - - test!(fn oneshot_single_thread_recv_chan_close() { - // Receiving on a closed chan will fail - let res = task::try(proc() { - let (tx, rx) = channel::<int>(); - drop(tx); - rx.recv(); - }); - // What is our res? - assert!(res.is_err()); - }) - - test!(fn oneshot_single_thread_send_then_recv() { - let (tx, rx) = channel::<Box<int>>(); - tx.send(box 10); - assert!(rx.recv() == box 10); - }) - - test!(fn oneshot_single_thread_try_send_open() { - let (tx, rx) = channel::<int>(); - assert!(tx.send_opt(10).is_ok()); - assert!(rx.recv() == 10); - }) - - test!(fn oneshot_single_thread_try_send_closed() { - let (tx, rx) = channel::<int>(); - drop(rx); - assert!(tx.send_opt(10).is_err()); - }) - - test!(fn oneshot_single_thread_try_recv_open() { - let (tx, rx) = channel::<int>(); - tx.send(10); - assert!(rx.recv_opt() == Ok(10)); - }) - - test!(fn oneshot_single_thread_try_recv_closed() { - let (tx, rx) = channel::<int>(); - drop(tx); - assert!(rx.recv_opt() == Err(())); - }) - - test!(fn oneshot_single_thread_peek_data() { - let (tx, rx) = channel::<int>(); - assert_eq!(rx.try_recv(), Err(Empty)) - tx.send(10); - assert_eq!(rx.try_recv(), Ok(10)); - }) - - test!(fn oneshot_single_thread_peek_close() { - let (tx, rx) = channel::<int>(); - drop(tx); - assert_eq!(rx.try_recv(), Err(Disconnected)); - assert_eq!(rx.try_recv(), Err(Disconnected)); - }) - - test!(fn oneshot_single_thread_peek_open() { - let (_tx, rx) = channel::<int>(); - assert_eq!(rx.try_recv(), Err(Empty)); - }) - - test!(fn oneshot_multi_task_recv_then_send() { - let (tx, rx) = channel::<Box<int>>(); - spawn(proc() { - assert!(rx.recv() == box 10); - }); - - tx.send(box 10); - }) - - test!(fn oneshot_multi_task_recv_then_close() { - let (tx, rx) = channel::<Box<int>>(); - spawn(proc() { - drop(tx); - }); - let res = task::try(proc() { - assert!(rx.recv() == box 10); - }); - assert!(res.is_err()); - }) - - test!(fn oneshot_multi_thread_close_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = channel::<int>(); - spawn(proc() { - drop(rx); - }); - drop(tx); - } - }) - - test!(fn oneshot_multi_thread_send_close_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = channel::<int>(); - spawn(proc() { - drop(rx); - }); - let _ = task::try(proc() { - tx.send(1); - }); - } - }) - - test!(fn oneshot_multi_thread_recv_close_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = channel::<int>(); - spawn(proc() { - let res = task::try(proc() { - rx.recv(); - }); - assert!(res.is_err()); - }); - spawn(proc() { - spawn(proc() { - drop(tx); - }); - }); - } - }) - - test!(fn oneshot_multi_thread_send_recv_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = channel(); - spawn(proc() { - tx.send(box 10); - }); - spawn(proc() { - assert!(rx.recv() == box 10); - }); - } - }) - - test!(fn stream_send_recv_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = channel(); - - send(tx, 0); - recv(rx, 0); - - fn send(tx: Sender<Box<int>>, i: int) { - if i == 10 { return } - - spawn(proc() { - tx.send(box i); - send(tx, i + 1); - }); - } - - fn recv(rx: Receiver<Box<int>>, i: int) { - if i == 10 { return } - - spawn(proc() { - assert!(rx.recv() == box i); - recv(rx, i + 1); - }); - } - } - }) - - test!(fn recv_a_lot() { - // Regression test that we don't run out of stack in scheduler context - let (tx, rx) = channel(); - for _ in range(0, 10000) { tx.send(()); } - for _ in range(0, 10000) { rx.recv(); } - }) - - test!(fn shared_chan_stress() { - let (tx, rx) = channel(); - let total = stress_factor() + 100; - for _ in range(0, total) { - let tx = tx.clone(); - spawn(proc() { - tx.send(()); - }); - } - - for _ in range(0, total) { - rx.recv(); - } - }) - - test!(fn test_nested_recv_iter() { - let (tx, rx) = channel::<int>(); - let (total_tx, total_rx) = channel::<int>(); - - spawn(proc() { - let mut acc = 0; - for x in rx.iter() { - acc += x; - } - total_tx.send(acc); - }); - - tx.send(3); - tx.send(1); - tx.send(2); - drop(tx); - assert_eq!(total_rx.recv(), 6); - }) - - test!(fn test_recv_iter_break() { - let (tx, rx) = channel::<int>(); - let (count_tx, count_rx) = channel(); - - spawn(proc() { - let mut count = 0; - for x in rx.iter() { - if count >= 3 { - break; - } else { - count += x; - } - } - count_tx.send(count); - }); - - tx.send(2); - tx.send(2); - tx.send(2); - let _ = tx.send_opt(2); - drop(tx); - assert_eq!(count_rx.recv(), 4); - }) - - test!(fn try_recv_states() { - let (tx1, rx1) = channel::<int>(); - let (tx2, rx2) = channel::<()>(); - let (tx3, rx3) = channel::<()>(); - spawn(proc() { - rx2.recv(); - tx1.send(1); - tx3.send(()); - rx2.recv(); - drop(tx1); - tx3.send(()); - }); - - assert_eq!(rx1.try_recv(), Err(Empty)); - tx2.send(()); - rx3.recv(); - assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(Empty)); - tx2.send(()); - rx3.recv(); - assert_eq!(rx1.try_recv(), Err(Disconnected)); - }) - - // This bug used to end up in a livelock inside of the Receiver destructor - // because the internal state of the Shared packet was corrupted - test!(fn destroy_upgraded_shared_port_when_sender_still_active() { - let (tx, rx) = channel(); - let (tx2, rx2) = channel(); - spawn(proc() { - rx.recv(); // wait on a oneshot - drop(rx); // destroy a shared - tx2.send(()); - }); - // make sure the other task has gone to sleep - for _ in range(0, 5000) { task::deschedule(); } - - // upgrade to a shared chan and send a message - let t = tx.clone(); - drop(tx); - t.send(()); - - // wait for the child task to exit before we exit - rx2.recv(); - }) - - test!(fn sends_off_the_runtime() { - use rt::thread::Thread; - - let (tx, rx) = channel(); - let t = Thread::start(proc() { - for _ in range(0, 1000) { - tx.send(()); - } - }); - for _ in range(0, 1000) { - rx.recv(); - } - t.join(); - }) - - test!(fn try_recvs_off_the_runtime() { - use rt::thread::Thread; - - let (tx, rx) = channel(); - let (cdone, pdone) = channel(); - let t = Thread::start(proc() { - let mut hits = 0; - while hits < 10 { - match rx.try_recv() { - Ok(()) => { hits += 1; } - Err(Empty) => { Thread::yield_now(); } - Err(Disconnected) => return, - } - } - cdone.send(()); - }); - for _ in range(0, 10) { - tx.send(()); - } - t.join(); - pdone.recv(); - }) -} - -#[cfg(test)] -mod sync_tests { - use prelude::*; - use os; - - pub fn stress_factor() -> uint { - match os::getenv("RUST_TEST_STRESS") { - Some(val) => from_str::<uint>(val.as_slice()).unwrap(), - None => 1, - } - } - - test!(fn smoke() { - let (tx, rx) = sync_channel(1); - tx.send(1); - assert_eq!(rx.recv(), 1); - }) - - test!(fn drop_full() { - let (tx, _rx) = sync_channel(1); - tx.send(box 1); - }) - - test!(fn smoke_shared() { - let (tx, rx) = sync_channel(1); - tx.send(1); - assert_eq!(rx.recv(), 1); - let tx = tx.clone(); - tx.send(1); - assert_eq!(rx.recv(), 1); - }) - - test!(fn smoke_threads() { - let (tx, rx) = sync_channel(0); - spawn(proc() { - tx.send(1); - }); - assert_eq!(rx.recv(), 1); - }) - - test!(fn smoke_port_gone() { - let (tx, rx) = sync_channel(0); - drop(rx); - tx.send(1); - } #[should_fail]) - - test!(fn smoke_shared_port_gone2() { - let (tx, rx) = sync_channel(0); - drop(rx); - let tx2 = tx.clone(); - drop(tx); - tx2.send(1); - } #[should_fail]) - - test!(fn port_gone_concurrent() { - let (tx, rx) = sync_channel(0); - spawn(proc() { - rx.recv(); - }); - loop { tx.send(1) } - } #[should_fail]) - - test!(fn port_gone_concurrent_shared() { - let (tx, rx) = sync_channel(0); - let tx2 = tx.clone(); - spawn(proc() { - rx.recv(); - }); - loop { - tx.send(1); - tx2.send(1); - } - } #[should_fail]) - - test!(fn smoke_chan_gone() { - let (tx, rx) = sync_channel::<int>(0); - drop(tx); - rx.recv(); - } #[should_fail]) - - test!(fn smoke_chan_gone_shared() { - let (tx, rx) = sync_channel::<()>(0); - let tx2 = tx.clone(); - drop(tx); - drop(tx2); - rx.recv(); - } #[should_fail]) - - test!(fn chan_gone_concurrent() { - let (tx, rx) = sync_channel(0); - spawn(proc() { - tx.send(1); - tx.send(1); - }); - loop { rx.recv(); } - } #[should_fail]) - - test!(fn stress() { - let (tx, rx) = sync_channel(0); - spawn(proc() { - for _ in range(0, 10000) { tx.send(1); } - }); - for _ in range(0, 10000) { - assert_eq!(rx.recv(), 1); - } - }) - - test!(fn stress_shared() { - static AMT: uint = 1000; - static NTHREADS: uint = 8; - let (tx, rx) = sync_channel::<int>(0); - let (dtx, drx) = sync_channel::<()>(0); - - spawn(proc() { - for _ in range(0, AMT * NTHREADS) { - assert_eq!(rx.recv(), 1); - } - match rx.try_recv() { - Ok(..) => fail!(), - _ => {} - } - dtx.send(()); - }); - - for _ in range(0, NTHREADS) { - let tx = tx.clone(); - spawn(proc() { - for _ in range(0, AMT) { tx.send(1); } - }); - } - drop(tx); - drx.recv(); - }) - - test!(fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - let (_tx, rx) = sync_channel::<int>(0); - drop(rx); - }) - - test!(fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - let (tx, _rx) = sync_channel::<int>(0); - drop(tx); - }) - - test!(fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - let (tx, rx) = sync_channel::<Box<int>>(0); - drop(rx); - tx.send(box 0); - } #[should_fail]) - - test!(fn oneshot_single_thread_recv_chan_close() { - // Receiving on a closed chan will fail - let res = task::try(proc() { - let (tx, rx) = sync_channel::<int>(0); - drop(tx); - rx.recv(); - }); - // What is our res? - assert!(res.is_err()); - }) - - test!(fn oneshot_single_thread_send_then_recv() { - let (tx, rx) = sync_channel::<Box<int>>(1); - tx.send(box 10); - assert!(rx.recv() == box 10); - }) - - test!(fn oneshot_single_thread_try_send_open() { - let (tx, rx) = sync_channel::<int>(1); - assert_eq!(tx.try_send(10), Ok(())); - assert!(rx.recv() == 10); - }) - - test!(fn oneshot_single_thread_try_send_closed() { - let (tx, rx) = sync_channel::<int>(0); - drop(rx); - assert_eq!(tx.try_send(10), Err(RecvDisconnected(10))); - }) - - test!(fn oneshot_single_thread_try_send_closed2() { - let (tx, _rx) = sync_channel::<int>(0); - assert_eq!(tx.try_send(10), Err(Full(10))); - }) - - test!(fn oneshot_single_thread_try_recv_open() { - let (tx, rx) = sync_channel::<int>(1); - tx.send(10); - assert!(rx.recv_opt() == Ok(10)); - }) - - test!(fn oneshot_single_thread_try_recv_closed() { - let (tx, rx) = sync_channel::<int>(0); - drop(tx); - assert!(rx.recv_opt() == Err(())); - }) - - test!(fn oneshot_single_thread_peek_data() { - let (tx, rx) = sync_channel::<int>(1); - assert_eq!(rx.try_recv(), Err(Empty)) - tx.send(10); - assert_eq!(rx.try_recv(), Ok(10)); - }) - - test!(fn oneshot_single_thread_peek_close() { - let (tx, rx) = sync_channel::<int>(0); - drop(tx); - assert_eq!(rx.try_recv(), Err(Disconnected)); - assert_eq!(rx.try_recv(), Err(Disconnected)); - }) - - test!(fn oneshot_single_thread_peek_open() { - let (_tx, rx) = sync_channel::<int>(0); - assert_eq!(rx.try_recv(), Err(Empty)); - }) - - test!(fn oneshot_multi_task_recv_then_send() { - let (tx, rx) = sync_channel::<Box<int>>(0); - spawn(proc() { - assert!(rx.recv() == box 10); - }); - - tx.send(box 10); - }) - - test!(fn oneshot_multi_task_recv_then_close() { - let (tx, rx) = sync_channel::<Box<int>>(0); - spawn(proc() { - drop(tx); - }); - let res = task::try(proc() { - assert!(rx.recv() == box 10); - }); - assert!(res.is_err()); - }) - - test!(fn oneshot_multi_thread_close_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = sync_channel::<int>(0); - spawn(proc() { - drop(rx); - }); - drop(tx); - } - }) - - test!(fn oneshot_multi_thread_send_close_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = sync_channel::<int>(0); - spawn(proc() { - drop(rx); - }); - let _ = task::try(proc() { - tx.send(1); - }); - } - }) - - test!(fn oneshot_multi_thread_recv_close_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = sync_channel::<int>(0); - spawn(proc() { - let res = task::try(proc() { - rx.recv(); - }); - assert!(res.is_err()); - }); - spawn(proc() { - spawn(proc() { - drop(tx); - }); - }); - } - }) - - test!(fn oneshot_multi_thread_send_recv_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = sync_channel(0); - spawn(proc() { - tx.send(box 10); - }); - spawn(proc() { - assert!(rx.recv() == box 10); - }); - } - }) - - test!(fn stream_send_recv_stress() { - for _ in range(0, stress_factor()) { - let (tx, rx) = sync_channel(0); - - send(tx, 0); - recv(rx, 0); - - fn send(tx: SyncSender<Box<int>>, i: int) { - if i == 10 { return } - - spawn(proc() { - tx.send(box i); - send(tx, i + 1); - }); - } - - fn recv(rx: Receiver<Box<int>>, i: int) { - if i == 10 { return } - - spawn(proc() { - assert!(rx.recv() == box i); - recv(rx, i + 1); - }); - } - } - }) - - test!(fn recv_a_lot() { - // Regression test that we don't run out of stack in scheduler context - let (tx, rx) = sync_channel(10000); - for _ in range(0, 10000) { tx.send(()); } - for _ in range(0, 10000) { rx.recv(); } - }) - - test!(fn shared_chan_stress() { - let (tx, rx) = sync_channel(0); - let total = stress_factor() + 100; - for _ in range(0, total) { - let tx = tx.clone(); - spawn(proc() { - tx.send(()); - }); - } - - for _ in range(0, total) { - rx.recv(); - } - }) - - test!(fn test_nested_recv_iter() { - let (tx, rx) = sync_channel::<int>(0); - let (total_tx, total_rx) = sync_channel::<int>(0); - - spawn(proc() { - let mut acc = 0; - for x in rx.iter() { - acc += x; - } - total_tx.send(acc); - }); - - tx.send(3); - tx.send(1); - tx.send(2); - drop(tx); - assert_eq!(total_rx.recv(), 6); - }) - - test!(fn test_recv_iter_break() { - let (tx, rx) = sync_channel::<int>(0); - let (count_tx, count_rx) = sync_channel(0); - - spawn(proc() { - let mut count = 0; - for x in rx.iter() { - if count >= 3 { - break; - } else { - count += x; - } - } - count_tx.send(count); - }); - - tx.send(2); - tx.send(2); - tx.send(2); - let _ = tx.try_send(2); - drop(tx); - assert_eq!(count_rx.recv(), 4); - }) - - test!(fn try_recv_states() { - let (tx1, rx1) = sync_channel::<int>(1); - let (tx2, rx2) = sync_channel::<()>(1); - let (tx3, rx3) = sync_channel::<()>(1); - spawn(proc() { - rx2.recv(); - tx1.send(1); - tx3.send(()); - rx2.recv(); - drop(tx1); - tx3.send(()); - }); - - assert_eq!(rx1.try_recv(), Err(Empty)); - tx2.send(()); - rx3.recv(); - assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(Empty)); - tx2.send(()); - rx3.recv(); - assert_eq!(rx1.try_recv(), Err(Disconnected)); - }) - - // This bug used to end up in a livelock inside of the Receiver destructor - // because the internal state of the Shared packet was corrupted - test!(fn destroy_upgraded_shared_port_when_sender_still_active() { - let (tx, rx) = sync_channel(0); - let (tx2, rx2) = sync_channel(0); - spawn(proc() { - rx.recv(); // wait on a oneshot - drop(rx); // destroy a shared - tx2.send(()); - }); - // make sure the other task has gone to sleep - for _ in range(0, 5000) { task::deschedule(); } - - // upgrade to a shared chan and send a message - let t = tx.clone(); - drop(tx); - t.send(()); - - // wait for the child task to exit before we exit - rx2.recv(); - }) - - test!(fn try_recvs_off_the_runtime() { - use std::rt::thread::Thread; - - let (tx, rx) = sync_channel(0); - let (cdone, pdone) = channel(); - let t = Thread::start(proc() { - let mut hits = 0; - while hits < 10 { - match rx.try_recv() { - Ok(()) => { hits += 1; } - Err(Empty) => { Thread::yield_now(); } - Err(Disconnected) => return, - } - } - cdone.send(()); - }); - for _ in range(0, 10) { - tx.send(()); - } - t.join(); - pdone.recv(); - }) - - test!(fn send_opt1() { - let (tx, rx) = sync_channel(0); - spawn(proc() { rx.recv(); }); - assert_eq!(tx.send_opt(1), Ok(())); - }) - - test!(fn send_opt2() { - let (tx, rx) = sync_channel(0); - spawn(proc() { drop(rx); }); - assert_eq!(tx.send_opt(1), Err(1)); - }) - - test!(fn send_opt3() { - let (tx, rx) = sync_channel(1); - assert_eq!(tx.send_opt(1), Ok(())); - spawn(proc() { drop(rx); }); - assert_eq!(tx.send_opt(1), Err(1)); - }) - - test!(fn send_opt4() { - let (tx, rx) = sync_channel(0); - let tx2 = tx.clone(); - let (done, donerx) = channel(); - let done2 = done.clone(); - spawn(proc() { - assert_eq!(tx.send_opt(1), Err(1)); - done.send(()); - }); - spawn(proc() { - assert_eq!(tx2.send_opt(2), Err(2)); - done2.send(()); - }); - drop(rx); - donerx.recv(); - donerx.recv(); - }) - - test!(fn try_send1() { - let (tx, _rx) = sync_channel(0); - assert_eq!(tx.try_send(1), Err(Full(1))); - }) - - test!(fn try_send2() { - let (tx, _rx) = sync_channel(1); - assert_eq!(tx.try_send(1), Ok(())); - assert_eq!(tx.try_send(1), Err(Full(1))); - }) - - test!(fn try_send3() { - let (tx, rx) = sync_channel(1); - assert_eq!(tx.try_send(1), Ok(())); - drop(rx); - assert_eq!(tx.try_send(1), Err(RecvDisconnected(1))); - }) - - test!(fn try_send4() { - let (tx, rx) = sync_channel(0); - spawn(proc() { - for _ in range(0, 1000) { task::deschedule(); } - assert_eq!(tx.try_send(1), Ok(())); - }); - assert_eq!(rx.recv(), 1); - } #[ignore(reason = "flaky on libnative")]) -} diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs deleted file mode 100644 index f9e8fd1e534..00000000000 --- a/src/libstd/comm/oneshot.rs +++ /dev/null @@ -1,373 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/// Oneshot channels/ports -/// -/// This is the initial flavor of channels/ports used for comm module. This is -/// an optimization for the one-use case of a channel. The major optimization of -/// this type is to have one and exactly one allocation when the chan/port pair -/// is created. -/// -/// Another possible optimization would be to not use an Arc box because -/// in theory we know when the shared packet can be deallocated (no real need -/// for the atomic reference counting), but I was having trouble how to destroy -/// the data early in a drop of a Port. -/// -/// # Implementation -/// -/// Oneshots are implemented around one atomic uint variable. This variable -/// indicates both the state of the port/chan but also contains any tasks -/// blocked on the port. All atomic operations happen on this one word. -/// -/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect -/// on behalf of the channel side of things (it can be mentally thought of as -/// consuming the port). This upgrade is then also stored in the shared packet. -/// The one caveat to consider is that when a port sees a disconnected channel -/// it must check for data because there is no "data plus upgrade" state. - -use comm::Receiver; -use kinds::Send; -use mem; -use ops::Drop; -use option::{Some, None, Option}; -use owned::Box; -use result::{Result, Ok, Err}; -use rt::local::Local; -use rt::task::{Task, BlockedTask}; -use sync::atomics; - -// Various states you can find a port in. -static EMPTY: uint = 0; -static DATA: uint = 1; -static DISCONNECTED: uint = 2; - -pub struct Packet<T> { - // Internal state of the chan/port pair (stores the blocked task as well) - state: atomics::AtomicUint, - // One-shot data slot location - data: Option<T>, - // when used for the second time, a oneshot channel must be upgraded, and - // this contains the slot for the upgrade - upgrade: MyUpgrade<T>, -} - -pub enum Failure<T> { - Empty, - Disconnected, - Upgraded(Receiver<T>), -} - -pub enum UpgradeResult { - UpSuccess, - UpDisconnected, - UpWoke(BlockedTask), -} - -pub enum SelectionResult<T> { - SelCanceled(BlockedTask), - SelUpgraded(BlockedTask, Receiver<T>), - SelSuccess, -} - -enum MyUpgrade<T> { - NothingSent, - SendUsed, - GoUp(Receiver<T>), -} - -impl<T: Send> Packet<T> { - pub fn new() -> Packet<T> { - Packet { - data: None, - upgrade: NothingSent, - state: atomics::AtomicUint::new(EMPTY), - } - } - - pub fn send(&mut self, t: T) -> Result<(), T> { - // Sanity check - match self.upgrade { - NothingSent => {} - _ => fail!("sending on a oneshot that's already sent on "), - } - assert!(self.data.is_none()); - self.data = Some(t); - self.upgrade = SendUsed; - - match self.state.swap(DATA, atomics::SeqCst) { - // Sent the data, no one was waiting - EMPTY => Ok(()), - - // Couldn't send the data, the port hung up first. Return the data - // back up the stack. - DISCONNECTED => { - Err(self.data.take_unwrap()) - } - - // Not possible, these are one-use channels - DATA => unreachable!(), - - // Anything else means that there was a task waiting on the other - // end. We leave the 'DATA' state inside so it'll pick it up on the - // other end. - n => unsafe { - let t = BlockedTask::cast_from_uint(n); - t.wake().map(|t| t.reawaken()); - Ok(()) - } - } - } - - // Just tests whether this channel has been sent on or not, this is only - // safe to use from the sender. - pub fn sent(&self) -> bool { - match self.upgrade { - NothingSent => false, - _ => true, - } - } - - pub fn recv(&mut self) -> Result<T, Failure<T>> { - // Attempt to not block the task (it's a little expensive). If it looks - // like we're not empty, then immediately go through to `try_recv`. - if self.state.load(atomics::SeqCst) == EMPTY { - let t: Box<Task> = Local::take(); - t.deschedule(1, |task| { - let n = unsafe { task.cast_to_uint() }; - match self.state.compare_and_swap(EMPTY, n, atomics::SeqCst) { - // Nothing on the channel, we legitimately block - EMPTY => Ok(()), - - // If there's data or it's a disconnected channel, then we - // failed the cmpxchg, so we just wake ourselves back up - DATA | DISCONNECTED => { - unsafe { Err(BlockedTask::cast_from_uint(n)) } - } - - // Only one thread is allowed to sleep on this port - _ => unreachable!() - } - }); - } - - self.try_recv() - } - - pub fn try_recv(&mut self) -> Result<T, Failure<T>> { - match self.state.load(atomics::SeqCst) { - EMPTY => Err(Empty), - - // We saw some data on the channel, but the channel can be used - // again to send us an upgrade. As a result, we need to re-insert - // into the channel that there's no data available (otherwise we'll - // just see DATA next time). This is done as a cmpxchg because if - // the state changes under our feet we'd rather just see that state - // change. - DATA => { - self.state.compare_and_swap(DATA, EMPTY, atomics::SeqCst); - match self.data.take() { - Some(data) => Ok(data), - None => unreachable!(), - } - } - - // There's no guarantee that we receive before an upgrade happens, - // and an upgrade flags the channel as disconnected, so when we see - // this we first need to check if there's data available and *then* - // we go through and process the upgrade. - DISCONNECTED => { - match self.data.take() { - Some(data) => Ok(data), - None => { - match mem::replace(&mut self.upgrade, SendUsed) { - SendUsed | NothingSent => Err(Disconnected), - GoUp(upgrade) => Err(Upgraded(upgrade)) - } - } - } - } - _ => unreachable!() - } - } - - // Returns whether the upgrade was completed. If the upgrade wasn't - // completed, then the port couldn't get sent to the other half (it will - // never receive it). - pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult { - let prev = match self.upgrade { - NothingSent => NothingSent, - SendUsed => SendUsed, - _ => fail!("upgrading again"), - }; - self.upgrade = GoUp(up); - - match self.state.swap(DISCONNECTED, atomics::SeqCst) { - // If the channel is empty or has data on it, then we're good to go. - // Senders will check the data before the upgrade (in case we - // plastered over the DATA state). - DATA | EMPTY => UpSuccess, - - // If the other end is already disconnected, then we failed the - // upgrade. Be sure to trash the port we were given. - DISCONNECTED => { self.upgrade = prev; UpDisconnected } - - // If someone's waiting, we gotta wake them up - n => UpWoke(unsafe { BlockedTask::cast_from_uint(n) }) - } - } - - pub fn drop_chan(&mut self) { - match self.state.swap(DISCONNECTED, atomics::SeqCst) { - DATA | DISCONNECTED | EMPTY => {} - - // If someone's waiting, we gotta wake them up - n => unsafe { - let t = BlockedTask::cast_from_uint(n); - t.wake().map(|t| t.reawaken()); - } - } - } - - pub fn drop_port(&mut self) { - match self.state.swap(DISCONNECTED, atomics::SeqCst) { - // An empty channel has nothing to do, and a remotely disconnected - // channel also has nothing to do b/c we're about to run the drop - // glue - DISCONNECTED | EMPTY => {} - - // There's data on the channel, so make sure we destroy it promptly. - // This is why not using an arc is a little difficult (need the box - // to stay valid while we take the data). - DATA => { self.data.take_unwrap(); } - - // We're the only ones that can block on this port - _ => unreachable!() - } - } - - //////////////////////////////////////////////////////////////////////////// - // select implementation - //////////////////////////////////////////////////////////////////////////// - - // If Ok, the value is whether this port has data, if Err, then the upgraded - // port needs to be checked instead of this one. - pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> { - match self.state.load(atomics::SeqCst) { - EMPTY => Ok(false), // Welp, we tried - DATA => Ok(true), // we have some un-acquired data - DISCONNECTED if self.data.is_some() => Ok(true), // we have data - DISCONNECTED => { - match mem::replace(&mut self.upgrade, SendUsed) { - // The other end sent us an upgrade, so we need to - // propagate upwards whether the upgrade can receive - // data - GoUp(upgrade) => Err(upgrade), - - // If the other end disconnected without sending an - // upgrade, then we have data to receive (the channel is - // disconnected). - up => { self.upgrade = up; Ok(true) } - } - } - _ => unreachable!(), // we're the "one blocker" - } - } - - // Attempts to start selection on this port. This can either succeed, fail - // because there is data, or fail because there is an upgrade pending. - pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> { - let n = unsafe { task.cast_to_uint() }; - match self.state.compare_and_swap(EMPTY, n, atomics::SeqCst) { - EMPTY => SelSuccess, - DATA => SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }), - DISCONNECTED if self.data.is_some() => { - SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }) - } - DISCONNECTED => { - match mem::replace(&mut self.upgrade, SendUsed) { - // The other end sent us an upgrade, so we need to - // propagate upwards whether the upgrade can receive - // data - GoUp(upgrade) => { - SelUpgraded(unsafe { BlockedTask::cast_from_uint(n) }, - upgrade) - } - - // If the other end disconnected without sending an - // upgrade, then we have data to receive (the channel is - // disconnected). - up => { - self.upgrade = up; - SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }) - } - } - } - _ => unreachable!(), // we're the "one blocker" - } - } - - // Remove a previous selecting task from this port. This ensures that the - // blocked task will no longer be visible to any other threads. - // - // The return value indicates whether there's data on this port. - pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> { - let state = match self.state.load(atomics::SeqCst) { - // Each of these states means that no further activity will happen - // with regard to abortion selection - s @ EMPTY | - s @ DATA | - s @ DISCONNECTED => s, - - // If we've got a blocked task, then use an atomic to gain ownership - // of it (may fail) - n => self.state.compare_and_swap(n, EMPTY, atomics::SeqCst) - }; - - // Now that we've got ownership of our state, figure out what to do - // about it. - match state { - EMPTY => unreachable!(), - // our task used for select was stolen - DATA => Ok(true), - - // If the other end has hung up, then we have complete ownership - // of the port. First, check if there was data waiting for us. This - // is possible if the other end sent something and then hung up. - // - // We then need to check to see if there was an upgrade requested, - // and if so, the upgraded port needs to have its selection aborted. - DISCONNECTED => { - if self.data.is_some() { - Ok(true) - } else { - match mem::replace(&mut self.upgrade, SendUsed) { - GoUp(port) => Err(port), - _ => Ok(true), - } - } - } - - // We woke ourselves up from select. Assert that the task should be - // trashed and returne that we don't have any data. - n => { - let t = unsafe { BlockedTask::cast_from_uint(n) }; - t.trash(); - Ok(false) - } - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Packet<T> { - fn drop(&mut self) { - assert_eq!(self.state.load(atomics::SeqCst), DISCONNECTED); - } -} diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs deleted file mode 100644 index ed884647fb6..00000000000 --- a/src/libstd/comm/select.rs +++ /dev/null @@ -1,688 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Selection over an array of receivers -//! -//! This module contains the implementation machinery necessary for selecting -//! over a number of receivers. One large goal of this module is to provide an -//! efficient interface to selecting over any receiver of any type. -//! -//! This is achieved through an architecture of a "receiver set" in which -//! receivers are added to a set and then the entire set is waited on at once. -//! The set can be waited on multiple times to prevent re-adding each receiver -//! to the set. -//! -//! Usage of this module is currently encouraged to go through the use of the -//! `select!` macro. This macro allows naturally binding of variables to the -//! received values of receivers in a much more natural syntax then usage of the -//! `Select` structure directly. -//! -//! # Example -//! -//! ```rust -//! let (tx1, rx1) = channel(); -//! let (tx2, rx2) = channel(); -//! -//! tx1.send(1); -//! tx2.send(2); -//! -//! select! { -//! val = rx1.recv() => { -//! assert_eq!(val, 1); -//! }, -//! val = rx2.recv() => { -//! assert_eq!(val, 2); -//! } -//! } -//! ``` - -#![allow(dead_code)] - -use cell::Cell; -use iter::Iterator; -use kinds::Send; -use kinds::marker; -use mem; -use ops::Drop; -use option::{Some, None, Option}; -use owned::Box; -use ptr::RawPtr; -use result::{Ok, Err, Result}; -use rt::local::Local; -use rt::task::{Task, BlockedTask}; -use super::Receiver; -use uint; - -/// The "receiver set" of the select interface. This structure is used to manage -/// a set of receivers which are being selected over. -pub struct Select { - head: *mut Handle<'static, ()>, - tail: *mut Handle<'static, ()>, - next_id: Cell<uint>, - marker1: marker::NoSend, -} - -/// A handle to a receiver which is currently a member of a `Select` set of -/// receivers. This handle is used to keep the receiver in the set as well as -/// interact with the underlying receiver. -pub struct Handle<'rx, T> { - /// The ID of this handle, used to compare against the return value of - /// `Select::wait()` - id: uint, - selector: &'rx Select, - next: *mut Handle<'static, ()>, - prev: *mut Handle<'static, ()>, - added: bool, - packet: &'rx Packet, - - // due to our fun transmutes, we be sure to place this at the end. (nothing - // previous relies on T) - rx: &'rx Receiver<T>, -} - -struct Packets { cur: *mut Handle<'static, ()> } - -#[doc(hidden)] -pub trait Packet { - fn can_recv(&self) -> bool; - fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>; - fn abort_selection(&self) -> bool; -} - -impl Select { - /// Creates a new selection structure. This set is initially empty and - /// `wait` will fail!() if called. - /// - /// Usage of this struct directly can sometimes be burdensome, and usage is - /// rather much easier through the `select!` macro. - pub fn new() -> Select { - Select { - marker1: marker::NoSend, - head: 0 as *mut Handle<'static, ()>, - tail: 0 as *mut Handle<'static, ()>, - next_id: Cell::new(1), - } - } - - /// Creates a new handle into this receiver set for a new receiver. Note - /// that this does *not* add the receiver to the receiver set, for that you - /// must call the `add` method on the handle itself. - pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> { - let id = self.next_id.get(); - self.next_id.set(id + 1); - Handle { - id: id, - selector: self, - next: 0 as *mut Handle<'static, ()>, - prev: 0 as *mut Handle<'static, ()>, - added: false, - rx: rx, - packet: rx, - } - } - - /// Waits for an event on this receiver set. The returned value is *not* an - /// index, but rather an id. This id can be queried against any active - /// `Handle` structures (each one has an `id` method). The handle with - /// the matching `id` will have some sort of event available on it. The - /// event could either be that data is available or the corresponding - /// channel has been closed. - pub fn wait(&self) -> uint { - self.wait2(false) - } - - /// Helper method for skipping the preflight checks during testing - fn wait2(&self, do_preflight_checks: bool) -> uint { - // Note that this is currently an inefficient implementation. We in - // theory have knowledge about all receivers in the set ahead of time, - // so this method shouldn't really have to iterate over all of them yet - // again. The idea with this "receiver set" interface is to get the - // interface right this time around, and later this implementation can - // be optimized. - // - // This implementation can be summarized by: - // - // fn select(receivers) { - // if any receiver ready { return ready index } - // deschedule { - // block on all receivers - // } - // unblock on all receivers - // return ready index - // } - // - // Most notably, the iterations over all of the receivers shouldn't be - // necessary. - unsafe { - let mut amt = 0; - for p in self.iter() { - amt += 1; - if do_preflight_checks && (*p).packet.can_recv() { - return (*p).id; - } - } - assert!(amt > 0); - - let mut ready_index = amt; - let mut ready_id = uint::MAX; - let mut iter = self.iter().enumerate(); - - // Acquire a number of blocking contexts, and block on each one - // sequentially until one fails. If one fails, then abort - // immediately so we can go unblock on all the other receivers. - let task: Box<Task> = Local::take(); - task.deschedule(amt, |task| { - // Prepare for the block - let (i, handle) = iter.next().unwrap(); - match (*handle).packet.start_selection(task) { - Ok(()) => Ok(()), - Err(task) => { - ready_index = i; - ready_id = (*handle).id; - Err(task) - } - } - }); - - // Abort the selection process on each receiver. If the abort - // process returns `true`, then that means that the receiver is - // ready to receive some data. Note that this also means that the - // receiver may have yet to have fully read the `to_wake` field and - // woken us up (although the wakeup is guaranteed to fail). - // - // This situation happens in the window of where a sender invokes - // increment(), sees -1, and then decides to wake up the task. After - // all this is done, the sending thread will set `selecting` to - // `false`. Until this is done, we cannot return. If we were to - // return, then a sender could wake up a receiver which has gone - // back to sleep after this call to `select`. - // - // Note that it is a "fairly small window" in which an increment() - // views that it should wake a thread up until the `selecting` bit - // is set to false. For now, the implementation currently just spins - // in a yield loop. This is very distasteful, but this - // implementation is already nowhere near what it should ideally be. - // A rewrite should focus on avoiding a yield loop, and for now this - // implementation is tying us over to a more efficient "don't - // iterate over everything every time" implementation. - for handle in self.iter().take(ready_index) { - if (*handle).packet.abort_selection() { - ready_id = (*handle).id; - } - } - - assert!(ready_id != uint::MAX); - return ready_id; - } - } - - fn iter(&self) -> Packets { Packets { cur: self.head } } -} - -impl<'rx, T: Send> Handle<'rx, T> { - /// Retrieve the id of this handle. - #[inline] - pub fn id(&self) -> uint { self.id } - - /// Receive a value on the underlying receiver. Has the same semantics as - /// `Receiver.recv` - pub fn recv(&mut self) -> T { self.rx.recv() } - /// Block to receive a value on the underlying receiver, returning `Some` on - /// success or `None` if the channel disconnects. This function has the same - /// semantics as `Receiver.recv_opt` - pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() } - - /// Adds this handle to the receiver set that the handle was created from. This - /// method can be called multiple times, but it has no effect if `add` was - /// called previously. - /// - /// This method is unsafe because it requires that the `Handle` is not moved - /// while it is added to the `Select` set. - pub unsafe fn add(&mut self) { - if self.added { return } - let selector: &mut Select = mem::transmute(&*self.selector); - let me: *mut Handle<'static, ()> = mem::transmute(&*self); - - if selector.head.is_null() { - selector.head = me; - selector.tail = me; - } else { - (*me).prev = selector.tail; - assert!((*me).next.is_null()); - (*selector.tail).next = me; - selector.tail = me; - } - self.added = true; - } - - /// Removes this handle from the `Select` set. This method is unsafe because - /// it has no guarantee that the `Handle` was not moved since `add` was - /// called. - pub unsafe fn remove(&mut self) { - if !self.added { return } - - let selector: &mut Select = mem::transmute(&*self.selector); - let me: *mut Handle<'static, ()> = mem::transmute(&*self); - - if self.prev.is_null() { - assert_eq!(selector.head, me); - selector.head = self.next; - } else { - (*self.prev).next = self.next; - } - if self.next.is_null() { - assert_eq!(selector.tail, me); - selector.tail = self.prev; - } else { - (*self.next).prev = self.prev; - } - - self.next = 0 as *mut Handle<'static, ()>; - self.prev = 0 as *mut Handle<'static, ()>; - - self.added = false; - } -} - -#[unsafe_destructor] -impl Drop for Select { - fn drop(&mut self) { - assert!(self.head.is_null()); - assert!(self.tail.is_null()); - } -} - -#[unsafe_destructor] -impl<'rx, T: Send> Drop for Handle<'rx, T> { - fn drop(&mut self) { - unsafe { self.remove() } - } -} - -impl Iterator<*mut Handle<'static, ()>> for Packets { - fn next(&mut self) -> Option<*mut Handle<'static, ()>> { - if self.cur.is_null() { - None - } else { - let ret = Some(self.cur); - unsafe { self.cur = (*self.cur).next; } - ret - } - } -} - -#[cfg(test)] -#[allow(unused_imports)] -mod test { - use super::super::*; - use prelude::*; - - test!(fn smoke() { - let (tx1, rx1) = channel::<int>(); - let (tx2, rx2) = channel::<int>(); - tx1.send(1); - select! ( - foo = rx1.recv() => { assert_eq!(foo, 1); }, - _bar = rx2.recv() => { fail!() } - ) - tx2.send(2); - select! ( - _foo = rx1.recv() => { fail!() }, - bar = rx2.recv() => { assert_eq!(bar, 2) } - ) - drop(tx1); - select! ( - foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); }, - _bar = rx2.recv() => { fail!() } - ) - drop(tx2); - select! ( - bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); } - ) - }) - - test!(fn smoke2() { - let (_tx1, rx1) = channel::<int>(); - let (_tx2, rx2) = channel::<int>(); - let (_tx3, rx3) = channel::<int>(); - let (_tx4, rx4) = channel::<int>(); - let (tx5, rx5) = channel::<int>(); - tx5.send(4); - select! ( - _foo = rx1.recv() => { fail!("1") }, - _foo = rx2.recv() => { fail!("2") }, - _foo = rx3.recv() => { fail!("3") }, - _foo = rx4.recv() => { fail!("4") }, - foo = rx5.recv() => { assert_eq!(foo, 4); } - ) - }) - - test!(fn closed() { - let (_tx1, rx1) = channel::<int>(); - let (tx2, rx2) = channel::<int>(); - drop(tx2); - - select! ( - _a1 = rx1.recv_opt() => { fail!() }, - a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); } - ) - }) - - test!(fn unblocks() { - let (tx1, rx1) = channel::<int>(); - let (_tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<int>(); - - spawn(proc() { - for _ in range(0, 20) { task::deschedule(); } - tx1.send(1); - rx3.recv(); - for _ in range(0, 20) { task::deschedule(); } - }); - - select! ( - a = rx1.recv() => { assert_eq!(a, 1); }, - _b = rx2.recv() => { fail!() } - ) - tx3.send(1); - select! ( - a = rx1.recv_opt() => { assert_eq!(a, Err(())); }, - _b = rx2.recv() => { fail!() } - ) - }) - - test!(fn both_ready() { - let (tx1, rx1) = channel::<int>(); - let (tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<()>(); - - spawn(proc() { - for _ in range(0, 20) { task::deschedule(); } - tx1.send(1); - tx2.send(2); - rx3.recv(); - }); - - select! ( - a = rx1.recv() => { assert_eq!(a, 1); }, - a = rx2.recv() => { assert_eq!(a, 2); } - ) - select! ( - a = rx1.recv() => { assert_eq!(a, 1); }, - a = rx2.recv() => { assert_eq!(a, 2); } - ) - assert_eq!(rx1.try_recv(), Err(Empty)); - assert_eq!(rx2.try_recv(), Err(Empty)); - tx3.send(()); - }) - - test!(fn stress() { - static AMT: int = 10000; - let (tx1, rx1) = channel::<int>(); - let (tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<()>(); - - spawn(proc() { - for i in range(0, AMT) { - if i % 2 == 0 { - tx1.send(i); - } else { - tx2.send(i); - } - rx3.recv(); - } - }); - - for i in range(0, AMT) { - select! ( - i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); }, - i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); } - ) - tx3.send(()); - } - }) - - test!(fn cloning() { - let (tx1, rx1) = channel::<int>(); - let (_tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<()>(); - - spawn(proc() { - rx3.recv(); - tx1.clone(); - assert_eq!(rx3.try_recv(), Err(Empty)); - tx1.send(2); - rx3.recv(); - }); - - tx3.send(()); - select!( - _i1 = rx1.recv() => {}, - _i2 = rx2.recv() => fail!() - ) - tx3.send(()); - }) - - test!(fn cloning2() { - let (tx1, rx1) = channel::<int>(); - let (_tx2, rx2) = channel::<int>(); - let (tx3, rx3) = channel::<()>(); - - spawn(proc() { - rx3.recv(); - tx1.clone(); - assert_eq!(rx3.try_recv(), Err(Empty)); - tx1.send(2); - rx3.recv(); - }); - - tx3.send(()); - select!( - _i1 = rx1.recv() => {}, - _i2 = rx2.recv() => fail!() - ) - tx3.send(()); - }) - - test!(fn cloning3() { - let (tx1, rx1) = channel::<()>(); - let (tx2, rx2) = channel::<()>(); - let (tx3, rx3) = channel::<()>(); - spawn(proc() { - let s = Select::new(); - let mut h1 = s.handle(&rx1); - let mut h2 = s.handle(&rx2); - unsafe { h2.add(); } - unsafe { h1.add(); } - assert_eq!(s.wait(), h2.id); - tx3.send(()); - }); - - for _ in range(0, 1000) { task::deschedule(); } - drop(tx1.clone()); - tx2.send(()); - rx3.recv(); - }) - - test!(fn preflight1() { - let (tx, rx) = channel(); - tx.send(()); - select!( - () = rx.recv() => {} - ) - }) - - test!(fn preflight2() { - let (tx, rx) = channel(); - tx.send(()); - tx.send(()); - select!( - () = rx.recv() => {} - ) - }) - - test!(fn preflight3() { - let (tx, rx) = channel(); - drop(tx.clone()); - tx.send(()); - select!( - () = rx.recv() => {} - ) - }) - - test!(fn preflight4() { - let (tx, rx) = channel(); - tx.send(()); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id); - }) - - test!(fn preflight5() { - let (tx, rx) = channel(); - tx.send(()); - tx.send(()); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id); - }) - - test!(fn preflight6() { - let (tx, rx) = channel(); - drop(tx.clone()); - tx.send(()); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id); - }) - - test!(fn preflight7() { - let (tx, rx) = channel::<()>(); - drop(tx); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id); - }) - - test!(fn preflight8() { - let (tx, rx) = channel(); - tx.send(()); - drop(tx); - rx.recv(); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id); - }) - - test!(fn preflight9() { - let (tx, rx) = channel(); - drop(tx.clone()); - tx.send(()); - drop(tx); - rx.recv(); - let s = Select::new(); - let mut h = s.handle(&rx); - unsafe { h.add(); } - assert_eq!(s.wait2(false), h.id); - }) - - test!(fn oneshot_data_waiting() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - spawn(proc() { - select! { - () = rx1.recv() => {} - } - tx2.send(()); - }); - - for _ in range(0, 100) { task::deschedule() } - tx1.send(()); - rx2.recv(); - }) - - test!(fn stream_data_waiting() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - tx1.send(()); - tx1.send(()); - rx1.recv(); - rx1.recv(); - spawn(proc() { - select! { - () = rx1.recv() => {} - } - tx2.send(()); - }); - - for _ in range(0, 100) { task::deschedule() } - tx1.send(()); - rx2.recv(); - }) - - test!(fn shared_data_waiting() { - let (tx1, rx1) = channel(); - let (tx2, rx2) = channel(); - drop(tx1.clone()); - tx1.send(()); - rx1.recv(); - spawn(proc() { - select! { - () = rx1.recv() => {} - } - tx2.send(()); - }); - - for _ in range(0, 100) { task::deschedule() } - tx1.send(()); - rx2.recv(); - }) - - test!(fn sync1() { - let (tx, rx) = sync_channel(1); - tx.send(1); - select! { - n = rx.recv() => { assert_eq!(n, 1); } - } - }) - - test!(fn sync2() { - let (tx, rx) = sync_channel(0); - spawn(proc() { - for _ in range(0, 100) { task::deschedule() } - tx.send(1); - }); - select! { - n = rx.recv() => { assert_eq!(n, 1); } - } - }) - - test!(fn sync3() { - let (tx1, rx1) = sync_channel(0); - let (tx2, rx2) = channel(); - spawn(proc() { tx1.send(1); }); - spawn(proc() { tx2.send(2); }); - select! { - n = rx1.recv() => { - assert_eq!(n, 1); - assert_eq!(rx2.recv(), 2); - }, - n = rx2.recv() => { - assert_eq!(n, 2); - assert_eq!(rx1.recv(), 1); - } - } - }) -} diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs deleted file mode 100644 index f4eeebeeea0..00000000000 --- a/src/libstd/comm/shared.rs +++ /dev/null @@ -1,505 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/// Shared channels -/// -/// This is the flavor of channels which are not necessarily optimized for any -/// particular use case, but are the most general in how they are used. Shared -/// channels are cloneable allowing for multiple senders. -/// -/// High level implementation details can be found in the comment of the parent -/// module. You'll also note that the implementation of the shared and stream -/// channels are quite similar, and this is no coincidence! - -use cmp; -use int; -use iter::Iterator; -use kinds::Send; -use ops::Drop; -use option::{Some, None, Option}; -use owned::Box; -use result::{Ok, Err, Result}; -use rt::local::Local; -use rt::mutex::NativeMutex; -use rt::task::{Task, BlockedTask}; -use rt::thread::Thread; -use sync::atomics; - -use mpsc = sync::mpsc_queue; - -static DISCONNECTED: int = int::MIN; -static FUDGE: int = 1024; -#[cfg(test)] -static MAX_STEALS: int = 5; -#[cfg(not(test))] -static MAX_STEALS: int = 1 << 20; - -pub struct Packet<T> { - queue: mpsc::Queue<T>, - cnt: atomics::AtomicInt, // How many items are on this channel - steals: int, // How many times has a port received without blocking? - to_wake: atomics::AtomicUint, // Task to wake up - - // The number of channels which are currently using this packet. - channels: atomics::AtomicInt, - - // See the discussion in Port::drop and the channel send methods for what - // these are used for - port_dropped: atomics::AtomicBool, - sender_drain: atomics::AtomicInt, - - // this lock protects various portions of this implementation during - // select() - select_lock: NativeMutex, -} - -pub enum Failure { - Empty, - Disconnected, -} - -impl<T: Send> Packet<T> { - // Creation of a packet *must* be followed by a call to postinit_lock - // and later by inherit_blocker - pub fn new() -> Packet<T> { - let p = Packet { - queue: mpsc::Queue::new(), - cnt: atomics::AtomicInt::new(0), - steals: 0, - to_wake: atomics::AtomicUint::new(0), - channels: atomics::AtomicInt::new(2), - port_dropped: atomics::AtomicBool::new(false), - sender_drain: atomics::AtomicInt::new(0), - select_lock: unsafe { NativeMutex::new() }, - }; - return p; - } - - // This function should be used after newly created Packet - // was wrapped with an Arc - // In other case mutex data will be duplicated while clonning - // and that could cause problems on platforms where it is - // represented by opaque data structure - pub fn postinit_lock(&mut self) { - unsafe { self.select_lock.lock_noguard() } - } - - // This function is used at the creation of a shared packet to inherit a - // previously blocked task. This is done to prevent spurious wakeups of - // tasks in select(). - // - // This can only be called at channel-creation time - pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) { - match task { - Some(task) => { - assert_eq!(self.cnt.load(atomics::SeqCst), 0); - assert_eq!(self.to_wake.load(atomics::SeqCst), 0); - self.to_wake.store(unsafe { task.cast_to_uint() }, - atomics::SeqCst); - self.cnt.store(-1, atomics::SeqCst); - - // This store is a little sketchy. What's happening here is - // that we're transferring a blocker from a oneshot or stream - // channel to this shared channel. In doing so, we never - // spuriously wake them up and rather only wake them up at the - // appropriate time. This implementation of shared channels - // assumes that any blocking recv() will undo the increment of - // steals performed in try_recv() once the recv is complete. - // This thread that we're inheriting, however, is not in the - // middle of recv. Hence, the first time we wake them up, - // they're going to wake up from their old port, move on to the - // upgraded port, and then call the block recv() function. - // - // When calling this function, they'll find there's data - // immediately available, counting it as a steal. This in fact - // wasn't a steal because we appropriately blocked them waiting - // for data. - // - // To offset this bad increment, we initially set the steal - // count to -1. You'll find some special code in - // abort_selection() as well to ensure that this -1 steal count - // doesn't escape too far. - self.steals = -1; - } - None => {} - } - - // When the shared packet is constructed, we grabbed this lock. The - // purpose of this lock is to ensure that abort_selection() doesn't - // interfere with this method. After we unlock this lock, we're - // signifying that we're done modifying self.cnt and self.to_wake and - // the port is ready for the world to continue using it. - unsafe { self.select_lock.unlock_noguard() } - } - - pub fn send(&mut self, t: T) -> Result<(), T> { - // See Port::drop for what's going on - if self.port_dropped.load(atomics::SeqCst) { return Err(t) } - - // Note that the multiple sender case is a little tricker - // semantically than the single sender case. The logic for - // incrementing is "add and if disconnected store disconnected". - // This could end up leading some senders to believe that there - // wasn't a disconnect if in fact there was a disconnect. This means - // that while one thread is attempting to re-store the disconnected - // states, other threads could walk through merrily incrementing - // this very-negative disconnected count. To prevent senders from - // spuriously attempting to send when the channels is actually - // disconnected, the count has a ranged check here. - // - // This is also done for another reason. Remember that the return - // value of this function is: - // - // `true` == the data *may* be received, this essentially has no - // meaning - // `false` == the data will *never* be received, this has a lot of - // meaning - // - // In the SPSC case, we have a check of 'queue.is_empty()' to see - // whether the data was actually received, but this same condition - // means nothing in a multi-producer context. As a result, this - // preflight check serves as the definitive "this will never be - // received". Once we get beyond this check, we have permanently - // entered the realm of "this may be received" - if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE { - return Err(t) - } - - self.queue.push(t); - match self.cnt.fetch_add(1, atomics::SeqCst) { - -1 => { - self.take_to_wake().wake().map(|t| t.reawaken()); - } - - // In this case, we have possibly failed to send our data, and - // we need to consider re-popping the data in order to fully - // destroy it. We must arbitrate among the multiple senders, - // however, because the queues that we're using are - // single-consumer queues. In order to do this, all exiting - // pushers will use an atomic count in order to count those - // flowing through. Pushers who see 0 are required to drain as - // much as possible, and then can only exit when they are the - // only pusher (otherwise they must try again). - n if n < DISCONNECTED + FUDGE => { - // see the comment in 'try' for a shared channel for why this - // window of "not disconnected" is ok. - self.cnt.store(DISCONNECTED, atomics::SeqCst); - - if self.sender_drain.fetch_add(1, atomics::SeqCst) == 0 { - loop { - // drain the queue, for info on the thread yield see the - // discussion in try_recv - loop { - match self.queue.pop() { - mpsc::Data(..) => {} - mpsc::Empty => break, - mpsc::Inconsistent => Thread::yield_now(), - } - } - // maybe we're done, if we're not the last ones - // here, then we need to go try again. - if self.sender_drain.fetch_sub(1, atomics::SeqCst) == 1 { - break - } - } - - // At this point, there may still be data on the queue, - // but only if the count hasn't been incremented and - // some other sender hasn't finished pushing data just - // yet. That sender in question will drain its own data. - } - } - - // Can't make any assumptions about this case like in the SPSC case. - _ => {} - } - - Ok(()) - } - - pub fn recv(&mut self) -> Result<T, Failure> { - // This code is essentially the exact same as that found in the stream - // case (see stream.rs) - match self.try_recv() { - Err(Empty) => {} - data => return data, - } - - let task: Box<Task> = Local::take(); - task.deschedule(1, |task| { - self.decrement(task) - }); - - match self.try_recv() { - data @ Ok(..) => { self.steals -= 1; data } - data => data, - } - } - - // Essentially the exact same thing as the stream decrement function. - fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> { - assert_eq!(self.to_wake.load(atomics::SeqCst), 0); - let n = unsafe { task.cast_to_uint() }; - self.to_wake.store(n, atomics::SeqCst); - - let steals = self.steals; - self.steals = 0; - - match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) { - DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); } - // If we factor in our steals and notice that the channel has no - // data, we successfully sleep - n => { - assert!(n >= 0); - if n - steals <= 0 { return Ok(()) } - } - } - - self.to_wake.store(0, atomics::SeqCst); - Err(unsafe { BlockedTask::cast_from_uint(n) }) - } - - pub fn try_recv(&mut self) -> Result<T, Failure> { - let ret = match self.queue.pop() { - mpsc::Data(t) => Some(t), - mpsc::Empty => None, - - // This is a bit of an interesting case. The channel is - // reported as having data available, but our pop() has - // failed due to the queue being in an inconsistent state. - // This means that there is some pusher somewhere which has - // yet to complete, but we are guaranteed that a pop will - // eventually succeed. In this case, we spin in a yield loop - // because the remote sender should finish their enqueue - // operation "very quickly". - // - // Note that this yield loop does *not* attempt to do a green - // yield (regardless of the context), but *always* performs an - // OS-thread yield. The reasoning for this is that the pusher in - // question which is causing the inconsistent state is - // guaranteed to *not* be a blocked task (green tasks can't get - // pre-empted), so it must be on a different OS thread. Also, - // `try_recv` is normally a "guaranteed no rescheduling" context - // in a green-thread situation. By yielding control of the - // thread, we will hopefully allow time for the remote task on - // the other OS thread to make progress. - // - // Avoiding this yield loop would require a different queue - // abstraction which provides the guarantee that after M - // pushes have succeeded, at least M pops will succeed. The - // current queues guarantee that if there are N active - // pushes, you can pop N times once all N have finished. - mpsc::Inconsistent => { - let data; - loop { - Thread::yield_now(); - match self.queue.pop() { - mpsc::Data(t) => { data = t; break } - mpsc::Empty => fail!("inconsistent => empty"), - mpsc::Inconsistent => {} - } - } - Some(data) - } - }; - match ret { - // See the discussion in the stream implementation for why we - // might decrement steals. - Some(data) => { - if self.steals > MAX_STEALS { - match self.cnt.swap(0, atomics::SeqCst) { - DISCONNECTED => { - self.cnt.store(DISCONNECTED, atomics::SeqCst); - } - n => { - let m = cmp::min(n, self.steals); - self.steals -= m; - self.bump(n - m); - } - } - assert!(self.steals >= 0); - } - self.steals += 1; - Ok(data) - } - - // See the discussion in the stream implementation for why we try - // again. - None => { - match self.cnt.load(atomics::SeqCst) { - n if n != DISCONNECTED => Err(Empty), - _ => { - match self.queue.pop() { - mpsc::Data(t) => Ok(t), - mpsc::Empty => Err(Disconnected), - // with no senders, an inconsistency is impossible. - mpsc::Inconsistent => unreachable!(), - } - } - } - } - } - } - - // Prepares this shared packet for a channel clone, essentially just bumping - // a refcount. - pub fn clone_chan(&mut self) { - self.channels.fetch_add(1, atomics::SeqCst); - } - - // Decrement the reference count on a channel. This is called whenever a - // Chan is dropped and may end up waking up a receiver. It's the receiver's - // responsibility on the other end to figure out that we've disconnected. - pub fn drop_chan(&mut self) { - match self.channels.fetch_sub(1, atomics::SeqCst) { - 1 => {} - n if n > 1 => return, - n => fail!("bad number of channels left {}", n), - } - - match self.cnt.swap(DISCONNECTED, atomics::SeqCst) { - -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); } - DISCONNECTED => {} - n => { assert!(n >= 0); } - } - } - - // See the long discussion inside of stream.rs for why the queue is drained, - // and why it is done in this fashion. - pub fn drop_port(&mut self) { - self.port_dropped.store(true, atomics::SeqCst); - let mut steals = self.steals; - while { - let cnt = self.cnt.compare_and_swap( - steals, DISCONNECTED, atomics::SeqCst); - cnt != DISCONNECTED && cnt != steals - } { - // See the discussion in 'try_recv' for why we yield - // control of this thread. - loop { - match self.queue.pop() { - mpsc::Data(..) => { steals += 1; } - mpsc::Empty | mpsc::Inconsistent => break, - } - } - } - } - - // Consumes ownership of the 'to_wake' field. - fn take_to_wake(&mut self) -> BlockedTask { - let task = self.to_wake.load(atomics::SeqCst); - self.to_wake.store(0, atomics::SeqCst); - assert!(task != 0); - unsafe { BlockedTask::cast_from_uint(task) } - } - - //////////////////////////////////////////////////////////////////////////// - // select implementation - //////////////////////////////////////////////////////////////////////////// - - // Helper function for select, tests whether this port can receive without - // blocking (obviously not an atomic decision). - // - // This is different than the stream version because there's no need to peek - // at the queue, we can just look at the local count. - pub fn can_recv(&mut self) -> bool { - let cnt = self.cnt.load(atomics::SeqCst); - cnt == DISCONNECTED || cnt - self.steals > 0 - } - - // increment the count on the channel (used for selection) - fn bump(&mut self, amt: int) -> int { - match self.cnt.fetch_add(amt, atomics::SeqCst) { - DISCONNECTED => { - self.cnt.store(DISCONNECTED, atomics::SeqCst); - DISCONNECTED - } - n => n - } - } - - // Inserts the blocked task for selection on this port, returning it back if - // the port already has data on it. - // - // The code here is the same as in stream.rs, except that it doesn't need to - // peek at the channel to see if an upgrade is pending. - pub fn start_selection(&mut self, - task: BlockedTask) -> Result<(), BlockedTask> { - match self.decrement(task) { - Ok(()) => Ok(()), - Err(task) => { - let prev = self.bump(1); - assert!(prev == DISCONNECTED || prev >= 0); - return Err(task); - } - } - } - - // Cancels a previous task waiting on this port, returning whether there's - // data on the port. - // - // This is similar to the stream implementation (hence fewer comments), but - // uses a different value for the "steals" variable. - pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool { - // Before we do anything else, we bounce on this lock. The reason for - // doing this is to ensure that any upgrade-in-progress is gone and - // done with. Without this bounce, we can race with inherit_blocker - // about looking at and dealing with to_wake. Once we have acquired the - // lock, we are guaranteed that inherit_blocker is done. - unsafe { - let _guard = self.select_lock.lock(); - } - - // Like the stream implementation, we want to make sure that the count - // on the channel goes non-negative. We don't know how negative the - // stream currently is, so instead of using a steal value of 1, we load - // the channel count and figure out what we should do to make it - // positive. - let steals = { - let cnt = self.cnt.load(atomics::SeqCst); - if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0} - }; - let prev = self.bump(steals + 1); - - if prev == DISCONNECTED { - assert_eq!(self.to_wake.load(atomics::SeqCst), 0); - true - } else { - let cur = prev + steals + 1; - assert!(cur >= 0); - if prev < 0 { - self.take_to_wake().trash(); - } else { - while self.to_wake.load(atomics::SeqCst) != 0 { - Thread::yield_now(); - } - } - // if the number of steals is -1, it was the pre-emptive -1 steal - // count from when we inherited a blocker. This is fine because - // we're just going to overwrite it with a real value. - assert!(self.steals == 0 || self.steals == -1); - self.steals = steals; - prev >= 0 - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Packet<T> { - fn drop(&mut self) { - // Note that this load is not only an assert for correctness about - // disconnection, but also a proper fence before the read of - // `to_wake`, so this assert cannot be removed with also removing - // the `to_wake` assert. - assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED); - assert_eq!(self.to_wake.load(atomics::SeqCst), 0); - assert_eq!(self.channels.load(atomics::SeqCst), 0); - } -} diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs deleted file mode 100644 index 9fb22ef4508..00000000000 --- a/src/libstd/comm/stream.rs +++ /dev/null @@ -1,483 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/// Stream channels -/// -/// This is the flavor of channels which are optimized for one sender and one -/// receiver. The sender will be upgraded to a shared channel if the channel is -/// cloned. -/// -/// High level implementation details can be found in the comment of the parent -/// module. - -use cmp; -use comm::Receiver; -use int; -use iter::Iterator; -use kinds::Send; -use ops::Drop; -use option::{Some, None}; -use owned::Box; -use result::{Ok, Err, Result}; -use rt::local::Local; -use rt::task::{Task, BlockedTask}; -use rt::thread::Thread; -use spsc = sync::spsc_queue; -use sync::atomics; - -static DISCONNECTED: int = int::MIN; -#[cfg(test)] -static MAX_STEALS: int = 5; -#[cfg(not(test))] -static MAX_STEALS: int = 1 << 20; - -pub struct Packet<T> { - queue: spsc::Queue<Message<T>>, // internal queue for all message - - cnt: atomics::AtomicInt, // How many items are on this channel - steals: int, // How many times has a port received without blocking? - to_wake: atomics::AtomicUint, // Task to wake up - - port_dropped: atomics::AtomicBool, // flag if the channel has been destroyed. -} - -pub enum Failure<T> { - Empty, - Disconnected, - Upgraded(Receiver<T>), -} - -pub enum UpgradeResult { - UpSuccess, - UpDisconnected, - UpWoke(BlockedTask), -} - -pub enum SelectionResult<T> { - SelSuccess, - SelCanceled(BlockedTask), - SelUpgraded(BlockedTask, Receiver<T>), -} - -// Any message could contain an "upgrade request" to a new shared port, so the -// internal queue it's a queue of T, but rather Message<T> -enum Message<T> { - Data(T), - GoUp(Receiver<T>), -} - -impl<T: Send> Packet<T> { - pub fn new() -> Packet<T> { - Packet { - queue: spsc::Queue::new(128), - - cnt: atomics::AtomicInt::new(0), - steals: 0, - to_wake: atomics::AtomicUint::new(0), - - port_dropped: atomics::AtomicBool::new(false), - } - } - - - pub fn send(&mut self, t: T) -> Result<(), T> { - // If the other port has deterministically gone away, then definitely - // must return the data back up the stack. Otherwise, the data is - // considered as being sent. - if self.port_dropped.load(atomics::SeqCst) { return Err(t) } - - match self.do_send(Data(t)) { - UpSuccess | UpDisconnected => {}, - UpWoke(task) => { task.wake().map(|t| t.reawaken()); } - } - Ok(()) - } - pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult { - // If the port has gone away, then there's no need to proceed any - // further. - if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected } - - self.do_send(GoUp(up)) - } - - fn do_send(&mut self, t: Message<T>) -> UpgradeResult { - self.queue.push(t); - match self.cnt.fetch_add(1, atomics::SeqCst) { - // As described in the mod's doc comment, -1 == wakeup - -1 => UpWoke(self.take_to_wake()), - // As as described before, SPSC queues must be >= -2 - -2 => UpSuccess, - - // Be sure to preserve the disconnected state, and the return value - // in this case is going to be whether our data was received or not. - // This manifests itself on whether we have an empty queue or not. - // - // Primarily, are required to drain the queue here because the port - // will never remove this data. We can only have at most one item to - // drain (the port drains the rest). - DISCONNECTED => { - self.cnt.store(DISCONNECTED, atomics::SeqCst); - let first = self.queue.pop(); - let second = self.queue.pop(); - assert!(second.is_none()); - - match first { - Some(..) => UpSuccess, // we failed to send the data - None => UpDisconnected, // we successfully sent data - } - } - - // Otherwise we just sent some data on a non-waiting queue, so just - // make sure the world is sane and carry on! - n => { assert!(n >= 0); UpSuccess } - } - } - - // Consumes ownership of the 'to_wake' field. - fn take_to_wake(&mut self) -> BlockedTask { - let task = self.to_wake.load(atomics::SeqCst); - self.to_wake.store(0, atomics::SeqCst); - assert!(task != 0); - unsafe { BlockedTask::cast_from_uint(task) } - } - - // Decrements the count on the channel for a sleeper, returning the sleeper - // back if it shouldn't sleep. Note that this is the location where we take - // steals into account. - fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> { - assert_eq!(self.to_wake.load(atomics::SeqCst), 0); - let n = unsafe { task.cast_to_uint() }; - self.to_wake.store(n, atomics::SeqCst); - - let steals = self.steals; - self.steals = 0; - - match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) { - DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); } - // If we factor in our steals and notice that the channel has no - // data, we successfully sleep - n => { - assert!(n >= 0); - if n - steals <= 0 { return Ok(()) } - } - } - - self.to_wake.store(0, atomics::SeqCst); - Err(unsafe { BlockedTask::cast_from_uint(n) }) - } - - pub fn recv(&mut self) -> Result<T, Failure<T>> { - // Optimistic preflight check (scheduling is expensive). - match self.try_recv() { - Err(Empty) => {} - data => return data, - } - - // Welp, our channel has no data. Deschedule the current task and - // initiate the blocking protocol. - let task: Box<Task> = Local::take(); - task.deschedule(1, |task| { - self.decrement(task) - }); - - match self.try_recv() { - // Messages which actually popped from the queue shouldn't count as - // a steal, so offset the decrement here (we already have our - // "steal" factored into the channel count above). - data @ Ok(..) | - data @ Err(Upgraded(..)) => { - self.steals -= 1; - data - } - - data => data, - } - } - - pub fn try_recv(&mut self) -> Result<T, Failure<T>> { - match self.queue.pop() { - // If we stole some data, record to that effect (this will be - // factored into cnt later on). - // - // Note that we don't allow steals to grow without bound in order to - // prevent eventual overflow of either steals or cnt as an overflow - // would have catastrophic results. Sometimes, steals > cnt, but - // other times cnt > steals, so we don't know the relation between - // steals and cnt. This code path is executed only rarely, so we do - // a pretty slow operation, of swapping 0 into cnt, taking steals - // down as much as possible (without going negative), and then - // adding back in whatever we couldn't factor into steals. - Some(data) => { - if self.steals > MAX_STEALS { - match self.cnt.swap(0, atomics::SeqCst) { - DISCONNECTED => { - self.cnt.store(DISCONNECTED, atomics::SeqCst); - } - n => { - let m = cmp::min(n, self.steals); - self.steals -= m; - self.bump(n - m); - } - } - assert!(self.steals >= 0); - } - self.steals += 1; - match data { - Data(t) => Ok(t), - GoUp(up) => Err(Upgraded(up)), - } - } - - None => { - match self.cnt.load(atomics::SeqCst) { - n if n != DISCONNECTED => Err(Empty), - - // This is a little bit of a tricky case. We failed to pop - // data above, and then we have viewed that the channel is - // disconnected. In this window more data could have been - // sent on the channel. It doesn't really make sense to - // return that the channel is disconnected when there's - // actually data on it, so be extra sure there's no data by - // popping one more time. - // - // We can ignore steals because the other end is - // disconnected and we'll never need to really factor in our - // steals again. - _ => { - match self.queue.pop() { - Some(Data(t)) => Ok(t), - Some(GoUp(up)) => Err(Upgraded(up)), - None => Err(Disconnected), - } - } - } - } - } - } - - pub fn drop_chan(&mut self) { - // Dropping a channel is pretty simple, we just flag it as disconnected - // and then wakeup a blocker if there is one. - match self.cnt.swap(DISCONNECTED, atomics::SeqCst) { - -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); } - DISCONNECTED => {} - n => { assert!(n >= 0); } - } - } - - pub fn drop_port(&mut self) { - // Dropping a port seems like a fairly trivial thing. In theory all we - // need to do is flag that we're disconnected and then everything else - // can take over (we don't have anyone to wake up). - // - // The catch for Ports is that we want to drop the entire contents of - // the queue. There are multiple reasons for having this property, the - // largest of which is that if another chan is waiting in this channel - // (but not received yet), then waiting on that port will cause a - // deadlock. - // - // So if we accept that we must now destroy the entire contents of the - // queue, this code may make a bit more sense. The tricky part is that - // we can't let any in-flight sends go un-dropped, we have to make sure - // *everything* is dropped and nothing new will come onto the channel. - - // The first thing we do is set a flag saying that we're done for. All - // sends are gated on this flag, so we're immediately guaranteed that - // there are a bounded number of active sends that we'll have to deal - // with. - self.port_dropped.store(true, atomics::SeqCst); - - // Now that we're guaranteed to deal with a bounded number of senders, - // we need to drain the queue. This draining process happens atomically - // with respect to the "count" of the channel. If the count is nonzero - // (with steals taken into account), then there must be data on the - // channel. In this case we drain everything and then try again. We will - // continue to fail while active senders send data while we're dropping - // data, but eventually we're guaranteed to break out of this loop - // (because there is a bounded number of senders). - let mut steals = self.steals; - while { - let cnt = self.cnt.compare_and_swap( - steals, DISCONNECTED, atomics::SeqCst); - cnt != DISCONNECTED && cnt != steals - } { - loop { - match self.queue.pop() { - Some(..) => { steals += 1; } - None => break - } - } - } - - // At this point in time, we have gated all future senders from sending, - // and we have flagged the channel as being disconnected. The senders - // still have some responsibility, however, because some sends may not - // complete until after we flag the disconnection. There are more - // details in the sending methods that see DISCONNECTED - } - - //////////////////////////////////////////////////////////////////////////// - // select implementation - //////////////////////////////////////////////////////////////////////////// - - // Tests to see whether this port can receive without blocking. If Ok is - // returned, then that's the answer. If Err is returned, then the returned - // port needs to be queried instead (an upgrade happened) - pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> { - // We peek at the queue to see if there's anything on it, and we use - // this return value to determine if we should pop from the queue and - // upgrade this channel immediately. If it looks like we've got an - // upgrade pending, then go through the whole recv rigamarole to update - // the internal state. - match self.queue.peek() { - Some(&GoUp(..)) => { - match self.recv() { - Err(Upgraded(port)) => Err(port), - _ => unreachable!(), - } - } - Some(..) => Ok(true), - None => Ok(false) - } - } - - // increment the count on the channel (used for selection) - fn bump(&mut self, amt: int) -> int { - match self.cnt.fetch_add(amt, atomics::SeqCst) { - DISCONNECTED => { - self.cnt.store(DISCONNECTED, atomics::SeqCst); - DISCONNECTED - } - n => n - } - } - - // Attempts to start selecting on this port. Like a oneshot, this can fail - // immediately because of an upgrade. - pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> { - match self.decrement(task) { - Ok(()) => SelSuccess, - Err(task) => { - let ret = match self.queue.peek() { - Some(&GoUp(..)) => { - match self.queue.pop() { - Some(GoUp(port)) => SelUpgraded(task, port), - _ => unreachable!(), - } - } - Some(..) => SelCanceled(task), - None => SelCanceled(task), - }; - // Undo our decrement above, and we should be guaranteed that the - // previous value is positive because we're not going to sleep - let prev = self.bump(1); - assert!(prev == DISCONNECTED || prev >= 0); - return ret; - } - } - } - - // Removes a previous task from being blocked in this port - pub fn abort_selection(&mut self, - was_upgrade: bool) -> Result<bool, Receiver<T>> { - // If we're aborting selection after upgrading from a oneshot, then - // we're guarantee that no one is waiting. The only way that we could - // have seen the upgrade is if data was actually sent on the channel - // half again. For us, this means that there is guaranteed to be data on - // this channel. Furthermore, we're guaranteed that there was no - // start_selection previously, so there's no need to modify `self.cnt` - // at all. - // - // Hence, because of these invariants, we immediately return `Ok(true)`. - // Note that the data may not actually be sent on the channel just yet. - // The other end could have flagged the upgrade but not sent data to - // this end. This is fine because we know it's a small bounded windows - // of time until the data is actually sent. - if was_upgrade { - assert_eq!(self.steals, 0); - assert_eq!(self.to_wake.load(atomics::SeqCst), 0); - return Ok(true) - } - - // We want to make sure that the count on the channel goes non-negative, - // and in the stream case we can have at most one steal, so just assume - // that we had one steal. - let steals = 1; - let prev = self.bump(steals + 1); - - // If we were previously disconnected, then we know for sure that there - // is no task in to_wake, so just keep going - let has_data = if prev == DISCONNECTED { - assert_eq!(self.to_wake.load(atomics::SeqCst), 0); - true // there is data, that data is that we're disconnected - } else { - let cur = prev + steals + 1; - assert!(cur >= 0); - - // If the previous count was negative, then we just made things go - // positive, hence we passed the -1 boundary and we're responsible - // for removing the to_wake() field and trashing it. - // - // If the previous count was positive then we're in a tougher - // situation. A possible race is that a sender just incremented - // through -1 (meaning it's going to try to wake a task up), but it - // hasn't yet read the to_wake. In order to prevent a future recv() - // from waking up too early (this sender picking up the plastered - // over to_wake), we spin loop here waiting for to_wake to be 0. - // Note that this entire select() implementation needs an overhaul, - // and this is *not* the worst part of it, so this is not done as a - // final solution but rather out of necessity for now to get - // something working. - if prev < 0 { - self.take_to_wake().trash(); - } else { - while self.to_wake.load(atomics::SeqCst) != 0 { - Thread::yield_now(); - } - } - assert_eq!(self.steals, 0); - self.steals = steals; - - // if we were previously positive, then there's surely data to - // receive - prev >= 0 - }; - - // Now that we've determined that this queue "has data", we peek at the - // queue to see if the data is an upgrade or not. If it's an upgrade, - // then we need to destroy this port and abort selection on the - // upgraded port. - if has_data { - match self.queue.peek() { - Some(&GoUp(..)) => { - match self.queue.pop() { - Some(GoUp(port)) => Err(port), - _ => unreachable!(), - } - } - _ => Ok(true), - } - } else { - Ok(false) - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Packet<T> { - fn drop(&mut self) { - // Note that this load is not only an assert for correctness about - // disconnection, but also a proper fence before the read of - // `to_wake`, so this assert cannot be removed with also removing - // the `to_wake` assert. - assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED); - assert_eq!(self.to_wake.load(atomics::SeqCst), 0); - } -} diff --git a/src/libstd/comm/sync.rs b/src/libstd/comm/sync.rs deleted file mode 100644 index 84ef6d0aa8f..00000000000 --- a/src/libstd/comm/sync.rs +++ /dev/null @@ -1,485 +0,0 @@ -// Copyright 2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/// Synchronous channels/ports -/// -/// This channel implementation differs significantly from the asynchronous -/// implementations found next to it (oneshot/stream/share). This is an -/// implementation of a synchronous, bounded buffer channel. -/// -/// Each channel is created with some amount of backing buffer, and sends will -/// *block* until buffer space becomes available. A buffer size of 0 is valid, -/// which means that every successful send is paired with a successful recv. -/// -/// This flavor of channels defines a new `send_opt` method for channels which -/// is the method by which a message is sent but the task does not fail if it -/// cannot be delivered. -/// -/// Another major difference is that send() will *always* return back the data -/// if it couldn't be sent. This is because it is deterministically known when -/// the data is received and when it is not received. -/// -/// Implementation-wise, it can all be summed up with "use a mutex plus some -/// logic". The mutex used here is an OS native mutex, meaning that no user code -/// is run inside of the mutex (to prevent context switching). This -/// implementation shares almost all code for the buffered and unbuffered cases -/// of a synchronous channel. There are a few branches for the unbuffered case, -/// but they're mostly just relevant to blocking senders. - -use collections::Collection; -use iter::Iterator; -use kinds::Send; -use mem; -use ops::Drop; -use option::{Some, None, Option}; -use owned::Box; -use ptr::RawPtr; -use result::{Result, Ok, Err}; -use rt::local::Local; -use rt::mutex::{NativeMutex, LockGuard}; -use rt::task::{Task, BlockedTask}; -use sync::atomics; -use ty::Unsafe; -use vec::Vec; - -pub struct Packet<T> { - /// Only field outside of the mutex. Just done for kicks, but mainly because - /// the other shared channel already had the code implemented - channels: atomics::AtomicUint, - - /// The state field is protected by this mutex - lock: NativeMutex, - state: Unsafe<State<T>>, -} - -struct State<T> { - disconnected: bool, // Is the channel disconnected yet? - queue: Queue, // queue of senders waiting to send data - blocker: Blocker, // currently blocked task on this channel - buf: Buffer<T>, // storage for buffered messages - cap: uint, // capacity of this channel - - /// A curious flag used to indicate whether a sender failed or succeeded in - /// blocking. This is used to transmit information back to the task that it - /// must dequeue its message from the buffer because it was not received. - /// This is only relevant in the 0-buffer case. This obviously cannot be - /// safely constructed, but it's guaranteed to always have a valid pointer - /// value. - canceled: Option<&'static mut bool>, -} - -/// Possible flavors of tasks who can be blocked on this channel. -enum Blocker { - BlockedSender(BlockedTask), - BlockedReceiver(BlockedTask), - NoneBlocked -} - -/// Simple queue for threading tasks together. Nodes are stack-allocated, so -/// this structure is not safe at all -struct Queue { - head: *mut Node, - tail: *mut Node, -} - -struct Node { - task: Option<BlockedTask>, - next: *mut Node, -} - -/// A simple ring-buffer -struct Buffer<T> { - buf: Vec<Option<T>>, - start: uint, - size: uint, -} - -#[deriving(Show)] -pub enum Failure { - Empty, - Disconnected, -} - -/// Atomically blocks the current task, placing it into `slot`, unlocking `lock` -/// in the meantime. This re-locks the mutex upon returning. -fn wait(slot: &mut Blocker, f: fn(BlockedTask) -> Blocker, - lock: &NativeMutex) { - let me: Box<Task> = Local::take(); - me.deschedule(1, |task| { - match mem::replace(slot, f(task)) { - NoneBlocked => {} - _ => unreachable!(), - } - unsafe { lock.unlock_noguard(); } - Ok(()) - }); - unsafe { lock.lock_noguard(); } -} - -/// Wakes up a task, dropping the lock at the correct time -fn wakeup(task: BlockedTask, guard: LockGuard) { - // We need to be careful to wake up the waiting task *outside* of the mutex - // in case it incurs a context switch. - mem::drop(guard); - task.wake().map(|t| t.reawaken()); -} - -impl<T: Send> Packet<T> { - pub fn new(cap: uint) -> Packet<T> { - Packet { - channels: atomics::AtomicUint::new(1), - lock: unsafe { NativeMutex::new() }, - state: Unsafe::new(State { - disconnected: false, - blocker: NoneBlocked, - cap: cap, - canceled: None, - queue: Queue { - head: 0 as *mut Node, - tail: 0 as *mut Node, - }, - buf: Buffer { - buf: Vec::from_fn(cap + if cap == 0 {1} else {0}, |_| None), - start: 0, - size: 0, - }, - }), - } - } - - // Locks this channel, returning a guard for the state and the mutable state - // itself. Care should be taken to ensure that the state does not escape the - // guard! - // - // Note that we're ok promoting an & reference to an &mut reference because - // the lock ensures that we're the only ones in the world with a pointer to - // the state. - fn lock<'a>(&'a self) -> (LockGuard<'a>, &'a mut State<T>) { - unsafe { - let guard = self.lock.lock(); - (guard, &mut *self.state.get()) - } - } - - pub fn send(&self, t: T) -> Result<(), T> { - let (guard, state) = self.lock(); - - // wait for a slot to become available, and enqueue the data - while !state.disconnected && state.buf.size() == state.buf.cap() { - state.queue.enqueue(&self.lock); - } - if state.disconnected { return Err(t) } - state.buf.enqueue(t); - - match mem::replace(&mut state.blocker, NoneBlocked) { - // if our capacity is 0, then we need to wait for a receiver to be - // available to take our data. After waiting, we check again to make - // sure the port didn't go away in the meantime. If it did, we need - // to hand back our data. - NoneBlocked if state.cap == 0 => { - let mut canceled = false; - assert!(state.canceled.is_none()); - state.canceled = Some(unsafe { mem::transmute(&mut canceled) }); - wait(&mut state.blocker, BlockedSender, &self.lock); - if canceled {Err(state.buf.dequeue())} else {Ok(())} - } - - // success, we buffered some data - NoneBlocked => Ok(()), - - // success, someone's about to receive our buffered data. - BlockedReceiver(task) => { wakeup(task, guard); Ok(()) } - - BlockedSender(..) => fail!("lolwut"), - } - } - - pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> { - let (guard, state) = self.lock(); - if state.disconnected { - Err(super::RecvDisconnected(t)) - } else if state.buf.size() == state.buf.cap() { - Err(super::Full(t)) - } else if state.cap == 0 { - // With capacity 0, even though we have buffer space we can't - // transfer the data unless there's a receiver waiting. - match mem::replace(&mut state.blocker, NoneBlocked) { - NoneBlocked => Err(super::Full(t)), - BlockedSender(..) => unreachable!(), - BlockedReceiver(task) => { - state.buf.enqueue(t); - wakeup(task, guard); - Ok(()) - } - } - } else { - // If the buffer has some space and the capacity isn't 0, then we - // just enqueue the data for later retrieval. - assert!(state.buf.size() < state.buf.cap()); - state.buf.enqueue(t); - Ok(()) - } - } - - // Receives a message from this channel - // - // When reading this, remember that there can only ever be one receiver at - // time. - pub fn recv(&self) -> Result<T, ()> { - let (guard, state) = self.lock(); - - // Wait for the buffer to have something in it. No need for a while loop - // because we're the only receiver. - let mut waited = false; - if !state.disconnected && state.buf.size() == 0 { - wait(&mut state.blocker, BlockedReceiver, &self.lock); - waited = true; - } - if state.disconnected && state.buf.size() == 0 { return Err(()) } - - // Pick up the data, wake up our neighbors, and carry on - assert!(state.buf.size() > 0); - let ret = state.buf.dequeue(); - self.wakeup_senders(waited, guard, state); - return Ok(ret); - } - - pub fn try_recv(&self) -> Result<T, Failure> { - let (guard, state) = self.lock(); - - // Easy cases first - if state.disconnected { return Err(Disconnected) } - if state.buf.size() == 0 { return Err(Empty) } - - // Be sure to wake up neighbors - let ret = Ok(state.buf.dequeue()); - self.wakeup_senders(false, guard, state); - - return ret; - } - - // Wake up pending senders after some data has been received - // - // * `waited` - flag if the receiver blocked to receive some data, or if it - // just picked up some data on the way out - // * `guard` - the lock guard that is held over this channel's lock - fn wakeup_senders(&self, waited: bool, - guard: LockGuard, - state: &mut State<T>) { - let pending_sender1: Option<BlockedTask> = state.queue.dequeue(); - - // If this is a no-buffer channel (cap == 0), then if we didn't wait we - // need to ACK the sender. If we waited, then the sender waking us up - // was already the ACK. - let pending_sender2 = if state.cap == 0 && !waited { - match mem::replace(&mut state.blocker, NoneBlocked) { - NoneBlocked => None, - BlockedReceiver(..) => unreachable!(), - BlockedSender(task) => { - state.canceled.take(); - Some(task) - } - } - } else { - None - }; - mem::drop((state, guard)); - - // only outside of the lock do we wake up the pending tasks - pending_sender1.map(|t| t.wake().map(|t| t.reawaken())); - pending_sender2.map(|t| t.wake().map(|t| t.reawaken())); - } - - // Prepares this shared packet for a channel clone, essentially just bumping - // a refcount. - pub fn clone_chan(&self) { - self.channels.fetch_add(1, atomics::SeqCst); - } - - pub fn drop_chan(&self) { - // Only flag the channel as disconnected if we're the last channel - match self.channels.fetch_sub(1, atomics::SeqCst) { - 1 => {} - _ => return - } - - // Not much to do other than wake up a receiver if one's there - let (guard, state) = self.lock(); - if state.disconnected { return } - state.disconnected = true; - match mem::replace(&mut state.blocker, NoneBlocked) { - NoneBlocked => {} - BlockedSender(..) => unreachable!(), - BlockedReceiver(task) => wakeup(task, guard), - } - } - - pub fn drop_port(&self) { - let (guard, state) = self.lock(); - - if state.disconnected { return } - state.disconnected = true; - - // If the capacity is 0, then the sender may want its data back after - // we're disconnected. Otherwise it's now our responsibility to destroy - // the buffered data. As with many other portions of this code, this - // needs to be careful to destroy the data *outside* of the lock to - // prevent deadlock. - let _data = if state.cap != 0 { - mem::replace(&mut state.buf.buf, Vec::new()) - } else { - Vec::new() - }; - let mut queue = mem::replace(&mut state.queue, Queue { - head: 0 as *mut Node, - tail: 0 as *mut Node, - }); - - let waiter = match mem::replace(&mut state.blocker, NoneBlocked) { - NoneBlocked => None, - BlockedSender(task) => { - *state.canceled.take_unwrap() = true; - Some(task) - } - BlockedReceiver(..) => unreachable!(), - }; - mem::drop((state, guard)); - - loop { - match queue.dequeue() { - Some(task) => { task.wake().map(|t| t.reawaken()); } - None => break, - } - } - waiter.map(|t| t.wake().map(|t| t.reawaken())); - } - - //////////////////////////////////////////////////////////////////////////// - // select implementation - //////////////////////////////////////////////////////////////////////////// - - // If Ok, the value is whether this port has data, if Err, then the upgraded - // port needs to be checked instead of this one. - pub fn can_recv(&self) -> bool { - let (_g, state) = self.lock(); - state.disconnected || state.buf.size() > 0 - } - - // Attempts to start selection on this port. This can either succeed or fail - // because there is data waiting. - pub fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>{ - let (_g, state) = self.lock(); - if state.disconnected || state.buf.size() > 0 { - Err(task) - } else { - match mem::replace(&mut state.blocker, BlockedReceiver(task)) { - NoneBlocked => {} - BlockedSender(..) => unreachable!(), - BlockedReceiver(..) => unreachable!(), - } - Ok(()) - } - } - - // Remove a previous selecting task from this port. This ensures that the - // blocked task will no longer be visible to any other threads. - // - // The return value indicates whether there's data on this port. - pub fn abort_selection(&self) -> bool { - let (_g, state) = self.lock(); - match mem::replace(&mut state.blocker, NoneBlocked) { - NoneBlocked => true, - BlockedSender(task) => { - state.blocker = BlockedSender(task); - true - } - BlockedReceiver(task) => { task.trash(); false } - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Packet<T> { - fn drop(&mut self) { - assert_eq!(self.channels.load(atomics::SeqCst), 0); - let (_g, state) = self.lock(); - assert!(state.queue.dequeue().is_none()); - assert!(state.canceled.is_none()); - } -} - - -//////////////////////////////////////////////////////////////////////////////// -// Buffer, a simple ring buffer backed by Vec<T> -//////////////////////////////////////////////////////////////////////////////// - -impl<T> Buffer<T> { - fn enqueue(&mut self, t: T) { - let pos = (self.start + self.size) % self.buf.len(); - self.size += 1; - let prev = mem::replace(self.buf.get_mut(pos), Some(t)); - assert!(prev.is_none()); - } - - fn dequeue(&mut self) -> T { - let start = self.start; - self.size -= 1; - self.start = (self.start + 1) % self.buf.len(); - self.buf.get_mut(start).take_unwrap() - } - - fn size(&self) -> uint { self.size } - fn cap(&self) -> uint { self.buf.len() } -} - -//////////////////////////////////////////////////////////////////////////////// -// Queue, a simple queue to enqueue tasks with (stack-allocated nodes) -//////////////////////////////////////////////////////////////////////////////// - -impl Queue { - fn enqueue(&mut self, lock: &NativeMutex) { - let task: Box<Task> = Local::take(); - let mut node = Node { - task: None, - next: 0 as *mut Node, - }; - task.deschedule(1, |task| { - node.task = Some(task); - if self.tail.is_null() { - self.head = &mut node as *mut Node; - self.tail = &mut node as *mut Node; - } else { - unsafe { - (*self.tail).next = &mut node as *mut Node; - self.tail = &mut node as *mut Node; - } - } - unsafe { lock.unlock_noguard(); } - Ok(()) - }); - unsafe { lock.lock_noguard(); } - assert!(node.next.is_null()); - } - - fn dequeue(&mut self) -> Option<BlockedTask> { - if self.head.is_null() { - return None - } - let node = self.head; - self.head = unsafe { (*node).next }; - if self.head.is_null() { - self.tail = 0 as *mut Node; - } - unsafe { - (*node).next = 0 as *mut Node; - Some((*node).task.take_unwrap()) - } - } -} diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs index 318410c45c1..0f7b89338b6 100644 --- a/src/libstd/lib.rs +++ b/src/libstd/lib.rs @@ -126,6 +126,7 @@ extern crate alloc; extern crate core; extern crate core_collections = "collections"; extern crate core_rand = "rand"; +extern crate core_sync = "sync"; extern crate libc; extern crate rustrt; @@ -174,6 +175,8 @@ pub use core_collections::vec; pub use rustrt::c_str; pub use rustrt::local_data; +pub use core_sync::comm; + // Run tests with libgreen instead of libnative. // // FIXME: This egregiously hacks around starting the test runner in a different @@ -237,10 +240,8 @@ pub mod collections; /* Tasks and communication */ pub mod task; -pub mod comm; pub mod sync; - /* Runtime and platform support */ pub mod c_vec; diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index a68a632bc37..66e7059422b 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -63,13 +63,10 @@ pub use self::util::{default_sched_threads, min_stack, running_on_valgrind}; // Reexport functionality from librustrt and other crates underneath the // standard library which work together to create the entire runtime. pub use alloc::{heap, libc_heap}; -pub use rustrt::{task, local, mutex, exclusive, stack, args, rtio}; +pub use rustrt::{task, local, mutex, exclusive, stack, args, rtio, thread}; pub use rustrt::{Stdio, Stdout, Stderr, begin_unwind, begin_unwind_fmt}; pub use rustrt::{bookkeeping, at_exit, unwind, DEFAULT_ERROR_CODE, Runtime}; -// Bindings to system threading libraries. -pub mod thread; - // Simple backtrace functionality (to print on failure) pub mod backtrace; diff --git a/src/libstd/rt/thread.rs b/src/libstd/rt/thread.rs deleted file mode 100644 index 5a077e511c0..00000000000 --- a/src/libstd/rt/thread.rs +++ /dev/null @@ -1,348 +0,0 @@ -// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Native os-thread management -//! -//! This modules contains bindings necessary for managing OS-level threads. -//! These functions operate outside of the rust runtime, creating threads -//! which are not used for scheduling in any way. - -#![allow(non_camel_case_types)] -#![allow(unsigned_negate)] - -use kinds::Send; -use libc; -use mem; -use ops::Drop; -use option::{Option, Some, None}; -use owned::Box; -use uint; - -type StartFn = extern "C" fn(*libc::c_void) -> imp::rust_thread_return; - -/// This struct represents a native thread's state. This is used to join on an -/// existing thread created in the join-able state. -pub struct Thread<T> { - native: imp::rust_thread, - joined: bool, - packet: Box<Option<T>>, -} - -static DEFAULT_STACK_SIZE: uint = 1024 * 1024; - -// This is the starting point of rust os threads. The first thing we do -// is make sure that we don't trigger __morestack (also why this has a -// no_split_stack annotation), and then we extract the main function -// and invoke it. -#[no_split_stack] -extern fn thread_start(main: *libc::c_void) -> imp::rust_thread_return { - use rt::stack; - unsafe { - stack::record_stack_bounds(0, uint::MAX); - let f: Box<proc()> = mem::transmute(main); - (*f)(); - mem::transmute(0 as imp::rust_thread_return) - } -} - -// There are two impl blocks b/c if T were specified at the top then it's just a -// pain to specify a type parameter on Thread::spawn (which doesn't need the -// type parameter). -impl Thread<()> { - - /// Starts execution of a new OS thread. - /// - /// This function will not wait for the thread to join, but a handle to the - /// thread will be returned. - /// - /// Note that the handle returned is used to acquire the return value of the - /// procedure `main`. The `join` function will wait for the thread to finish - /// and return the value that `main` generated. - /// - /// Also note that the `Thread` returned will *always* wait for the thread - /// to finish executing. This means that even if `join` is not explicitly - /// called, when the `Thread` falls out of scope its destructor will block - /// waiting for the OS thread. - pub fn start<T: Send>(main: proc():Send -> T) -> Thread<T> { - Thread::start_stack(DEFAULT_STACK_SIZE, main) - } - - /// Performs the same functionality as `start`, but specifies an explicit - /// stack size for the new thread. - pub fn start_stack<T: Send>(stack: uint, main: proc():Send -> T) -> Thread<T> { - - // We need the address of the packet to fill in to be stable so when - // `main` fills it in it's still valid, so allocate an extra box to do - // so. - let packet = box None; - let packet2: *mut Option<T> = unsafe { - *mem::transmute::<&Box<Option<T>>, **mut Option<T>>(&packet) - }; - let main = proc() unsafe { *packet2 = Some(main()); }; - let native = unsafe { imp::create(stack, box main) }; - - Thread { - native: native, - joined: false, - packet: packet, - } - } - - /// This will spawn a new thread, but it will not wait for the thread to - /// finish, nor is it possible to wait for the thread to finish. - /// - /// This corresponds to creating threads in the 'detached' state on unix - /// systems. Note that platforms may not keep the main program alive even if - /// there are detached thread still running around. - pub fn spawn(main: proc():Send) { - Thread::spawn_stack(DEFAULT_STACK_SIZE, main) - } - - /// Performs the same functionality as `spawn`, but explicitly specifies a - /// stack size for the new thread. - pub fn spawn_stack(stack: uint, main: proc():Send) { - unsafe { - let handle = imp::create(stack, box main); - imp::detach(handle); - } - } - - /// Relinquishes the CPU slot that this OS-thread is currently using, - /// allowing another thread to run for awhile. - pub fn yield_now() { - unsafe { imp::yield_now(); } - } -} - -impl<T: Send> Thread<T> { - /// Wait for this thread to finish, returning the result of the thread's - /// calculation. - pub fn join(mut self) -> T { - assert!(!self.joined); - unsafe { imp::join(self.native) }; - self.joined = true; - assert!(self.packet.is_some()); - self.packet.take_unwrap() - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Thread<T> { - fn drop(&mut self) { - // This is required for correctness. If this is not done then the thread - // would fill in a return box which no longer exists. - if !self.joined { - unsafe { imp::join(self.native) }; - } - } -} - -#[cfg(windows)] -mod imp { - use mem; - use cmp; - use kinds::Send; - use libc; - use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, BOOL, - LPVOID, DWORD, LPDWORD, HANDLE}; - use os; - use owned::Box; - use ptr; - use rt::stack::RED_ZONE; - - pub type rust_thread = HANDLE; - pub type rust_thread_return = DWORD; - - pub unsafe fn create(stack: uint, p: Box<proc():Send>) -> rust_thread { - let arg: *mut libc::c_void = mem::transmute(p); - // FIXME On UNIX, we guard against stack sizes that are too small but - // that's because pthreads enforces that stacks are at least - // PTHREAD_STACK_MIN bytes big. Windows has no such lower limit, it's - // just that below a certain threshold you can't do anything useful. - // That threshold is application and architecture-specific, however. - // For now, the only requirement is that it's big enough to hold the - // red zone. Round up to the next 64 kB because that's what the NT - // kernel does, might as well make it explicit. With the current - // 20 kB red zone, that makes for a 64 kB minimum stack. - let stack_size = (cmp::max(stack, RED_ZONE) + 0xfffe) & (-0xfffe - 1); - let ret = CreateThread(ptr::mut_null(), stack_size as libc::size_t, - super::thread_start, arg, 0, ptr::mut_null()); - - if ret as uint == 0 { - // be sure to not leak the closure - let _p: Box<proc():Send> = mem::transmute(arg); - fail!("failed to spawn native thread: {}", os::last_os_error()); - } - return ret; - } - - pub unsafe fn join(native: rust_thread) { - use libc::consts::os::extra::INFINITE; - WaitForSingleObject(native, INFINITE); - } - - pub unsafe fn detach(native: rust_thread) { - assert!(libc::CloseHandle(native) != 0); - } - - pub unsafe fn yield_now() { - // This function will return 0 if there are no other threads to execute, - // but this also means that the yield was useless so this isn't really a - // case that needs to be worried about. - SwitchToThread(); - } - - #[allow(non_snake_case_functions)] - extern "system" { - fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES, - dwStackSize: SIZE_T, - lpStartAddress: super::StartFn, - lpParameter: LPVOID, - dwCreationFlags: DWORD, - lpThreadId: LPDWORD) -> HANDLE; - fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; - fn SwitchToThread() -> BOOL; - } -} - -#[cfg(unix)] -mod imp { - use cmp; - use kinds::Send; - use libc::consts::os::posix01::{PTHREAD_CREATE_JOINABLE, PTHREAD_STACK_MIN}; - use libc; - use mem; - use os; - use owned::Box; - use ptr; - use rt::stack::RED_ZONE; - - pub type rust_thread = libc::pthread_t; - pub type rust_thread_return = *u8; - - pub unsafe fn create(stack: uint, p: Box<proc():Send>) -> rust_thread { - let mut native: libc::pthread_t = mem::zeroed(); - let mut attr: libc::pthread_attr_t = mem::zeroed(); - assert_eq!(pthread_attr_init(&mut attr), 0); - assert_eq!(pthread_attr_setdetachstate(&mut attr, - PTHREAD_CREATE_JOINABLE), 0); - - // Reserve room for the red zone, the runtime's stack of last resort. - let stack_size = cmp::max(stack, RED_ZONE + min_stack_size(&attr) as uint); - match pthread_attr_setstacksize(&mut attr, stack_size as libc::size_t) { - 0 => { - }, - libc::EINVAL => { - // EINVAL means |stack_size| is either too small or not a - // multiple of the system page size. Because it's definitely - // >= PTHREAD_STACK_MIN, it must be an alignment issue. - // Round up to the nearest page and try again. - let page_size = os::page_size(); - let stack_size = (stack_size + page_size - 1) & (-(page_size - 1) - 1); - assert_eq!(pthread_attr_setstacksize(&mut attr, stack_size as libc::size_t), 0); - }, - errno => { - // This cannot really happen. - fail!("pthread_attr_setstacksize() error: {} ({})", os::last_os_error(), errno); - }, - }; - - let arg: *libc::c_void = mem::transmute(p); - let ret = pthread_create(&mut native, &attr, super::thread_start, arg); - assert_eq!(pthread_attr_destroy(&mut attr), 0); - - if ret != 0 { - // be sure to not leak the closure - let _p: Box<proc():Send> = mem::transmute(arg); - fail!("failed to spawn native thread: {}", os::last_os_error()); - } - native - } - - pub unsafe fn join(native: rust_thread) { - assert_eq!(pthread_join(native, ptr::null()), 0); - } - - pub unsafe fn detach(native: rust_thread) { - assert_eq!(pthread_detach(native), 0); - } - - pub unsafe fn yield_now() { assert_eq!(sched_yield(), 0); } - - // glibc >= 2.15 has a __pthread_get_minstack() function that returns - // PTHREAD_STACK_MIN plus however many bytes are needed for thread-local - // storage. We need that information to avoid blowing up when a small stack - // is created in an application with big thread-local storage requirements. - // See #6233 for rationale and details. - // - // Link weakly to the symbol for compatibility with older versions of glibc. - // Assumes that we've been dynamically linked to libpthread but that is - // currently always the case. Note that you need to check that the symbol - // is non-null before calling it! - #[cfg(target_os = "linux")] - fn min_stack_size(attr: *libc::pthread_attr_t) -> libc::size_t { - use ptr::RawPtr; - type F = unsafe extern "C" fn(*libc::pthread_attr_t) -> libc::size_t; - extern { - #[linkage = "extern_weak"] - static __pthread_get_minstack: *(); - } - if __pthread_get_minstack.is_null() { - PTHREAD_STACK_MIN - } else { - unsafe { mem::transmute::<*(), F>(__pthread_get_minstack)(attr) } - } - } - - // __pthread_get_minstack() is marked as weak but extern_weak linkage is - // not supported on OS X, hence this kludge... - #[cfg(not(target_os = "linux"))] - fn min_stack_size(_: *libc::pthread_attr_t) -> libc::size_t { - PTHREAD_STACK_MIN - } - - extern { - fn pthread_create(native: *mut libc::pthread_t, - attr: *libc::pthread_attr_t, - f: super::StartFn, - value: *libc::c_void) -> libc::c_int; - fn pthread_join(native: libc::pthread_t, - value: **libc::c_void) -> libc::c_int; - fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int; - fn pthread_attr_destroy(attr: *mut libc::pthread_attr_t) -> libc::c_int; - fn pthread_attr_setstacksize(attr: *mut libc::pthread_attr_t, - stack_size: libc::size_t) -> libc::c_int; - fn pthread_attr_setdetachstate(attr: *mut libc::pthread_attr_t, - state: libc::c_int) -> libc::c_int; - fn pthread_detach(thread: libc::pthread_t) -> libc::c_int; - fn sched_yield() -> libc::c_int; - } -} - -#[cfg(test)] -mod tests { - use super::Thread; - - #[test] - fn smoke() { Thread::start(proc (){}).join(); } - - #[test] - fn data() { assert_eq!(Thread::start(proc () { 1 }).join(), 1); } - - #[test] - fn detached() { Thread::spawn(proc () {}) } - - #[test] - fn small_stacks() { - assert_eq!(42, Thread::start_stack(0, proc () 42).join()); - assert_eq!(42, Thread::start_stack(1, proc () 42).join()); - } -} - diff --git a/src/libstd/sync/atomics.rs b/src/libstd/sync/atomics.rs deleted file mode 100644 index b2565a6a449..00000000000 --- a/src/libstd/sync/atomics.rs +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Atomic types -//! -//! Atomic types provide primitive shared-memory communication between -//! threads, and are the building blocks of other concurrent -//! types. -//! -//! This module defines atomic versions of a select number of primitive -//! types, including `AtomicBool`, `AtomicInt`, `AtomicUint`, and `AtomicOption`. -//! Atomic types present operations that, when used correctly, synchronize -//! updates between threads. -//! -//! Each method takes an `Ordering` which represents the strength of -//! the memory barrier for that operation. These orderings are the -//! same as [C++11 atomic orderings][1]. -//! -//! [1]: http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync -//! -//! Atomic variables are safe to share between threads (they implement `Share`) -//! but they do not themselves provide the mechanism for sharing. The most -//! common way to share an atomic variable is to put it into an `Arc` (an -//! atomically-reference-counted shared pointer). -//! -//! Most atomic types may be stored in static variables, initialized using -//! the provided static initializers like `INIT_ATOMIC_BOOL`. Atomic statics -//! are often used for lazy global initialization. -//! -//! -//! # Examples -//! -//! A simple spinlock: -//! -//! ``` -//! extern crate sync; -//! -//! use sync::Arc; -//! use std::sync::atomics::{AtomicUint, SeqCst}; -//! use std::task::deschedule; -//! -//! fn main() { -//! let spinlock = Arc::new(AtomicUint::new(1)); -//! -//! let spinlock_clone = spinlock.clone(); -//! spawn(proc() { -//! spinlock_clone.store(0, SeqCst); -//! }); -//! -//! // Wait for the other task to release the lock -//! while spinlock.load(SeqCst) != 0 { -//! // Since tasks may not be preemptive (if they are green threads) -//! // yield to the scheduler to let the other task run. Low level -//! // concurrent code needs to take into account Rust's two threading -//! // models. -//! deschedule(); -//! } -//! } -//! ``` -//! -//! Transferring a heap object with `AtomicOption`: -//! -//! ``` -//! extern crate sync; -//! -//! use sync::Arc; -//! use std::sync::atomics::{AtomicOption, SeqCst}; -//! -//! fn main() { -//! struct BigObject; -//! -//! let shared_big_object = Arc::new(AtomicOption::empty()); -//! -//! let shared_big_object_clone = shared_big_object.clone(); -//! spawn(proc() { -//! let unwrapped_big_object = shared_big_object_clone.take(SeqCst); -//! if unwrapped_big_object.is_some() { -//! println!("got a big object from another task"); -//! } else { -//! println!("other task hasn't sent big object yet"); -//! } -//! }); -//! -//! shared_big_object.swap(box BigObject, SeqCst); -//! } -//! ``` -//! -//! Keep a global count of live tasks: -//! -//! ``` -//! use std::sync::atomics::{AtomicUint, SeqCst, INIT_ATOMIC_UINT}; -//! -//! static mut GLOBAL_TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT; -//! -//! unsafe { -//! let old_task_count = GLOBAL_TASK_COUNT.fetch_add(1, SeqCst); -//! println!("live tasks: {}", old_task_count + 1); -//! } -//! ``` - -use mem; -use ops::Drop; -use option::{Option,Some,None}; -use owned::Box; - -pub use core::atomics::{AtomicBool, AtomicInt, AtomicUint, AtomicPtr}; -pub use core::atomics::{Ordering, Relaxed, Release, Acquire, AcqRel, SeqCst}; -pub use core::atomics::{INIT_ATOMIC_BOOL, INIT_ATOMIC_INT, INIT_ATOMIC_UINT}; -pub use core::atomics::fence; - -/// An atomic, nullable unique pointer -/// -/// This can be used as the concurrency primitive for operations that transfer -/// owned heap objects across tasks. -#[unsafe_no_drop_flag] -pub struct AtomicOption<T> { - p: AtomicUint, -} - -impl<T> AtomicOption<T> { - /// Create a new `AtomicOption` - pub fn new(p: Box<T>) -> AtomicOption<T> { - unsafe { AtomicOption { p: AtomicUint::new(mem::transmute(p)) } } - } - - /// Create a new `AtomicOption` that doesn't contain a value - pub fn empty() -> AtomicOption<T> { AtomicOption { p: AtomicUint::new(0) } } - - /// Store a value, returning the old value - #[inline] - pub fn swap(&self, val: Box<T>, order: Ordering) -> Option<Box<T>> { - let val = unsafe { mem::transmute(val) }; - - match self.p.swap(val, order) { - 0 => None, - n => Some(unsafe { mem::transmute(n) }), - } - } - - /// Remove the value, leaving the `AtomicOption` empty. - #[inline] - pub fn take(&self, order: Ordering) -> Option<Box<T>> { - unsafe { self.swap(mem::transmute(0), order) } - } - - /// Replace an empty value with a non-empty value. - /// - /// Succeeds if the option is `None` and returns `None` if so. If - /// the option was already `Some`, returns `Some` of the rejected - /// value. - #[inline] - pub fn fill(&self, val: Box<T>, order: Ordering) -> Option<Box<T>> { - unsafe { - let val = mem::transmute(val); - let expected = mem::transmute(0); - let oldval = self.p.compare_and_swap(expected, val, order); - if oldval == expected { - None - } else { - Some(mem::transmute(val)) - } - } - } - - /// Returns `true` if the `AtomicOption` is empty. - /// - /// Be careful: The caller must have some external method of ensuring the - /// result does not get invalidated by another task after this returns. - #[inline] - pub fn is_empty(&self, order: Ordering) -> bool { - self.p.load(order) as uint == 0 - } -} - -#[unsafe_destructor] -impl<T> Drop for AtomicOption<T> { - fn drop(&mut self) { - let _ = self.take(SeqCst); - } -} - -#[cfg(test)] -mod test { - use option::*; - use super::*; - - #[test] - fn option_empty() { - let option: AtomicOption<()> = AtomicOption::empty(); - assert!(option.is_empty(SeqCst)); - } - - #[test] - fn option_swap() { - let p = AtomicOption::new(box 1); - let a = box 2; - - let b = p.swap(a, SeqCst); - - assert!(b == Some(box 1)); - assert!(p.take(SeqCst) == Some(box 2)); - } - - #[test] - fn option_take() { - let p = AtomicOption::new(box 1); - - assert!(p.take(SeqCst) == Some(box 1)); - assert!(p.take(SeqCst) == None); - - let p2 = box 2; - p.swap(p2, SeqCst); - - assert!(p.take(SeqCst) == Some(box 2)); - } - - #[test] - fn option_fill() { - let p = AtomicOption::new(box 1); - assert!(p.fill(box 2, SeqCst).is_some()); // should fail; shouldn't leak! - assert!(p.take(SeqCst) == Some(box 1)); - - assert!(p.fill(box 2, SeqCst).is_none()); // shouldn't fail - assert!(p.take(SeqCst) == Some(box 2)); - } -} - diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs deleted file mode 100644 index 39e420685ab..00000000000 --- a/src/libstd/sync/deque.rs +++ /dev/null @@ -1,666 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! A (mostly) lock-free concurrent work-stealing deque -//! -//! This module contains an implementation of the Chase-Lev work stealing deque -//! described in "Dynamic Circular Work-Stealing Deque". The implementation is -//! heavily based on the pseudocode found in the paper. -//! -//! This implementation does not want to have the restriction of a garbage -//! collector for reclamation of buffers, and instead it uses a shared pool of -//! buffers. This shared pool is required for correctness in this -//! implementation. -//! -//! The only lock-synchronized portions of this deque are the buffer allocation -//! and deallocation portions. Otherwise all operations are lock-free. -//! -//! # Example -//! -//! use std::rt::deque::BufferPool; -//! -//! let mut pool = BufferPool::new(); -//! let (mut worker, mut stealer) = pool.deque(); -//! -//! // Only the worker may push/pop -//! worker.push(1); -//! worker.pop(); -//! -//! // Stealers take data from the other end of the deque -//! worker.push(1); -//! stealer.steal(); -//! -//! // Stealers can be cloned to have many stealers stealing in parallel -//! worker.push(1); -//! let mut stealer2 = stealer.clone(); -//! stealer2.steal(); - -// NB: the "buffer pool" strategy is not done for speed, but rather for -// correctness. For more info, see the comment on `swap_buffer` - -// FIXME: all atomic operations in this module use a SeqCst ordering. That is -// probably overkill - -use alloc::arc::Arc; - -use clone::Clone; -use iter::{range, Iterator}; -use kinds::Send; -use kinds::marker; -use mem::{forget, min_align_of, size_of, transmute, overwrite}; -use ops::Drop; -use option::{Option, Some, None}; -use owned::Box; -use ptr::RawPtr; -use ptr; -use rt::heap::{allocate, deallocate}; -use slice::ImmutableVector; -use sync::atomics::{AtomicInt, AtomicPtr, SeqCst}; -use rt::exclusive::Exclusive; -use vec::Vec; - -// Once the queue is less than 1/K full, then it will be downsized. Note that -// the deque requires that this number be less than 2. -static K: int = 4; - -// Minimum number of bits that a buffer size should be. No buffer will resize to -// under this value, and all deques will initially contain a buffer of this -// size. -// -// The size in question is 1 << MIN_BITS -static MIN_BITS: int = 7; - -struct Deque<T> { - bottom: AtomicInt, - top: AtomicInt, - array: AtomicPtr<Buffer<T>>, - pool: BufferPool<T>, -} - -/// Worker half of the work-stealing deque. This worker has exclusive access to -/// one side of the deque, and uses `push` and `pop` method to manipulate it. -/// -/// There may only be one worker per deque. -pub struct Worker<T> { - deque: Arc<Deque<T>>, - noshare: marker::NoShare, -} - -/// The stealing half of the work-stealing deque. Stealers have access to the -/// opposite end of the deque from the worker, and they only have access to the -/// `steal` method. -pub struct Stealer<T> { - deque: Arc<Deque<T>>, - noshare: marker::NoShare, -} - -/// When stealing some data, this is an enumeration of the possible outcomes. -#[deriving(PartialEq, Show)] -pub enum Stolen<T> { - /// The deque was empty at the time of stealing - Empty, - /// The stealer lost the race for stealing data, and a retry may return more - /// data. - Abort, - /// The stealer has successfully stolen some data. - Data(T), -} - -/// The allocation pool for buffers used by work-stealing deques. Right now this -/// structure is used for reclamation of memory after it is no longer in use by -/// deques. -/// -/// This data structure is protected by a mutex, but it is rarely used. Deques -/// will only use this structure when allocating a new buffer or deallocating a -/// previous one. -pub struct BufferPool<T> { - pool: Arc<Exclusive<Vec<Box<Buffer<T>>>>>, -} - -/// An internal buffer used by the chase-lev deque. This structure is actually -/// implemented as a circular buffer, and is used as the intermediate storage of -/// the data in the deque. -/// -/// This type is implemented with *T instead of Vec<T> for two reasons: -/// -/// 1. There is nothing safe about using this buffer. This easily allows the -/// same value to be read twice in to rust, and there is nothing to -/// prevent this. The usage by the deque must ensure that one of the -/// values is forgotten. Furthermore, we only ever want to manually run -/// destructors for values in this buffer (on drop) because the bounds -/// are defined by the deque it's owned by. -/// -/// 2. We can certainly avoid bounds checks using *T instead of Vec<T>, although -/// LLVM is probably pretty good at doing this already. -struct Buffer<T> { - storage: *T, - log_size: int, -} - -impl<T: Send> BufferPool<T> { - /// Allocates a new buffer pool which in turn can be used to allocate new - /// deques. - pub fn new() -> BufferPool<T> { - BufferPool { pool: Arc::new(Exclusive::new(vec!())) } - } - - /// Allocates a new work-stealing deque which will send/receiving memory to - /// and from this buffer pool. - pub fn deque(&self) -> (Worker<T>, Stealer<T>) { - let a = Arc::new(Deque::new(self.clone())); - let b = a.clone(); - (Worker { deque: a, noshare: marker::NoShare }, - Stealer { deque: b, noshare: marker::NoShare }) - } - - fn alloc(&self, bits: int) -> Box<Buffer<T>> { - unsafe { - let mut pool = self.pool.lock(); - match pool.iter().position(|x| x.size() >= (1 << bits)) { - Some(i) => pool.remove(i).unwrap(), - None => box Buffer::new(bits) - } - } - } - - fn free(&self, buf: Box<Buffer<T>>) { - unsafe { - let mut pool = self.pool.lock(); - match pool.iter().position(|v| v.size() > buf.size()) { - Some(i) => pool.insert(i, buf), - None => pool.push(buf), - } - } - } -} - -impl<T: Send> Clone for BufferPool<T> { - fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } } -} - -impl<T: Send> Worker<T> { - /// Pushes data onto the front of this work queue. - pub fn push(&self, t: T) { - unsafe { self.deque.push(t) } - } - /// Pops data off the front of the work queue, returning `None` on an empty - /// queue. - pub fn pop(&self) -> Option<T> { - unsafe { self.deque.pop() } - } - - /// Gets access to the buffer pool that this worker is attached to. This can - /// be used to create more deques which share the same buffer pool as this - /// deque. - pub fn pool<'a>(&'a self) -> &'a BufferPool<T> { - &self.deque.pool - } -} - -impl<T: Send> Stealer<T> { - /// Steals work off the end of the queue (opposite of the worker's end) - pub fn steal(&self) -> Stolen<T> { - unsafe { self.deque.steal() } - } - - /// Gets access to the buffer pool that this stealer is attached to. This - /// can be used to create more deques which share the same buffer pool as - /// this deque. - pub fn pool<'a>(&'a self) -> &'a BufferPool<T> { - &self.deque.pool - } -} - -impl<T: Send> Clone for Stealer<T> { - fn clone(&self) -> Stealer<T> { - Stealer { deque: self.deque.clone(), noshare: marker::NoShare } - } -} - -// Almost all of this code can be found directly in the paper so I'm not -// personally going to heavily comment what's going on here. - -impl<T: Send> Deque<T> { - fn new(pool: BufferPool<T>) -> Deque<T> { - let buf = pool.alloc(MIN_BITS); - Deque { - bottom: AtomicInt::new(0), - top: AtomicInt::new(0), - array: AtomicPtr::new(unsafe { transmute(buf) }), - pool: pool, - } - } - - unsafe fn push(&self, data: T) { - let mut b = self.bottom.load(SeqCst); - let t = self.top.load(SeqCst); - let mut a = self.array.load(SeqCst); - let size = b - t; - if size >= (*a).size() - 1 { - // You won't find this code in the chase-lev deque paper. This is - // alluded to in a small footnote, however. We always free a buffer - // when growing in order to prevent leaks. - a = self.swap_buffer(b, a, (*a).resize(b, t, 1)); - b = self.bottom.load(SeqCst); - } - (*a).put(b, data); - self.bottom.store(b + 1, SeqCst); - } - - unsafe fn pop(&self) -> Option<T> { - let b = self.bottom.load(SeqCst); - let a = self.array.load(SeqCst); - let b = b - 1; - self.bottom.store(b, SeqCst); - let t = self.top.load(SeqCst); - let size = b - t; - if size < 0 { - self.bottom.store(t, SeqCst); - return None; - } - let data = (*a).get(b); - if size > 0 { - self.maybe_shrink(b, t); - return Some(data); - } - if self.top.compare_and_swap(t, t + 1, SeqCst) == t { - self.bottom.store(t + 1, SeqCst); - return Some(data); - } else { - self.bottom.store(t + 1, SeqCst); - forget(data); // someone else stole this value - return None; - } - } - - unsafe fn steal(&self) -> Stolen<T> { - let t = self.top.load(SeqCst); - let old = self.array.load(SeqCst); - let b = self.bottom.load(SeqCst); - let a = self.array.load(SeqCst); - let size = b - t; - if size <= 0 { return Empty } - if size % (*a).size() == 0 { - if a == old && t == self.top.load(SeqCst) { - return Empty - } - return Abort - } - let data = (*a).get(t); - if self.top.compare_and_swap(t, t + 1, SeqCst) == t { - Data(data) - } else { - forget(data); // someone else stole this value - Abort - } - } - - unsafe fn maybe_shrink(&self, b: int, t: int) { - let a = self.array.load(SeqCst); - if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) { - self.swap_buffer(b, a, (*a).resize(b, t, -1)); - } - } - - // Helper routine not mentioned in the paper which is used in growing and - // shrinking buffers to swap in a new buffer into place. As a bit of a - // recap, the whole point that we need a buffer pool rather than just - // calling malloc/free directly is that stealers can continue using buffers - // after this method has called 'free' on it. The continued usage is simply - // a read followed by a forget, but we must make sure that the memory can - // continue to be read after we flag this buffer for reclamation. - unsafe fn swap_buffer(&self, b: int, old: *mut Buffer<T>, - buf: Buffer<T>) -> *mut Buffer<T> { - let newbuf: *mut Buffer<T> = transmute(box buf); - self.array.store(newbuf, SeqCst); - let ss = (*newbuf).size(); - self.bottom.store(b + ss, SeqCst); - let t = self.top.load(SeqCst); - if self.top.compare_and_swap(t, t + ss, SeqCst) != t { - self.bottom.store(b, SeqCst); - } - self.pool.free(transmute(old)); - return newbuf; - } -} - - -#[unsafe_destructor] -impl<T: Send> Drop for Deque<T> { - fn drop(&mut self) { - let t = self.top.load(SeqCst); - let b = self.bottom.load(SeqCst); - let a = self.array.load(SeqCst); - // Free whatever is leftover in the dequeue, and then move the buffer - // back into the pool. - for i in range(t, b) { - let _: T = unsafe { (*a).get(i) }; - } - self.pool.free(unsafe { transmute(a) }); - } -} - -#[inline] -fn buffer_alloc_size<T>(log_size: int) -> uint { - (1 << log_size) * size_of::<T>() -} - -impl<T: Send> Buffer<T> { - unsafe fn new(log_size: int) -> Buffer<T> { - let size = buffer_alloc_size::<T>(log_size); - let buffer = allocate(size, min_align_of::<T>()); - Buffer { - storage: buffer as *T, - log_size: log_size, - } - } - - fn size(&self) -> int { 1 << self.log_size } - - // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly - fn mask(&self) -> int { (1 << self.log_size) - 1 } - - unsafe fn elem(&self, i: int) -> *T { self.storage.offset(i & self.mask()) } - - // This does not protect against loading duplicate values of the same cell, - // nor does this clear out the contents contained within. Hence, this is a - // very unsafe method which the caller needs to treat specially in case a - // race is lost. - unsafe fn get(&self, i: int) -> T { - ptr::read(self.elem(i)) - } - - // Unsafe because this unsafely overwrites possibly uninitialized or - // initialized data. - unsafe fn put(&self, i: int, t: T) { - overwrite(self.elem(i) as *mut T, t); - } - - // Again, unsafe because this has incredibly dubious ownership violations. - // It is assumed that this buffer is immediately dropped. - unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> { - let buf = Buffer::new(self.log_size + delta); - for i in range(t, b) { - buf.put(i, self.get(i)); - } - return buf; - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Buffer<T> { - fn drop(&mut self) { - // It is assumed that all buffers are empty on drop. - let size = buffer_alloc_size::<T>(self.log_size); - unsafe { deallocate(self.storage as *mut u8, size, min_align_of::<T>()) } - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - use super::{Data, BufferPool, Abort, Empty, Worker, Stealer}; - - use mem; - use owned::Box; - use rt::thread::Thread; - use rand; - use rand::Rng; - use sync::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst, - AtomicUint, INIT_ATOMIC_UINT}; - use vec; - - #[test] - fn smoke() { - let pool = BufferPool::new(); - let (w, s) = pool.deque(); - assert_eq!(w.pop(), None); - assert_eq!(s.steal(), Empty); - w.push(1); - assert_eq!(w.pop(), Some(1)); - w.push(1); - assert_eq!(s.steal(), Data(1)); - w.push(1); - assert_eq!(s.clone().steal(), Data(1)); - } - - #[test] - fn stealpush() { - static AMT: int = 100000; - let pool = BufferPool::<int>::new(); - let (w, s) = pool.deque(); - let t = Thread::start(proc() { - let mut left = AMT; - while left > 0 { - match s.steal() { - Data(i) => { - assert_eq!(i, 1); - left -= 1; - } - Abort | Empty => {} - } - } - }); - - for _ in range(0, AMT) { - w.push(1); - } - - t.join(); - } - - #[test] - fn stealpush_large() { - static AMT: int = 100000; - let pool = BufferPool::<(int, int)>::new(); - let (w, s) = pool.deque(); - let t = Thread::start(proc() { - let mut left = AMT; - while left > 0 { - match s.steal() { - Data((1, 10)) => { left -= 1; } - Data(..) => fail!(), - Abort | Empty => {} - } - } - }); - - for _ in range(0, AMT) { - w.push((1, 10)); - } - - t.join(); - } - - fn stampede(w: Worker<Box<int>>, s: Stealer<Box<int>>, - nthreads: int, amt: uint) { - for _ in range(0, amt) { - w.push(box 20); - } - let mut remaining = AtomicUint::new(amt); - let unsafe_remaining: *mut AtomicUint = &mut remaining; - - let threads = range(0, nthreads).map(|_| { - let s = s.clone(); - Thread::start(proc() { - unsafe { - while (*unsafe_remaining).load(SeqCst) > 0 { - match s.steal() { - Data(box 20) => { - (*unsafe_remaining).fetch_sub(1, SeqCst); - } - Data(..) => fail!(), - Abort | Empty => {} - } - } - } - }) - }).collect::<Vec<Thread<()>>>(); - - while remaining.load(SeqCst) > 0 { - match w.pop() { - Some(box 20) => { remaining.fetch_sub(1, SeqCst); } - Some(..) => fail!(), - None => {} - } - } - - for thread in threads.move_iter() { - thread.join(); - } - } - - #[test] - fn run_stampede() { - let pool = BufferPool::<Box<int>>::new(); - let (w, s) = pool.deque(); - stampede(w, s, 8, 10000); - } - - #[test] - fn many_stampede() { - static AMT: uint = 4; - let pool = BufferPool::<Box<int>>::new(); - let threads = range(0, AMT).map(|_| { - let (w, s) = pool.deque(); - Thread::start(proc() { - stampede(w, s, 4, 10000); - }) - }).collect::<Vec<Thread<()>>>(); - - for thread in threads.move_iter() { - thread.join(); - } - } - - #[test] - fn stress() { - static AMT: int = 100000; - static NTHREADS: int = 8; - static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; - static mut HITS: AtomicUint = INIT_ATOMIC_UINT; - let pool = BufferPool::<int>::new(); - let (w, s) = pool.deque(); - - let threads = range(0, NTHREADS).map(|_| { - let s = s.clone(); - Thread::start(proc() { - unsafe { - loop { - match s.steal() { - Data(2) => { HITS.fetch_add(1, SeqCst); } - Data(..) => fail!(), - _ if DONE.load(SeqCst) => break, - _ => {} - } - } - } - }) - }).collect::<Vec<Thread<()>>>(); - - let mut rng = rand::task_rng(); - let mut expected = 0; - while expected < AMT { - if rng.gen_range(0, 3) == 2 { - match w.pop() { - None => {} - Some(2) => unsafe { HITS.fetch_add(1, SeqCst); }, - Some(_) => fail!(), - } - } else { - expected += 1; - w.push(2); - } - } - - unsafe { - while HITS.load(SeqCst) < AMT as uint { - match w.pop() { - None => {} - Some(2) => { HITS.fetch_add(1, SeqCst); }, - Some(_) => fail!(), - } - } - DONE.store(true, SeqCst); - } - - for thread in threads.move_iter() { - thread.join(); - } - - assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint); - } - - #[test] - #[ignore(cfg(windows))] // apparently windows scheduling is weird? - fn no_starvation() { - static AMT: int = 10000; - static NTHREADS: int = 4; - static mut DONE: AtomicBool = INIT_ATOMIC_BOOL; - let pool = BufferPool::<(int, uint)>::new(); - let (w, s) = pool.deque(); - - let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| { - let s = s.clone(); - let unique_box = box AtomicUint::new(0); - let thread_box = unsafe { - *mem::transmute::<&Box<AtomicUint>, **mut AtomicUint>(&unique_box) - }; - (Thread::start(proc() { - unsafe { - loop { - match s.steal() { - Data((1, 2)) => { - (*thread_box).fetch_add(1, SeqCst); - } - Data(..) => fail!(), - _ if DONE.load(SeqCst) => break, - _ => {} - } - } - } - }), unique_box) - })); - - let mut rng = rand::task_rng(); - let mut myhit = false; - let mut iter = 0; - 'outer: loop { - for _ in range(0, rng.gen_range(0, AMT)) { - if !myhit && rng.gen_range(0, 3) == 2 { - match w.pop() { - None => {} - Some((1, 2)) => myhit = true, - Some(_) => fail!(), - } - } else { - w.push((1, 2)); - } - } - iter += 1; - - debug!("loop iteration {}", iter); - for (i, slot) in hits.iter().enumerate() { - let amt = slot.load(SeqCst); - debug!("thread {}: {}", i, amt); - if amt == 0 { continue 'outer; } - } - if myhit { - break - } - } - - unsafe { DONE.store(true, SeqCst); } - - for thread in threads.move_iter() { - thread.join(); - } - } -} diff --git a/src/libstd/sync/future.rs b/src/libstd/sync/future.rs new file mode 100644 index 00000000000..bc748324fcd --- /dev/null +++ b/src/libstd/sync/future.rs @@ -0,0 +1,209 @@ +// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/*! + * A type representing values that may be computed concurrently and + * operations for working with them. + * + * # Example + * + * ```rust + * use std::sync::Future; + * # fn fib(n: uint) -> uint {42}; + * # fn make_a_sandwich() {}; + * let mut delayed_fib = Future::spawn(proc() { fib(5000) }); + * make_a_sandwich(); + * println!("fib(5000) = {}", delayed_fib.get()) + * ``` + */ + +#![allow(missing_doc)] + +use core::prelude::*; +use core::mem::replace; + +use comm::{Receiver, channel}; +use task::spawn; + +/// A type encapsulating the result of a computation which may not be complete +pub struct Future<A> { + state: FutureState<A>, +} + +enum FutureState<A> { + Pending(proc():Send -> A), + Evaluating, + Forced(A) +} + +/// Methods on the `future` type +impl<A:Clone> Future<A> { + pub fn get(&mut self) -> A { + //! Get the value of the future. + (*(self.get_ref())).clone() + } +} + +impl<A> Future<A> { + /// Gets the value from this future, forcing evaluation. + pub fn unwrap(mut self) -> A { + self.get_ref(); + let state = replace(&mut self.state, Evaluating); + match state { + Forced(v) => v, + _ => fail!( "Logic error." ), + } + } + + pub fn get_ref<'a>(&'a mut self) -> &'a A { + /*! + * Executes the future's closure and then returns a reference + * to the result. The reference lasts as long as + * the future. + */ + match self.state { + Forced(ref v) => return v, + Evaluating => fail!("Recursive forcing of future!"), + Pending(_) => { + match replace(&mut self.state, Evaluating) { + Forced(_) | Evaluating => fail!("Logic error."), + Pending(f) => { + self.state = Forced(f()); + self.get_ref() + } + } + } + } + } + + pub fn from_value(val: A) -> Future<A> { + /*! + * Create a future from a value. + * + * The value is immediately available and calling `get` later will + * not block. + */ + + Future {state: Forced(val)} + } + + pub fn from_fn(f: proc():Send -> A) -> Future<A> { + /*! + * Create a future from a function. + * + * The first time that the value is requested it will be retrieved by + * calling the function. Note that this function is a local + * function. It is not spawned into another task. + */ + + Future {state: Pending(f)} + } +} + +impl<A:Send> Future<A> { + pub fn from_receiver(rx: Receiver<A>) -> Future<A> { + /*! + * Create a future from a port + * + * The first time that the value is requested the task will block + * waiting for the result to be received on the port. + */ + + Future::from_fn(proc() { + rx.recv() + }) + } + + pub fn spawn(blk: proc():Send -> A) -> Future<A> { + /*! + * Create a future from a unique closure. + * + * The closure will be run in a new task and its result used as the + * value of the future. + */ + + let (tx, rx) = channel(); + + spawn(proc() { + tx.send(blk()); + }); + + Future::from_receiver(rx) + } +} + +#[cfg(test)] +mod test { + use prelude::*; + use sync::Future; + use task; + + #[test] + fn test_from_value() { + let mut f = Future::from_value("snail".to_string()); + assert_eq!(f.get(), "snail".to_string()); + } + + #[test] + fn test_from_receiver() { + let (tx, rx) = channel(); + tx.send("whale".to_string()); + let mut f = Future::from_receiver(rx); + assert_eq!(f.get(), "whale".to_string()); + } + + #[test] + fn test_from_fn() { + let mut f = Future::from_fn(proc() "brail".to_string()); + assert_eq!(f.get(), "brail".to_string()); + } + + #[test] + fn test_interface_get() { + let mut f = Future::from_value("fail".to_string()); + assert_eq!(f.get(), "fail".to_string()); + } + + #[test] + fn test_interface_unwrap() { + let f = Future::from_value("fail".to_string()); + assert_eq!(f.unwrap(), "fail".to_string()); + } + + #[test] + fn test_get_ref_method() { + let mut f = Future::from_value(22); + assert_eq!(*f.get_ref(), 22); + } + + #[test] + fn test_spawn() { + let mut f = Future::spawn(proc() "bale".to_string()); + assert_eq!(f.get(), "bale".to_string()); + } + + #[test] + #[should_fail] + fn test_futurefail() { + let mut f = Future::spawn(proc() fail!()); + let _x: String = f.get(); + } + + #[test] + fn test_sendable_future() { + let expected = "schlorf"; + let f = Future::spawn(proc() { expected }); + task::spawn(proc() { + let mut f = f; + let actual = f.get(); + assert_eq!(actual, expected); + }); + } +} diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index b2cf427edc8..5f45ce25502 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -15,8 +15,14 @@ //! and/or blocking at all, but rather provide the necessary tools to build //! other types of concurrent primitives. -pub mod atomics; -pub mod deque; -pub mod mpmc_bounded_queue; -pub mod mpsc_queue; -pub mod spsc_queue; +pub use core_sync::{atomics, deque, mpmc_bounded_queue, mpsc_queue, spsc_queue}; +pub use core_sync::{Arc, Weak, Mutex, MutexGuard, Condvar, Barrier}; +pub use core_sync::{RWLock, RWLockReadGuard, RWLockWriteGuard}; +pub use core_sync::{Semaphore, SemaphoreGuard}; +pub use core_sync::one::{Once, ONCE_INIT}; + +pub use self::future::Future; +pub use self::task_pool::TaskPool; + +mod future; +mod task_pool; diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs deleted file mode 100644 index ffad9c1c583..00000000000 --- a/src/libstd/sync/mpmc_bounded_queue.rs +++ /dev/null @@ -1,220 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -#![allow(missing_doc, dead_code)] - -// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue - -use alloc::arc::Arc; - -use clone::Clone; -use kinds::Send; -use num::next_power_of_two; -use option::{Option, Some, None}; -use sync::atomics::{AtomicUint,Relaxed,Release,Acquire}; -use vec::Vec; -use ty::Unsafe; - -struct Node<T> { - sequence: AtomicUint, - value: Option<T>, -} - -struct State<T> { - pad0: [u8, ..64], - buffer: Vec<Unsafe<Node<T>>>, - mask: uint, - pad1: [u8, ..64], - enqueue_pos: AtomicUint, - pad2: [u8, ..64], - dequeue_pos: AtomicUint, - pad3: [u8, ..64], -} - -pub struct Queue<T> { - state: Arc<State<T>>, -} - -impl<T: Send> State<T> { - fn with_capacity(capacity: uint) -> State<T> { - let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 { - if capacity < 2 { - 2u - } else { - // use next power of 2 as capacity - next_power_of_two(capacity) - } - } else { - capacity - }; - let buffer = Vec::from_fn(capacity, |i| { - Unsafe::new(Node { sequence:AtomicUint::new(i), value: None }) - }); - State{ - pad0: [0, ..64], - buffer: buffer, - mask: capacity-1, - pad1: [0, ..64], - enqueue_pos: AtomicUint::new(0), - pad2: [0, ..64], - dequeue_pos: AtomicUint::new(0), - pad3: [0, ..64], - } - } - - fn push(&self, value: T) -> bool { - let mask = self.mask; - let mut pos = self.enqueue_pos.load(Relaxed); - loop { - let node = self.buffer.get(pos & mask); - let seq = unsafe { (*node.get()).sequence.load(Acquire) }; - let diff: int = seq as int - pos as int; - - if diff == 0 { - let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed); - if enqueue_pos == pos { - unsafe { - (*node.get()).value = Some(value); - (*node.get()).sequence.store(pos+1, Release); - } - break - } else { - pos = enqueue_pos; - } - } else if diff < 0 { - return false - } else { - pos = self.enqueue_pos.load(Relaxed); - } - } - true - } - - fn pop(&self) -> Option<T> { - let mask = self.mask; - let mut pos = self.dequeue_pos.load(Relaxed); - loop { - let node = self.buffer.get(pos & mask); - let seq = unsafe { (*node.get()).sequence.load(Acquire) }; - let diff: int = seq as int - (pos + 1) as int; - if diff == 0 { - let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed); - if dequeue_pos == pos { - unsafe { - let value = (*node.get()).value.take(); - (*node.get()).sequence.store(pos + mask + 1, Release); - return value - } - } else { - pos = dequeue_pos; - } - } else if diff < 0 { - return None - } else { - pos = self.dequeue_pos.load(Relaxed); - } - } - } -} - -impl<T: Send> Queue<T> { - pub fn with_capacity(capacity: uint) -> Queue<T> { - Queue{ - state: Arc::new(State::with_capacity(capacity)) - } - } - - pub fn push(&self, value: T) -> bool { - self.state.push(value) - } - - pub fn pop(&self) -> Option<T> { - self.state.pop() - } -} - -impl<T: Send> Clone for Queue<T> { - fn clone(&self) -> Queue<T> { - Queue { state: self.state.clone() } - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - use super::Queue; - use native; - - #[test] - fn test() { - let nthreads = 8u; - let nmsgs = 1000u; - let q = Queue::with_capacity(nthreads*nmsgs); - assert_eq!(None, q.pop()); - let (tx, rx) = channel(); - - for _ in range(0, nthreads) { - let q = q.clone(); - let tx = tx.clone(); - native::task::spawn(proc() { - let q = q; - for i in range(0, nmsgs) { - assert!(q.push(i)); - } - tx.send(()); - }); - } - - let mut completion_rxs = vec![]; - for _ in range(0, nthreads) { - let (tx, rx) = channel(); - completion_rxs.push(rx); - let q = q.clone(); - native::task::spawn(proc() { - let q = q; - let mut i = 0u; - loop { - match q.pop() { - None => {}, - Some(_) => { - i += 1; - if i == nmsgs { break } - } - } - } - tx.send(i); - }); - } - - for rx in completion_rxs.mut_iter() { - assert_eq!(nmsgs, rx.recv()); - } - for _ in range(0, nthreads) { - rx.recv(); - } - } -} diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs deleted file mode 100644 index 4db24e82d37..00000000000 --- a/src/libstd/sync/mpsc_queue.rs +++ /dev/null @@ -1,208 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -//! A mostly lock-free multi-producer, single consumer queue. -//! -//! This module contains an implementation of a concurrent MPSC queue. This -//! queue can be used to share data between tasks, and is also used as the -//! building block of channels in rust. -//! -//! Note that the current implementation of this queue has a caveat of the `pop` -//! method, and see the method for more information about it. Due to this -//! caveat, this queue may not be appropriate for all use-cases. - -// http://www.1024cores.net/home/lock-free-algorithms -// /queues/non-intrusive-mpsc-node-based-queue - -use kinds::Send; -use mem; -use ops::Drop; -use option::{Option, None, Some}; -use owned::Box; -use ptr::RawPtr; -use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; -use ty::Unsafe; - -/// A result of the `pop` function. -pub enum PopResult<T> { - /// Some data has been popped - Data(T), - /// The queue is empty - Empty, - /// The queue is in an inconsistent state. Popping data should succeed, but - /// some pushers have yet to make enough progress in order allow a pop to - /// succeed. It is recommended that a pop() occur "in the near future" in - /// order to see if the sender has made progress or not - Inconsistent, -} - -struct Node<T> { - next: AtomicPtr<Node<T>>, - value: Option<T>, -} - -/// The multi-producer single-consumer structure. This is not cloneable, but it -/// may be safely shared so long as it is guaranteed that there is only one -/// popper at a time (many pushers are allowed). -pub struct Queue<T> { - head: AtomicPtr<Node<T>>, - tail: Unsafe<*mut Node<T>>, -} - -impl<T> Node<T> { - unsafe fn new(v: Option<T>) -> *mut Node<T> { - mem::transmute(box Node { - next: AtomicPtr::new(0 as *mut Node<T>), - value: v, - }) - } -} - -impl<T: Send> Queue<T> { - /// Creates a new queue that is safe to share among multiple producers and - /// one consumer. - pub fn new() -> Queue<T> { - let stub = unsafe { Node::new(None) }; - Queue { - head: AtomicPtr::new(stub), - tail: Unsafe::new(stub), - } - } - - /// Pushes a new value onto this queue. - pub fn push(&self, t: T) { - unsafe { - let n = Node::new(Some(t)); - let prev = self.head.swap(n, AcqRel); - (*prev).next.store(n, Release); - } - } - - /// Pops some data from this queue. - /// - /// Note that the current implementation means that this function cannot - /// return `Option<T>`. It is possible for this queue to be in an - /// inconsistent state where many pushes have succeeded and completely - /// finished, but pops cannot return `Some(t)`. This inconsistent state - /// happens when a pusher is pre-empted at an inopportune moment. - /// - /// This inconsistent state means that this queue does indeed have data, but - /// it does not currently have access to it at this time. - pub fn pop(&self) -> PopResult<T> { - unsafe { - let tail = *self.tail.get(); - let next = (*tail).next.load(Acquire); - - if !next.is_null() { - *self.tail.get() = next; - assert!((*tail).value.is_none()); - assert!((*next).value.is_some()); - let ret = (*next).value.take_unwrap(); - let _: Box<Node<T>> = mem::transmute(tail); - return Data(ret); - } - - if self.head.load(Acquire) == tail {Empty} else {Inconsistent} - } - } - - /// Attempts to pop data from this queue, but doesn't attempt too hard. This - /// will canonicalize inconsistent states to a `None` value. - pub fn casual_pop(&self) -> Option<T> { - match self.pop() { - Data(t) => Some(t), - Empty | Inconsistent => None, - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Queue<T> { - fn drop(&mut self) { - unsafe { - let mut cur = *self.tail.get(); - while !cur.is_null() { - let next = (*cur).next.load(Relaxed); - let _: Box<Node<T>> = mem::transmute(cur); - cur = next; - } - } - } -} - -#[cfg(test)] -mod tests { - use prelude::*; - - use alloc::arc::Arc; - - use native; - use super::{Queue, Data, Empty, Inconsistent}; - - #[test] - fn test_full() { - let q = Queue::new(); - q.push(box 1); - q.push(box 2); - } - - #[test] - fn test() { - let nthreads = 8u; - let nmsgs = 1000u; - let q = Queue::new(); - match q.pop() { - Empty => {} - Inconsistent | Data(..) => fail!() - } - let (tx, rx) = channel(); - let q = Arc::new(q); - - for _ in range(0, nthreads) { - let tx = tx.clone(); - let q = q.clone(); - native::task::spawn(proc() { - for i in range(0, nmsgs) { - q.push(i); - } - tx.send(()); - }); - } - - let mut i = 0u; - while i < nthreads * nmsgs { - match q.pop() { - Empty | Inconsistent => {}, - Data(_) => { i += 1 } - } - } - drop(tx); - for _ in range(0, nthreads) { - rx.recv(); - } - } -} diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs deleted file mode 100644 index fb515c9db6e..00000000000 --- a/src/libstd/sync/spsc_queue.rs +++ /dev/null @@ -1,300 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue - -//! A single-producer single-consumer concurrent queue -//! -//! This module contains the implementation of an SPSC queue which can be used -//! concurrently between two tasks. This data structure is safe to use and -//! enforces the semantics that there is one pusher and one popper. - -use kinds::Send; -use mem; -use ops::Drop; -use option::{Some, None, Option}; -use owned::Box; -use ptr::RawPtr; -use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; -use ty::Unsafe; - -// Node within the linked list queue of messages to send -struct Node<T> { - // FIXME: this could be an uninitialized T if we're careful enough, and - // that would reduce memory usage (and be a bit faster). - // is it worth it? - value: Option<T>, // nullable for re-use of nodes - next: AtomicPtr<Node<T>>, // next node in the queue -} - -/// The single-producer single-consumer queue. This structure is not cloneable, -/// but it can be safely shared in an Arc if it is guaranteed that there -/// is only one popper and one pusher touching the queue at any one point in -/// time. -pub struct Queue<T> { - // consumer fields - tail: Unsafe<*mut Node<T>>, // where to pop from - tail_prev: AtomicPtr<Node<T>>, // where to pop from - - // producer fields - head: Unsafe<*mut Node<T>>, // where to push to - first: Unsafe<*mut Node<T>>, // where to get new nodes from - tail_copy: Unsafe<*mut Node<T>>, // between first/tail - - // Cache maintenance fields. Additions and subtractions are stored - // separately in order to allow them to use nonatomic addition/subtraction. - cache_bound: uint, - cache_additions: AtomicUint, - cache_subtractions: AtomicUint, -} - -impl<T: Send> Node<T> { - fn new() -> *mut Node<T> { - unsafe { - mem::transmute(box Node { - value: None, - next: AtomicPtr::new(0 as *mut Node<T>), - }) - } - } -} - -impl<T: Send> Queue<T> { - /// Creates a new queue. The producer returned is connected to the consumer - /// to push all data to the consumer. - /// - /// # Arguments - /// - /// * `bound` - This queue implementation is implemented with a linked - /// list, and this means that a push is always a malloc. In - /// order to amortize this cost, an internal cache of nodes is - /// maintained to prevent a malloc from always being - /// necessary. This bound is the limit on the size of the - /// cache (if desired). If the value is 0, then the cache has - /// no bound. Otherwise, the cache will never grow larger than - /// `bound` (although the queue itself could be much larger. - pub fn new(bound: uint) -> Queue<T> { - let n1 = Node::new(); - let n2 = Node::new(); - unsafe { (*n1).next.store(n2, Relaxed) } - Queue { - tail: Unsafe::new(n2), - tail_prev: AtomicPtr::new(n1), - head: Unsafe::new(n2), - first: Unsafe::new(n1), - tail_copy: Unsafe::new(n1), - cache_bound: bound, - cache_additions: AtomicUint::new(0), - cache_subtractions: AtomicUint::new(0), - } - } - - /// Pushes a new value onto this queue. Note that to use this function - /// safely, it must be externally guaranteed that there is only one pusher. - pub fn push(&self, t: T) { - unsafe { - // Acquire a node (which either uses a cached one or allocates a new - // one), and then append this to the 'head' node. - let n = self.alloc(); - assert!((*n).value.is_none()); - (*n).value = Some(t); - (*n).next.store(0 as *mut Node<T>, Relaxed); - (**self.head.get()).next.store(n, Release); - *self.head.get() = n; - } - } - - unsafe fn alloc(&self) -> *mut Node<T> { - // First try to see if we can consume the 'first' node for our uses. - // We try to avoid as many atomic instructions as possible here, so - // the addition to cache_subtractions is not atomic (plus we're the - // only one subtracting from the cache). - if *self.first.get() != *self.tail_copy.get() { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Relaxed); - self.cache_subtractions.store(b + 1, Relaxed); - } - let ret = *self.first.get(); - *self.first.get() = (*ret).next.load(Relaxed); - return ret; - } - // If the above fails, then update our copy of the tail and try - // again. - *self.tail_copy.get() = self.tail_prev.load(Acquire); - if *self.first.get() != *self.tail_copy.get() { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Relaxed); - self.cache_subtractions.store(b + 1, Relaxed); - } - let ret = *self.first.get(); - *self.first.get() = (*ret).next.load(Relaxed); - return ret; - } - // If all of that fails, then we have to allocate a new node - // (there's nothing in the node cache). - Node::new() - } - - /// Attempts to pop a value from this queue. Remember that to use this type - /// safely you must ensure that there is only one popper at a time. - pub fn pop(&self) -> Option<T> { - unsafe { - // The `tail` node is not actually a used node, but rather a - // sentinel from where we should start popping from. Hence, look at - // tail's next field and see if we can use it. If we do a pop, then - // the current tail node is a candidate for going into the cache. - let tail = *self.tail.get(); - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - assert!((*next).value.is_some()); - let ret = (*next).value.take(); - - *self.tail.get() = next; - if self.cache_bound == 0 { - self.tail_prev.store(tail, Release); - } else { - // FIXME: this is dubious with overflow. - let additions = self.cache_additions.load(Relaxed); - let subtractions = self.cache_subtractions.load(Relaxed); - let size = additions - subtractions; - - if size < self.cache_bound { - self.tail_prev.store(tail, Release); - self.cache_additions.store(additions + 1, Relaxed); - } else { - (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); - // We have successfully erased all references to 'tail', so - // now we can safely drop it. - let _: Box<Node<T>> = mem::transmute(tail); - } - } - return ret; - } - } - - /// Attempts to peek at the head of the queue, returning `None` if the queue - /// has no data currently - pub fn peek<'a>(&'a self) -> Option<&'a mut T> { - // This is essentially the same as above with all the popping bits - // stripped out. - unsafe { - let tail = *self.tail.get(); - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - return (*next).value.as_mut(); - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for Queue<T> { - fn drop(&mut self) { - unsafe { - let mut cur = *self.first.get(); - while !cur.is_null() { - let next = (*cur).next.load(Relaxed); - let _n: Box<Node<T>> = mem::transmute(cur); - cur = next; - } - } - } -} - -#[cfg(test)] -mod test { - use prelude::*; - - use alloc::arc::Arc; - use native; - - use super::Queue; - - #[test] - fn smoke() { - let q = Queue::new(0); - q.push(1); - q.push(2); - assert_eq!(q.pop(), Some(1)); - assert_eq!(q.pop(), Some(2)); - assert_eq!(q.pop(), None); - q.push(3); - q.push(4); - assert_eq!(q.pop(), Some(3)); - assert_eq!(q.pop(), Some(4)); - assert_eq!(q.pop(), None); - } - - #[test] - fn drop_full() { - let q = Queue::new(0); - q.push(box 1); - q.push(box 2); - } - - #[test] - fn smoke_bound() { - let q = Queue::new(1); - q.push(1); - q.push(2); - assert_eq!(q.pop(), Some(1)); - assert_eq!(q.pop(), Some(2)); - assert_eq!(q.pop(), None); - q.push(3); - q.push(4); - assert_eq!(q.pop(), Some(3)); - assert_eq!(q.pop(), Some(4)); - assert_eq!(q.pop(), None); - } - - #[test] - fn stress() { - stress_bound(0); - stress_bound(1); - - fn stress_bound(bound: uint) { - let a = Arc::new(Queue::new(bound)); - let b = a.clone(); - let (tx, rx) = channel(); - native::task::spawn(proc() { - for _ in range(0, 100000) { - loop { - match b.pop() { - Some(1) => break, - Some(_) => fail!(), - None => {} - } - } - } - tx.send(()); - }); - for _ in range(0, 100000) { - a.push(1); - } - rx.recv(); - } - } -} diff --git a/src/libstd/sync/task_pool.rs b/src/libstd/sync/task_pool.rs new file mode 100644 index 00000000000..7667badf0e7 --- /dev/null +++ b/src/libstd/sync/task_pool.rs @@ -0,0 +1,98 @@ +// Copyright 2012 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +#![allow(missing_doc)] + +/// A task pool abstraction. Useful for achieving predictable CPU +/// parallelism. + +use core::prelude::*; + +use task; +use task::spawn; +use vec::Vec; +use comm::{channel, Sender}; + +enum Msg<T> { + Execute(proc(&T):Send), + Quit +} + +pub struct TaskPool<T> { + channels: Vec<Sender<Msg<T>>>, + next_index: uint, +} + +#[unsafe_destructor] +impl<T> Drop for TaskPool<T> { + fn drop(&mut self) { + for channel in self.channels.mut_iter() { + channel.send(Quit); + } + } +} + +impl<T> TaskPool<T> { + /// Spawns a new task pool with `n_tasks` tasks. If the `sched_mode` + /// is None, the tasks run on this scheduler; otherwise, they run on a + /// new scheduler with the given mode. The provided `init_fn_factory` + /// returns a function which, given the index of the task, should return + /// local data to be kept around in that task. + pub fn new(n_tasks: uint, + init_fn_factory: || -> proc(uint):Send -> T) + -> TaskPool<T> { + assert!(n_tasks >= 1); + + let channels = Vec::from_fn(n_tasks, |i| { + let (tx, rx) = channel::<Msg<T>>(); + let init_fn = init_fn_factory(); + + let task_body = proc() { + let local_data = init_fn(i); + loop { + match rx.recv() { + Execute(f) => f(&local_data), + Quit => break + } + } + }; + + // Run on this scheduler. + task::spawn(task_body); + + tx + }); + + return TaskPool { + channels: channels, + next_index: 0, + }; + } + + /// Executes the function `f` on a task in the pool. The function + /// receives a reference to the local data returned by the `init_fn`. + pub fn execute(&mut self, f: proc(&T):Send) { + self.channels.get(self.next_index).send(Execute(f)); + self.next_index += 1; + if self.next_index == self.channels.len() { self.next_index = 0; } + } +} + +#[test] +fn test_task_pool() { + let f: || -> proc(uint):Send -> uint = || { + let g: proc(uint):Send -> uint = proc(i) i; + g + }; + let mut pool = TaskPool::new(4, f); + for _ in range(0, 8) { + pool.execute(proc(i) println!("Hello from thread {}!", *i)); + } +} |
