diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-12-05 17:56:17 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-12-16 17:47:11 -0800 |
| commit | bfa9064ba2687eb1d95708f72f41ddd9729a6ba1 (patch) | |
| tree | b10aeff181eff3a8654df495d2ad8826490f6533 | |
| parent | 000cda611f8224ac780fa37432f869f425cd2bb7 (diff) | |
| download | rust-bfa9064ba2687eb1d95708f72f41ddd9729a6ba1.tar.gz rust-bfa9064ba2687eb1d95708f72f41ddd9729a6ba1.zip | |
Rewrite std::comm
* Streams are now ~3x faster than before (fewer allocations and more optimized)
* Based on a single-producer single-consumer lock-free queue that doesn't
always have to allocate on every send.
* Blocking via mutexes/cond vars outside the runtime
* Streams work in/out of the runtime seamlessly
* Select now works in/out of the runtime seamlessly
* Streams will now fail!() on send() if the other end has hung up
* try_send() will not fail
* PortOne/ChanOne removed
* SharedPort removed
* MegaPipe removed
* Generic select removed (only one kind of port now)
* API redesign
* try_recv == never block
* recv_opt == block, don't fail
* iter() == Iterator<T> for Port<T>
* removed peek
* Type::new
* Removed rt::comm
| -rw-r--r-- | src/libstd/comm.rs | 311 | ||||
| -rw-r--r-- | src/libstd/comm/imp.rs | 337 | ||||
| -rw-r--r-- | src/libstd/comm/mod.rs | 1371 | ||||
| -rw-r--r-- | src/libstd/comm/select.rs | 498 | ||||
| -rw-r--r-- | src/libstd/lib.rs | 3 | ||||
| -rw-r--r-- | src/libstd/rt/mpmc_bounded_queue.rs | 18 | ||||
| -rw-r--r-- | src/libstd/rt/mpsc_queue.rs | 230 | ||||
| -rw-r--r-- | src/libstd/rt/spsc_queue.rs | 296 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 1 |
9 files changed, 2631 insertions, 434 deletions
diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs deleted file mode 100644 index c5ed464de23..00000000000 --- a/src/libstd/comm.rs +++ /dev/null @@ -1,311 +0,0 @@ -// Copyright 2012 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/*! -Message passing -*/ - -#[allow(missing_doc)]; - -use clone::Clone; -use iter::Iterator; -use kinds::Send; -use option::Option; -use rtcomm = rt::comm; - -/// A trait for things that can send multiple messages. -pub trait GenericChan<T> { - /// Sends a message. - fn send(&self, x: T); -} - -/// Things that can send multiple messages and can detect when the receiver -/// is closed -pub trait GenericSmartChan<T> { - /// Sends a message, or report if the receiver has closed the connection. - fn try_send(&self, x: T) -> bool; -} - -/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne. -pub trait SendDeferred<T> { - fn send_deferred(&self, val: T); - fn try_send_deferred(&self, val: T) -> bool; -} - -/// A trait for things that can receive multiple messages. -pub trait GenericPort<T> { - /// Receives a message, or fails if the connection closes. - fn recv(&self) -> T; - - /// Receives a message, or returns `none` if - /// the connection is closed or closes. - fn try_recv(&self) -> Option<T>; - - /// Returns an iterator that breaks once the connection closes. 
- /// - /// # Example - /// - /// ~~~rust - /// do spawn { - /// for x in port.recv_iter() { - /// if pred(x) { break; } - /// println!("{}", x); - /// } - /// } - /// ~~~ - fn recv_iter<'a>(&'a self) -> RecvIterator<'a, Self> { - RecvIterator { port: self } - } -} - -pub struct RecvIterator<'a, P> { - priv port: &'a P, -} - -impl<'a, T, P: GenericPort<T>> Iterator<T> for RecvIterator<'a, P> { - fn next(&mut self) -> Option<T> { - self.port.try_recv() - } -} - -/// Ports that can `peek` -pub trait Peekable<T> { - /// Returns true if a message is available - fn peek(&self) -> bool; -} - -/* priv is disabled to allow users to get at traits like Select. */ -pub struct PortOne<T> { /* priv */ x: rtcomm::PortOne<T> } -pub struct ChanOne<T> { /* priv */ x: rtcomm::ChanOne<T> } - -pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) { - let (p, c) = rtcomm::oneshot(); - (PortOne { x: p }, ChanOne { x: c }) -} - -pub struct Port<T> { /* priv */ x: rtcomm::Port<T> } -pub struct Chan<T> { /* priv */ x: rtcomm::Chan<T> } - -pub fn stream<T: Send>() -> (Port<T>, Chan<T>) { - let (p, c) = rtcomm::stream(); - (Port { x: p }, Chan { x: c }) -} - -impl<T: Send> ChanOne<T> { - pub fn send(self, val: T) { - let ChanOne { x: c } = self; - c.send(val) - } - - pub fn try_send(self, val: T) -> bool { - let ChanOne { x: c } = self; - c.try_send(val) - } - - pub fn send_deferred(self, val: T) { - let ChanOne { x: c } = self; - c.send_deferred(val) - } - - pub fn try_send_deferred(self, val: T) -> bool { - let ChanOne{ x: c } = self; - c.try_send_deferred(val) - } -} - -impl<T: Send> PortOne<T> { - pub fn recv(self) -> T { - let PortOne { x: p } = self; - p.recv() - } - - pub fn try_recv(self) -> Option<T> { - let PortOne { x: p } = self; - p.try_recv() - } -} - -impl<T: Send> Peekable<T> for PortOne<T> { - fn peek(&self) -> bool { - let &PortOne { x: ref p } = self; - p.peek() - } -} - -impl<T: Send> GenericChan<T> for Chan<T> { - fn send(&self, val: T) { - let &Chan { x: ref c } = self; 
- c.send(val) - } -} - -impl<T: Send> GenericSmartChan<T> for Chan<T> { - fn try_send(&self, val: T) -> bool { - let &Chan { x: ref c } = self; - c.try_send(val) - } -} - -impl<T: Send> SendDeferred<T> for Chan<T> { - fn send_deferred(&self, val: T) { - let &Chan { x: ref c } = self; - c.send_deferred(val) - } - - fn try_send_deferred(&self, val: T) -> bool { - let &Chan { x: ref c } = self; - c.try_send_deferred(val) - } -} - -impl<T: Send> GenericPort<T> for Port<T> { - fn recv(&self) -> T { - let &Port { x: ref p } = self; - p.recv() - } - - fn try_recv(&self) -> Option<T> { - let &Port { x: ref p } = self; - p.try_recv() - } -} - -impl<T: Send> Peekable<T> for Port<T> { - fn peek(&self) -> bool { - let &Port { x: ref p } = self; - p.peek() - } -} - - -pub struct SharedChan<T> { /* priv */ x: rtcomm::SharedChan<T> } - -impl<T: Send> SharedChan<T> { - pub fn new(c: Chan<T>) -> SharedChan<T> { - let Chan { x: c } = c; - SharedChan { x: rtcomm::SharedChan::new(c) } - } -} - -impl<T: Send> GenericChan<T> for SharedChan<T> { - fn send(&self, val: T) { - let &SharedChan { x: ref c } = self; - c.send(val) - } -} - -impl<T: Send> GenericSmartChan<T> for SharedChan<T> { - fn try_send(&self, val: T) -> bool { - let &SharedChan { x: ref c } = self; - c.try_send(val) - } -} - -impl<T: Send> SendDeferred<T> for SharedChan<T> { - fn send_deferred(&self, val: T) { - let &SharedChan { x: ref c } = self; - c.send_deferred(val) - } - - fn try_send_deferred(&self, val: T) -> bool { - let &SharedChan { x: ref c } = self; - c.try_send_deferred(val) - } -} - -impl<T: Send> Clone for SharedChan<T> { - fn clone(&self) -> SharedChan<T> { - let &SharedChan { x: ref c } = self; - SharedChan { x: c.clone() } - } -} - -pub struct SharedPort<T> { /* priv */ x: rtcomm::SharedPort<T> } - -impl<T: Send> SharedPort<T> { - pub fn new(p: Port<T>) -> SharedPort<T> { - let Port { x: p } = p; - SharedPort { x: rtcomm::SharedPort::new(p) } - } -} - -impl<T: Send> GenericPort<T> for SharedPort<T> { - 
fn recv(&self) -> T { - let &SharedPort { x: ref p } = self; - p.recv() - } - - fn try_recv(&self) -> Option<T> { - let &SharedPort { x: ref p } = self; - p.try_recv() - } -} - -impl<T: Send> Clone for SharedPort<T> { - fn clone(&self) -> SharedPort<T> { - let &SharedPort { x: ref p } = self; - SharedPort { x: p.clone() } - } -} - -#[cfg(test)] -mod tests { - use comm::*; - use prelude::*; - - #[test] - fn test_nested_recv_iter() { - let (port, chan) = stream::<int>(); - let (total_port, total_chan) = oneshot::<int>(); - - do spawn { - let mut acc = 0; - for x in port.recv_iter() { - acc += x; - for x in port.recv_iter() { - acc += x; - for x in port.try_recv().move_iter() { - acc += x; - total_chan.send(acc); - } - } - } - } - - chan.send(3); - chan.send(1); - chan.send(2); - assert_eq!(total_port.recv(), 6); - } - - #[test] - fn test_recv_iter_break() { - let (port, chan) = stream::<int>(); - let (count_port, count_chan) = oneshot::<int>(); - - do spawn { - let mut count = 0; - for x in port.recv_iter() { - if count >= 3 { - count_chan.send(count); - break; - } else { - count += x; - } - } - } - - chan.send(2); - chan.send(2); - chan.send(2); - chan.send(2); - assert_eq!(count_port.recv(), 4); - } -} diff --git a/src/libstd/comm/imp.rs b/src/libstd/comm/imp.rs new file mode 100644 index 00000000000..bd1d6fed901 --- /dev/null +++ b/src/libstd/comm/imp.rs @@ -0,0 +1,337 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! One of the major goals behind this channel implementation is to work +//! seamlessly on and off the runtime. 
This also means that the code isn't +//! littered with "if is_green() { ... } else { ... }". Right now, the rest of +//! the runtime isn't quite ready to for this abstraction to be done very +//! nicely, so the conditional "if green" blocks are all contained in this inner +//! module. +//! +//! The goal of this module is to mirror what the runtime "should be", not the +//! state that it is currently in today. You'll notice that there is no mention +//! of schedulers or is_green inside any of the channel code, it is currently +//! entirely contained in this one module. +//! +//! In the ideal world, nothing in this module exists and it is all implemented +//! elsewhere in the runtime (in the proper location). All of this code is +//! structured in order to easily refactor this to the correct location whenever +//! we have the trait objects in place to serve as the boundary of the +//! abstraction. + +use iter::{range, Iterator}; +use ops::Drop; +use option::{Some, None, Option}; +use rt::local::Local; +use rt::sched::{SchedHandle, Scheduler, TaskFromFriend}; +use rt::thread::Thread; +use rt; +use unstable::mutex::Mutex; +use unstable::sync::UnsafeArc; + +// A task handle is a method of waking up a blocked task. The handle itself +// is completely opaque and only has a wake() method defined on it. This +// method will wake the method regardless of the context of the thread which +// is currently calling wake(). +// +// This abstraction should be able to be created when putting a task to +// sleep. This should basically be a method on whatever the local Task is, +// consuming the local Task. + +pub struct TaskHandle { + priv inner: TaskRepr +} +enum TaskRepr { + Green(rt::BlockedTask, *mut SchedHandle), + Native(NativeWakeupStyle), +} +enum NativeWakeupStyle { + ArcWakeup(UnsafeArc<Mutex>), // shared mutex to synchronize on + LocalWakeup(*mut Mutex), // synchronize on the task-local mutex +} + +impl TaskHandle { + // Signal that this handle should be woken up. 
The `can_resched` + // argument indicates whether the current task could possibly be + // rescheduled or not. This does not have a lot of meaning for the + // native case, but for an M:N case it indicates whether a context + // switch can happen or not. + pub fn wake(self, can_resched: bool) { + match self.inner { + Green(task, handle) => { + // If we have a local scheduler, then use that to run the + // blocked task, otherwise we can use the handle to send the + // task back to its home. + if rt::in_green_task_context() { + if can_resched { + task.wake().map(Scheduler::run_task); + } else { + let mut s: ~Scheduler = Local::take(); + s.enqueue_blocked_task(task); + Local::put(s); + } + } else { + let task = match task.wake() { + Some(task) => task, None => return + }; + // XXX: this is not an easy section of code to refactor. + // If this handle is owned by the Task (which it + // should be), then this would be a use-after-free + // because once the task is pushed onto the message + // queue, the handle is gone. + // + // Currently the handle is instead owned by the + // Port/Chan pair, which means that because a + // channel is invoking this method the handle will + // continue to stay alive for the entire duration + // of this method. This will require thought when + // moving the handle into the task. + unsafe { (*handle).send(TaskFromFriend(task)) } + } + } + + // Note that there are no use-after-free races in this code. In + // the arc-case, we own the lock, and in the local case, we're + // using a lock so it's guranteed that they aren't running while + // we hold the lock. + Native(ArcWakeup(lock)) => { + unsafe { + let lock = lock.get(); + (*lock).lock(); + (*lock).signal(); + (*lock).unlock(); + } + } + Native(LocalWakeup(lock)) => { + unsafe { + (*lock).lock(); + (*lock).signal(); + (*lock).unlock(); + } + } + } + } + + // Trashes handle to this task. This ensures that necessary memory is + // deallocated, and there may be some extra assertions as well. 
+ pub fn trash(self) { + match self.inner { + Green(task, _) => task.assert_already_awake(), + Native(..) => {} + } + } +} + +// This structure is an abstraction of what should be stored in the local +// task itself. This data is currently stored inside of each channel, but +// this should rather be stored in each task (and channels will still +// continue to lazily initialize this data). + +pub struct TaskData { + priv handle: Option<SchedHandle>, + priv lock: Mutex, +} + +impl TaskData { + pub fn new() -> TaskData { + TaskData { + handle: None, + lock: unsafe { Mutex::empty() }, + } + } +} + +impl Drop for TaskData { + fn drop(&mut self) { + unsafe { self.lock.destroy() } + } +} + +// Now this is the really fun part. This is where all the M:N/1:1-agnostic +// along with recv/select-agnostic blocking information goes. A "blocking +// context" is really just a stack-allocated structure (which is probably +// fine to be a stack-trait-object). +// +// This has some particularly strange interfaces, but the reason for all +// this is to support selection/recv/1:1/M:N all in one bundle. + +pub struct BlockingContext<'a> { + priv inner: BlockingRepr<'a> +} + +enum BlockingRepr<'a> { + GreenBlock(rt::BlockedTask, &'a mut Scheduler), + NativeBlock(Option<UnsafeArc<Mutex>>), +} + +impl<'a> BlockingContext<'a> { + // Creates one blocking context. The data provided should in theory be + // acquired from the local task, but it is instead acquired from the + // channel currently. + // + // This function will call `f` with a blocking context, plus the data + // that it is given. This function will then return whether this task + // should actually go to sleep or not. If `true` is returned, then this + // function does not return until someone calls `wake()` on the task. + // If `false` is returned, then this function immediately returns. + // + // # Safety note + // + // Note that this stack closure may not be run on the same stack as when + // this function was called. 
This means that the environment of this + // stack closure could be unsafely aliased. This is currently prevented + // through the guarantee that this function will never return before `f` + // finishes executing. + pub fn one(data: &mut TaskData, + f: |BlockingContext, &mut TaskData| -> bool) { + if rt::in_green_task_context() { + let sched: ~Scheduler = Local::take(); + sched.deschedule_running_task_and_then(|sched, task| { + let ctx = BlockingContext { inner: GreenBlock(task, sched) }; + // no need to do something on success/failure other than + // returning because the `block` function for a BlockingContext + // takes care of reawakening itself if the blocking procedure + // fails. If this function is successful, then we're already + // blocked, and if it fails, the task will already be + // rescheduled. + f(ctx, data); + }); + } else { + unsafe { data.lock.lock(); } + let ctx = BlockingContext { inner: NativeBlock(None) }; + if f(ctx, data) { + unsafe { data.lock.wait(); } + } + unsafe { data.lock.unlock(); } + } + } + + // Creates many blocking contexts. The intended use case for this + // function is selection over a number of ports. This will create `amt` + // blocking contexts, yielding them to `f` in turn. If `f` returns + // false, then this function aborts and returns immediately. If `f` + // repeatedly returns `true` `amt` times, then this function will block. + pub fn many(amt: uint, f: |BlockingContext| -> bool) { + if rt::in_green_task_context() { + let sched: ~Scheduler = Local::take(); + sched.deschedule_running_task_and_then(|sched, task| { + for handle in task.make_selectable(amt) { + let ctx = BlockingContext { + inner: GreenBlock(handle, sched) + }; + // see comment above in `one` for why no further action is + // necessary here + if !f(ctx) { break } + } + }); + } else { + // In the native case, our decision to block must be shared + // amongst all of the channels. 
It may be possible to + // stack-allocate this mutex (instead of putting it in an + // UnsafeArc box), but for now in order to prevent + // use-after-free trivially we place this into a box and then + // pass that around. + unsafe { + let mtx = UnsafeArc::new(Mutex::new()); + (*mtx.get()).lock(); + let success = range(0, amt).all(|_| { + f(BlockingContext { + inner: NativeBlock(Some(mtx.clone())) + }) + }); + if success { + (*mtx.get()).wait(); + } + (*mtx.get()).unlock(); + } + } + } + + // This function will consume this BlockingContext, and optionally block + // if according to the atomic `decision` function. The semantics of this + // functions are: + // + // * `slot` is required to be a `None`-slot (which is owned by the + // channel) + // * The `slot` will be filled in with a blocked version of the current + // task (with `wake`-ability if this function is successful). + // * If the `decision` function returns true, then this function + // immediately returns having relinquished ownership of the task. + // * If the `decision` function returns false, then the `slot` is reset + // to `None` and the task is re-scheduled if necessary (remember that + // the task will not resume executing before the outer `one` or + // `many` function has returned. This function is expected to have a + // release memory fence in order for the modifications of `to_wake` to be + // visible to other tasks. Code which attempts to read `to_wake` should + // have an acquiring memory fence to guarantee that this write is + // visible. + // + // This function will return whether the blocking occurred or not. 
+ pub fn block(self, + data: &mut TaskData, + slot: &mut Option<TaskHandle>, + decision: || -> bool) -> bool { + assert!(slot.is_none()); + match self.inner { + GreenBlock(task, sched) => { + if data.handle.is_none() { + data.handle = Some(sched.make_handle()); + } + let handle = data.handle.get_mut_ref() as *mut SchedHandle; + *slot = Some(TaskHandle { inner: Green(task, handle) }); + + if !decision() { + match slot.take_unwrap().inner { + Green(task, _) => sched.enqueue_blocked_task(task), + Native(..) => unreachable!() + } + false + } else { + true + } + } + NativeBlock(shared) => { + *slot = Some(TaskHandle { + inner: Native(match shared { + Some(arc) => ArcWakeup(arc), + None => LocalWakeup(&mut data.lock as *mut Mutex), + }) + }); + + if !decision() { + *slot = None; + false + } else { + true + } + } + } + } +} + +// Agnostic method of forcing a yield of the current task +pub fn yield_now() { + if rt::in_green_task_context() { + let sched: ~Scheduler = Local::take(); + sched.yield_now(); + } else { + Thread::yield_now(); + } +} + +// Agnostic method of "maybe yielding" in order to provide fairness +pub fn maybe_yield() { + if rt::in_green_task_context() { + let sched: ~Scheduler = Local::take(); + sched.maybe_yield(); + } else { + // the OS decides fairness, nothing for us to do. + } +} diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs new file mode 100644 index 00000000000..9a65e9973cb --- /dev/null +++ b/src/libstd/comm/mod.rs @@ -0,0 +1,1371 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! 
Rust Communication Primitives +//! +//! Rust makes it very difficult to share data among tasks to prevent race +//! conditions and to improve parallelism, but there is often a need for +//! communication between concurrent tasks. The primitives defined in this +//! module are the building blocks for synchronization in rust. +//! +//! This module currently provides three main types: +//! +//! * `Chan` +//! * `Port` +//! * `SharedChan` +//! +//! The `Chan` and `SharedChan` types are used to send data to a `Port`. A +//! `SharedChan` is clone-able such that many tasks can send simultaneously to +//! one receiving port. These communication primitives are *task blocking*, not +//! *thread blocking*. This means that if one task is blocked on a channel, +//! other tasks can continue to make progress. +//! +//! Rust channels can be used as if they have an infinite internal buffer. What +//! this means is that the `send` operation will never block. `Port`s, on the +//! other hand, will block the task if there is no data to be received. +//! +//! ## Failure Propagation +//! +//! In addition to being a core primitive for communicating in rust, channels +//! and ports are the points at which failure is propagated among tasks. +//! Whenever the one half of channel is closed, the other half will have its +//! next operation `fail!`. The purpose of this is to allow propagation of +//! failure among tasks that are linked to one another via channels. +//! +//! There are methods on all of `Chan`, `SharedChan`, and `Port` to perform +//! their respective operations without failing, however. +//! +//! ## Outside the Runtime +//! +//! All channels and ports work seamlessly inside and outside of the rust +//! runtime. This means that code may use channels to communicate information +//! inside and outside of the runtime. For example, if rust were embedded as an +//! FFI module in another application, the rust runtime would probably be +//! running in its own external thread pool. 
Channels created can communicate +//! from the native application threads to the rust threads through the use of +//! native mutexes and condition variables. +//! +//! What this means is that if a native thread is using a channel, execution +//! will be blocked accordingly by blocking the OS thread. +//! +//! # Example +//! +//! ```rust +//! // Create a simple streaming channel +//! let (port, chan) = Chan::new(); +//! do spawn { +//! chan.send(10); +//! } +//! assert_eq!(port.recv(), 10); +//! +//! // Create a shared channel which can be sent along from many tasks +//! let (port, chan) = SharedChan::new(); +//! for i in range(0, 10) { +//! let chan = chan.clone(); +//! do spawn { +//! chan.send(i); +//! } +//! } +//! +//! for _ in range(0, 10) { +//! let j = port.recv(); +//! assert!(0 <= j && j < 10); +//! } +//! +//! // The call to recv() will fail!() because the channel has already hung +//! // up (or been deallocated) +//! let (port, chan) = Chan::new(); +//! drop(chan); +//! port.recv(); +//! ``` + +// A description of how Rust's channel implementation works +// +// Channels are supposed to be the basic building block for all other +// concurrent primitives that are used in Rust. As a result, the channel type +// needs to be highly optimized, flexible, and broad enough for use everywhere. +// +// The choice of implementation of all channels is to be built on lock-free data +// structures. The channels themselves are then consequently also lock-free data +// structures. As always with lock-free code, this is a very "here be dragons" +// territory, especially because I'm unaware of any academic papers which have +// gone into great length about channels of these flavors. +// +// ## Flavors of channels +// +// Rust channels come in two flavors: streams and shared channels. A stream has +// one sender and one receiver while a shared channel could have multiple +// senders. 
This choice heavily influences the design of the protocol set +// forth for both senders/receivers. +// +// ## Concurrent queues +// +// The basic idea of Rust's Chan/Port types is that send() never blocks, but +// recv() obviously blocks. This means that under the hood there must be some +// shared and concurrent queue holding all of the actual data. +// +// With two flavors of channels, two flavors of queues are also used. We have +// chosen to use queues from a well-known author which are abbreviated as SPSC +// and MPSC (single producer, single consumer and multiple producer, single +// consumer). SPSC queues are used for streams while MPSC queues are used for +// shared channels. +// +// ### SPSC optimizations +// +// The SPSC queue found online is essentially a linked list of nodes where one +// half of the nodes are the "queue of data" and the other half of nodes are a +// cache of unused nodes. The unused nodes are used such that an allocation is +// not required on every push() and a free doesn't need to happen on every +// pop(). +// +// As found online, however, the cache of nodes is of an infinite size. This +// means that if a channel at one point in its life had 50k items in the queue, +// then the queue will always have the capacity for 50k items. I believed that +// this was an unnecessary limitation of the implementation, so I have altered +// the queue to optionally have a bound on the cache size. +// +// By default, streams will have an unbounded SPSC queue with a small-ish cache +// size. The hope is that the cache is still large enough to have very fast +// send() operations while not too large such that millions of channels can +// coexist at once. +// +// ### MPSC optimizations +// +// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses +// a linked list under the hood to earn its unboundedness, but I have not put +// forth much effort into having a cache of nodes similar to the SPSC queue. 
+// +// For now, I believe that this is "ok" because shared channels are not the most +// common type, but soon we may wish to revisit this queue choice and determine +// another candidate for backend storage of shared channels. +// +// ## Overview of the Implementation +// +// Now that there's a little background on the concurrent queues used, it's +// worth going into much more detail about the channels themselves. The basic +// pseudocode for a send/recv are: +// +// +// send(t) recv() +// queue.push(t) return if queue.pop() +// if increment() == -1 deschedule { +// wakeup() if decrement() > 0 +// cancel_deschedule() +// } +// queue.pop() +// +// As mentioned before, there are no locks in this implementation, only atomic +// instructions are used. +// +// ### The internal atomic counter +// +// Every channel/port/shared channel have a shared counter with their +// counterparts to keep track of the size of the queue. This counter is used to +// abort descheduling by the receiver and to know when to wake up on the sending +// side. +// +// As seen in the pseudocode, senders will increment this count and receivers +// will decrement the count. The theory behind this is that if a sender sees a +// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count, +// then it doesn't need to block. +// +// The recv() method has a beginning call to pop(), and if successful, it needs +// to decrement the count. It is a crucial implementation detail that this +// decrement does *not* happen to the shared counter. If this were the case, +// then it would be possible for the counter to be very negative when there were +// no receivers waiting, in which case the senders would have to determine when +// it was actually appropriate to wake up a receiver. 
+// +// Instead, the "steal count" is kept track of separately (not atomically +// because it's only used by ports), and then the decrement() call when +// descheduling will lump in all of the recent steals into one large decrement. +// +// The implication of this is that if a sender sees a -1 count, then there's +// guaranteed to be a waiter waiting! +// +// ## Native Implementation +// +// A major goal of these channels is to work seamlessly on and off the runtime. +// All of the previous race conditions have been worded in terms of +// scheduler-isms (which is obviously not available without the runtime). +// +// For now, native usage of channels (off the runtime) will fall back onto +// mutexes/cond vars for descheduling/atomic decisions. The no-contention path +// is still entirely lock-free, the "deschedule" blocks above are surrounded by +// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a +// condition variable. +// +// ## Select +// +// Being able to support selection over channels has greatly influenced this +// design, and not only does selection need to work inside the runtime, but also +// outside the runtime. +// +// The implementation is fairly straightforward. The goal of select() is not to +// return some data, but only to return which channel can receive data without +// blocking. The implementation is essentially the entire blocking procedure +// followed by an increment as soon as its woken up. The cancellation procedure +// involves an increment and swapping out of to_wake to acquire ownership of the +// task to unblock. +// +// Sadly this current implementation requires multiple allocations, so I have +// seen the throughput of select() be much worse than it should be. I do not +// believe that there is anything fundamental which needs to change about these +// channels, however, in order to support a more efficient select(). 
+// +// # Conclusion +// +// And now that you've seen all the races that I found and attempted to fix, +// here's the code for you to find some more! + +use cast; +use clone::Clone; +use container::Container; +use int; +use iter::Iterator; +use kinds::Send; +use ops::Drop; +use option::{Option, Some, None}; +use rt::thread::Thread; +use unstable::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed}; +use vec::{ImmutableVector, OwnedVector}; + +use spsc = rt::spsc_queue; +use mpsc = rt::mpsc_queue; + +use self::imp::{TaskHandle, TaskData, BlockingContext}; +pub use self::select::Select; + +macro_rules! test ( + { fn $name:ident() $b:block $($a:attr)*} => ( + mod $name { + #[allow(unused_imports)]; + + use util; + use super::super::*; + use prelude::*; + + fn f() $b + + $($a)* #[test] fn uv() { f() } + $($a)* #[test] fn native() { + use unstable::run_in_bare_thread; + run_in_bare_thread(f); + } + } + ) +) + +mod imp; +mod select; + +/////////////////////////////////////////////////////////////////////////////// +// Helper type to abstract ports for channels and shared channels +/////////////////////////////////////////////////////////////////////////////// + +enum Consumer<T> { + SPSC(spsc::Consumer<T, Packet>), + MPSC(mpsc::Consumer<T, Packet>), +} + +impl<T: Send> Consumer<T>{ + unsafe fn packet(&self) -> *mut Packet { + match *self { + SPSC(ref c) => c.packet(), + MPSC(ref c) => c.packet(), + } + } +} + +/////////////////////////////////////////////////////////////////////////////// +// Public structs +/////////////////////////////////////////////////////////////////////////////// + +/// The receiving-half of Rust's channel type. This half can only be owned by +/// one task +pub struct Port<T> { + priv queue: Consumer<T>, +} + +/// An iterator over messages received on a port, this iterator will block +/// whenever `next` is called, waiting for a new message, and `None` will be +/// returned when the corresponding channel has hung up. 
+pub struct PortIterator<'a, T> { + priv port: &'a Port<T> +} + +/// The sending-half of Rust's channel type. This half can only be owned by one +/// task +pub struct Chan<T> { + priv queue: spsc::Producer<T, Packet>, +} + +/// The sending-half of Rust's channel type. This half can be shared among many +/// tasks by creating copies of itself through the `clone` method. +pub struct SharedChan<T> { + priv queue: mpsc::Producer<T, Packet>, +} + +/////////////////////////////////////////////////////////////////////////////// +// Internal struct definitions +/////////////////////////////////////////////////////////////////////////////// + +struct Packet { + cnt: AtomicInt, // How many items are on this channel + steals: int, // How many times has a port received without blocking? + to_wake: Option<TaskHandle>, // Task to wake up + + data: TaskData, + + // This lock is used to wake up native threads blocked in select. The + // `lock` field is not used because the thread blocking in select must + // block on only one mutex. + //selection_lock: Option<UnsafeArc<Mutex>>, + + // The number of channels which are currently using this packet. This is + // used to reference count shared channels. 
+ channels: AtomicInt, + + selecting: AtomicBool, + selection_id: uint, + select_next: *mut Packet, + select_prev: *mut Packet, +} + +/////////////////////////////////////////////////////////////////////////////// +// All implementations -- the fun part +/////////////////////////////////////////////////////////////////////////////// + +static DISCONNECTED: int = int::min_value; +static RESCHED_FREQ: int = 200; + +impl Packet { + fn new() -> Packet { + Packet { + cnt: AtomicInt::new(0), + steals: 0, + to_wake: None, + data: TaskData::new(), + channels: AtomicInt::new(1), + + selecting: AtomicBool::new(false), + selection_id: 0, + select_next: 0 as *mut Packet, + select_prev: 0 as *mut Packet, + } + } + + // Increments the channel size count, preserving the disconnected state if + // the other end has disconnected. + fn increment(&mut self) -> int { + match self.cnt.fetch_add(1, SeqCst) { + DISCONNECTED => { + // see the comment in 'try' for a shared channel for why this + // window of "not disconnected" is "ok". + self.cnt.store(DISCONNECTED, SeqCst); + DISCONNECTED + } + n => n + } + } + + // Decrements the reference count of the channel, returning whether the task + // should block or not. This assumes that the task is ready to sleep in that + // the `to_wake` field has already been filled in. Once this decrement + // happens, the task could wake up on the other end. + // + // From an implementation perspective, this is also when our "steal count" + // gets merged into the "channel count". Our steal count is reset to 0 after + // this function completes. + // + // As with increment(), this preserves the disconnected state if the + // channel is disconnected. 
+ fn decrement(&mut self) -> bool { + let steals = self.steals; + self.steals = 0; + match self.cnt.fetch_sub(1 + steals, SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, SeqCst); + false + } + n => { + assert!(n >= 0); + n - steals <= 0 + } + } + } + + // Helper function for select, tests whether this port can receive without + // blocking (obviously not an atomic decision). + fn can_recv(&self) -> bool { + let cnt = self.cnt.load(SeqCst); + cnt == DISCONNECTED || cnt - self.steals > 0 + } + + // This function must have had at least an acquire fence before it to be + // properly called. + fn wakeup(&mut self, can_resched: bool) { + self.to_wake.take_unwrap().wake(can_resched); + self.selecting.store(false, Relaxed); + } + + // Aborts the selection process for a port. This happens as part of select() + // once the task has reawoken. This will place the channel back into a + // consistent state which is ready to be received from again. + // + // The method of doing this is a little subtle. These channels have the + // invariant that if -1 is seen, then to_wake is always Some(..) and should + // be woken up. This aborting process at least needs to add 1 to the + // reference count, but that is not guaranteed to make the count positive + // (our steal count subtraction could mean that after the addition the + // channel count is still negative). + // + // In order to get around this, we force our channel count to go above 0 by + // adding a large number >= 1 to it. This way no sender will see -1 unless + // we are indeed blocking. This "extra lump" we took out of the channel + // becomes our steal count (which will get re-factored into the count on the + // next blocking recv) + // + // The return value of this method is whether there is data on this channel + // to receive or not. 
+ fn abort_selection(&mut self, take_to_wake: bool) -> bool { + // make sure steals + 1 makes the count go non-negative + let steals = { + let cnt = self.cnt.load(SeqCst); + if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0} + }; + let prev = self.cnt.fetch_add(steals + 1, SeqCst); + + // If we were previously disconnected, then we know for sure that there + // is no task in to_wake, so just keep going + if prev == DISCONNECTED { + assert!(self.to_wake.is_none()); + self.cnt.store(DISCONNECTED, SeqCst); + self.selecting.store(false, SeqCst); + true // there is data, that data is that we're disconnected + } else { + let cur = prev + steals + 1; + assert!(cur >= 0); + + // If the previous count was negative, then we just made things go + // positive, hence we passed the -1 boundary and we're responsible + // for removing the to_wake() field and trashing it. + if prev < 0 { + if take_to_wake { + self.to_wake.take_unwrap().trash(); + } else { + assert!(self.to_wake.is_none()); + } + + // We woke ourselves up, we're responsible for cancelling + assert!(self.selecting.load(Relaxed)); + self.selecting.store(false, Relaxed); + } + assert_eq!(self.steals, 0); + self.steals = steals; + + // if we were previously positive, then there's surely data to + // receive + prev >= 0 + } + } + + // Decrement the reference count on a channel. This is called whenever a + // Chan is dropped and may end up waking up a receiver. It's the receiver's + // responsibility on the other end to figure out that we've disconnected. 
+ unsafe fn drop_chan(&mut self) { + match self.channels.fetch_sub(1, SeqCst) { + 1 => { + match self.cnt.swap(DISCONNECTED, SeqCst) { + -1 => { self.wakeup(false); } + DISCONNECTED => {} + n => { assert!(n >= 0); } + } + } + n if n > 1 => {}, + n => fail!("bad number of channels left {}", n), + } + } +} + +impl Drop for Packet { + fn drop(&mut self) { + unsafe { + // Note that this load is not only an assert for correctness about + // disconnection, but also a proper fence before the read of + // `to_wake`, so this assert cannot be removed with also removing + // the `to_wake` assert. + assert_eq!(self.cnt.load(SeqCst), DISCONNECTED); + assert!(self.to_wake.is_none()); + assert_eq!(self.channels.load(SeqCst), 0); + } + } +} + +impl<T: Send> Chan<T> { + /// Creates a new port/channel pair. All data send on the channel returned + /// will become available on the port as well. See the documentation of + /// `Port` and `Chan` to see what's possible with them. + pub fn new() -> (Port<T>, Chan<T>) { + // arbitrary 128 size cache -- this is just a max cache size, not a + // maximum buffer size + let (c, p) = spsc::queue(128, Packet::new()); + let c = SPSC(c); + (Port { queue: c }, Chan { queue: p }) + } + + /// Sends a value along this channel to be received by the corresponding + /// port. + /// + /// Rust channels are infinitely buffered so this method will never block. + /// This method may trigger a rescheduling, however, in order to wake up a + /// blocked receiver (if one is present). If no scheduling is desired, then + /// the `send_deferred` guarantees that there will be no reschedulings. + /// + /// # Failure + /// + /// This function will fail if the other end of the channel has hung up. + /// This means that if the corresponding port has fallen out of scope, this + /// function will trigger a fail message saying that a message is being sent + /// on a closed channel. 
+ /// + /// Note that if this function does *not* fail, it does not mean that the + /// data will be successfully received. All sends are placed into a queue, + /// so it is possible for a send to succeed (the other end is alive), but + /// then the other end could immediately disconnect. + /// + /// The purpose of this functionality is to propagate failure among tasks. + /// If failure is not desired, then consider using the `try_send` method + pub fn send(&self, t: T) { + if !self.try_send(t) { + fail!("sending on a closed channel"); + } + } + + /// This function is equivalent in the semantics of `send`, but it + /// guarantees that a rescheduling will never occur when this method is + /// called. + pub fn send_deferred(&self, t: T) { + if !self.try_send_deferred(t) { + fail!("sending on a closed channel"); + } + } + + /// Attempts to send a value on this channel, returning whether it was + /// successfully sent. + /// + /// A successful send occurs when it is determined that the other end of the + /// channel has not hung up already. An unsuccessful send would be one where + /// the corresponding port has already been deallocated. Note that a return + /// value of `false` means that the data will never be received, but a + /// return value of `true` does *not* mean that the data will be received. + /// It is possible for the corresponding port to hang up immediately after + /// this function returns `true`. + /// + /// Like `send`, this method will never block. If the failure of send cannot + /// be tolerated, then this method should be used instead. + pub fn try_send(&self, t: T) -> bool { self.try(t, true) } + + /// This function is equivalent in the semantics of `try_send`, but it + /// guarantees that a rescheduling will never occur when this method is + /// called. 
+    pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) }
+
+    fn try(&self, t: T, can_resched: bool) -> bool {
+        unsafe {
+            let this = cast::transmute_mut(self);
+            this.queue.push(t);
+            let packet = this.queue.packet();
+            match (*packet).increment() {
+                // As described above, -1 == wakeup
+                -1 => { (*packet).wakeup(can_resched); true }
+                // Also as above, SPSC queues must be >= -2
+                -2 => true,
+                // We succeeded if we sent data
+                DISCONNECTED => this.queue.is_empty(),
+                // In order to prevent starvation of other tasks in situations
+                // where a task sends repeatedly without ever receiving, we
+                // occasionally yield instead of doing a send immediately.
+                // Only doing this if we're doing a rescheduling send, otherwise
+                // the caller is expecting not to context switch.
+                //
+                // Note that we don't unconditionally attempt to yield because
+                // the TLS overhead can be a bit much.
+                n => {
+                    assert!(n >= 0);
+                    if can_resched && n > 0 && n % RESCHED_FREQ == 0 {
+                        imp::maybe_yield();
+                    }
+                    true
+                }
+            }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Chan<T> {
+    fn drop(&mut self) {
+        unsafe { (*self.queue.packet()).drop_chan(); }
+    }
+}
+
+impl<T: Send> SharedChan<T> {
+    /// Creates a new shared channel and port pair. The purpose of a shared
+    /// channel is to be cloneable such that many tasks can send data at the
+    /// same time. All data sent on any channel will become available on the
+    /// provided port as well.
+    pub fn new() -> (Port<T>, SharedChan<T>) {
+        let (c, p) = mpsc::queue(Packet::new());
+        let c = MPSC(c);
+        (Port { queue: c }, SharedChan { queue: p })
+    }
+
+    /// Equivalent method to `send` on the `Chan` type (using the same
+    /// semantics)
+    pub fn send(&self, t: T) {
+        if !self.try_send(t) {
+            fail!("sending on a closed channel");
+        }
+    }
+
+    /// This function is equivalent in the semantics of `send`, but it
+    /// guarantees that a rescheduling will never occur when this method is
+    /// called.
+    pub fn send_deferred(&self, t: T) {
+        if !self.try_send_deferred(t) {
+            fail!("sending on a closed channel");
+        }
+    }
+
+    /// Equivalent method to `try_send` on the `Chan` type (using the same
+    /// semantics)
+    pub fn try_send(&self, t: T) -> bool { self.try(t, true) }
+
+    /// This function is equivalent in the semantics of `try_send`, but it
+    /// guarantees that a rescheduling will never occur when this method is
+    /// called.
+    pub fn try_send_deferred(&self, t: T) -> bool { self.try(t, false) }
+
+    fn try(&self, t: T, can_resched: bool) -> bool {
+        unsafe {
+            // Note that the multiple sender case is a little trickier
+            // semantically than the single sender case. The logic for
+            // incrementing is "add and if disconnected store disconnected".
+            // This could end up leading some senders to believe that there
+            // wasn't a disconnect if in fact there was a disconnect. This means
+            // that while one thread is attempting to re-store the disconnected
+            // states, other threads could walk through merrily incrementing
+            // this very-negative disconnected count. To prevent senders from
+            // spuriously attempting to send when the channel is actually
+            // disconnected, the count has a ranged check here.
+            //
+            // This is also done for another reason. Remember that the return
+            // value of this function is:
+            //
+            //     `true` == the data *may* be received, this essentially has no
+            //               meaning
+            //     `false` == the data will *never* be received, this has a lot of
+            //               meaning
+            //
+            // In the SPSC case, we have a check of 'queue.is_empty()' to see
+            // whether the data was actually received, but this same condition
+            // means nothing in a multi-producer context. As a result, this
+            // preflight check serves as the definitive "this will never be
+            // received".
Once we get beyond this check, we have permanently + // entered the realm of "this may be received" + let packet = self.queue.packet(); + if (*packet).cnt.load(Relaxed) < DISCONNECTED + 1024 { + return false + } + + let this = cast::transmute_mut(self); + this.queue.push(t); + + match (*packet).increment() { + DISCONNECTED => {} // oh well, we tried + -1 => { (*packet).wakeup(can_resched); } + n => { + if can_resched && n > 0 && n % RESCHED_FREQ == 0 { + imp::maybe_yield(); + } + } + } + true + } + } +} + +impl<T: Send> Clone for SharedChan<T> { + fn clone(&self) -> SharedChan<T> { + unsafe { (*self.queue.packet()).channels.fetch_add(1, SeqCst); } + SharedChan { queue: self.queue.clone() } + } +} + +#[unsafe_destructor] +impl<T: Send> Drop for SharedChan<T> { + fn drop(&mut self) { + unsafe { (*self.queue.packet()).drop_chan(); } + } +} + +impl<T: Send> Port<T> { + /// Blocks waiting for a value on this port + /// + /// This function will block if necessary to wait for a corresponding send + /// on the channel from its paired `Chan` structure. This port will be woken + /// up when data is ready, and the data will be returned. + /// + /// # Failure + /// + /// Similar to channels, this method will trigger a task failure if the + /// other end of the channel has hung up (been deallocated). The purpose of + /// this is to propagate failure among tasks. + /// + /// If failure is not desired, then there are two options: + /// + /// * If blocking is still desired, the `recv_opt` method will return `None` + /// when the other end hangs up + /// + /// * If blocking is not desired, then the `try_recv` method will attempt to + /// peek at a value on this port. + pub fn recv(&self) -> T { + match self.recv_opt() { + Some(t) => t, + None => fail!("receiving on a closed channel"), + } + } + + /// Attempts to return a pending value on this port without blocking + /// + /// This method will never block the caller in order to wait for data to + /// become available. 
Instead, this will always return immediately with a + /// possible option of pending data on the channel. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// block on a port. + /// + /// This function cannot fail. + pub fn try_recv(&self) -> Option<T> { + self.try_recv_inc(true) + } + + fn try_recv_inc(&self, increment: bool) -> Option<T> { + // This is a "best effort" situation, so if a queue is inconsistent just + // don't worry about it. + let this = unsafe { cast::transmute_mut(self) }; + let ret = match this.queue { + SPSC(ref mut queue) => queue.pop(), + MPSC(ref mut queue) => match queue.pop() { + mpsc::Data(t) => Some(t), + mpsc::Empty => None, + + // This is a bit of an interesting case. The channel is + // reported as having data available, but our pop() has + // failed due to the queue being in an inconsistent state. + // This means that there is some pusher somewhere which has + // yet to complete, but we are guaranteed that a pop will + // eventually succeed. In this case, we spin in a yield loop + // because the remote sender should finish their enqueue + // operation "very quickly". + // + // Note that this yield loop does *not* attempt to do a green + // yield (regardless of the context), but *always* performs an + // OS-thread yield. The reasoning for this is that the pusher in + // question which is causing the inconsistent state is + // guaranteed to *not* be a blocked task (green tasks can't get + // pre-empted), so it must be on a different OS thread. Also, + // `try_recv` is normally a "guaranteed no rescheduling" context + // in a green-thread situation. By yielding control of the + // thread, we will hopefully allow time for the remote task on + // the other OS thread to make progress. + // + // Avoiding this yield loop would require a different queue + // abstraction which provides the guarantee that after M + // pushes have succeeded, at least M pops will succeed. 
The + // current queues guarantee that if there are N active + // pushes, you can pop N times once all N have finished. + mpsc::Inconsistent => { + let data; + loop { + Thread::yield_now(); + match queue.pop() { + mpsc::Data(t) => { data = t; break } + mpsc::Empty => fail!("inconsistent => empty"), + mpsc::Inconsistent => {} + } + } + Some(data) + } + } + }; + if increment && ret.is_some() { + unsafe { (*this.queue.packet()).steals += 1; } + } + return ret; + } + + /// Attempt to wait for a value on this port, but does not fail if the + /// corresponding channel has hung up. + /// + /// This implementation of iterators for ports will always block if there is + /// not data available on the port, but it will not fail in the case that + /// the channel has been deallocated. + /// + /// In other words, this function has the same semantics as the `recv` + /// method except for the failure aspect. + /// + /// If the channel has hung up, then `None` is returned. Otherwise `Some` of + /// the value found on the port is returned. + pub fn recv_opt(&self) -> Option<T> { + // optimistic preflight check (scheduling is expensive) + match self.try_recv() { None => {}, data => return data } + + let packet; + let this; + unsafe { + this = cast::transmute_mut(self); + packet = this.queue.packet(); + BlockingContext::one(&mut (*packet).data, |ctx, data| { + ctx.block(data, &mut (*packet).to_wake, || (*packet).decrement()) + }); + } + + let data = self.try_recv_inc(false); + if data.is_none() && + unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED { + fail!("bug: woke up too soon"); + } + return data; + } + + /// Returns an iterator which will block waiting for messages, but never + /// `fail!`. It will return `None` when the channel has hung up. 
+ pub fn iter<'a>(&'a self) -> PortIterator<'a, T> { + PortIterator { port: self } + } +} + +impl<'a, T: Send> Iterator<T> for PortIterator<'a, T> { + fn next(&mut self) -> Option<T> { self.port.recv_opt() } +} + +#[unsafe_destructor] +impl<T: Send> Drop for Port<T> { + fn drop(&mut self) { + // All we need to do is store that we're disconnected. If the channel + // half has already disconnected, then we'll just deallocate everything + // when the shared packet is deallocated. + unsafe { + (*self.queue.packet()).cnt.store(DISCONNECTED, SeqCst); + } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + + use task; + use rt::thread::Thread; + use super::*; + use rt::test::*; + + test!(fn smoke() { + let (p, c) = Chan::new(); + c.send(1); + assert_eq!(p.recv(), 1); + }) + + test!(fn drop_full() { + let (_p, c) = Chan::new(); + c.send(~1); + }) + + test!(fn drop_full_shared() { + let (_p, c) = SharedChan::new(); + c.send(~1); + }) + + test!(fn smoke_shared() { + let (p, c) = SharedChan::new(); + c.send(1); + assert_eq!(p.recv(), 1); + let c = c.clone(); + c.send(1); + assert_eq!(p.recv(), 1); + }) + + #[test] + fn smoke_threads() { + let (p, c) = Chan::new(); + do task::spawn_sched(task::SingleThreaded) { + c.send(1); + } + assert_eq!(p.recv(), 1); + } + + #[test] #[should_fail] + fn smoke_port_gone() { + let (p, c) = Chan::new(); + drop(p); + c.send(1); + } + + #[test] #[should_fail] + fn smoke_shared_port_gone() { + let (p, c) = SharedChan::new(); + drop(p); + c.send(1); + } + + #[test] #[should_fail] + fn smoke_shared_port_gone2() { + let (p, c) = SharedChan::new(); + drop(p); + let c2 = c.clone(); + drop(c); + c2.send(1); + } + + #[test] #[should_fail] + fn port_gone_concurrent() { + let (p, c) = Chan::new(); + do task::spawn_sched(task::SingleThreaded) { + p.recv(); + } + loop { c.send(1) } + } + + #[test] #[should_fail] + fn port_gone_concurrent_shared() { + let (p, c) = SharedChan::new(); + let c1 = c.clone(); + do task::spawn_sched(task::SingleThreaded) { + 
p.recv(); + } + loop { + c.send(1); + c1.send(1); + } + } + + #[test] #[should_fail] + fn smoke_chan_gone() { + let (p, c) = Chan::<int>::new(); + drop(c); + p.recv(); + } + + #[test] #[should_fail] + fn smoke_chan_gone_shared() { + let (p, c) = SharedChan::<()>::new(); + let c2 = c.clone(); + drop(c); + drop(c2); + p.recv(); + } + + #[test] #[should_fail] + fn chan_gone_concurrent() { + let (p, c) = Chan::new(); + do task::spawn_sched(task::SingleThreaded) { + c.send(1); + c.send(1); + } + loop { p.recv(); } + } + + #[test] + fn stress() { + let (p, c) = Chan::new(); + do task::spawn_sched(task::SingleThreaded) { + for _ in range(0, 10000) { c.send(1); } + } + for _ in range(0, 10000) { + assert_eq!(p.recv(), 1); + } + } + + #[test] + fn stress_shared() { + static AMT: uint = 10000; + static NTHREADS: uint = 8; + let (p, c) = SharedChan::<int>::new(); + let (p1, c1) = Chan::new(); + + do spawn { + for _ in range(0, AMT * NTHREADS) { + assert_eq!(p.recv(), 1); + } + assert_eq!(p.try_recv(), None); + c1.send(()); + } + + for _ in range(0, NTHREADS) { + let c = c.clone(); + do task::spawn_sched(task::SingleThreaded) { + for _ in range(0, AMT) { c.send(1); } + } + } + p1.recv(); + + } + + #[test] + fn send_from_outside_runtime() { + let (p, c) = Chan::<int>::new(); + let (p1, c1) = Chan::new(); + do spawn { + c1.send(()); + for _ in range(0, 40) { + assert_eq!(p.recv(), 1); + } + } + p1.recv(); + let t = do Thread::start { + for _ in range(0, 40) { + c.send(1); + } + }; + t.join(); + } + + #[test] + fn recv_from_outside_runtime() { + let (p, c) = Chan::<int>::new(); + let t = do Thread::start { + for _ in range(0, 40) { + assert_eq!(p.recv(), 1); + } + }; + for _ in range(0, 40) { + c.send(1); + } + t.join(); + } + + #[test] + fn no_runtime() { + let (p1, c1) = Chan::<int>::new(); + let (p2, c2) = Chan::<int>::new(); + let t1 = do Thread::start { + assert_eq!(p1.recv(), 1); + c2.send(2); + }; + let t2 = do Thread::start { + c1.send(1); + assert_eq!(p2.recv(), 2); + }; 
+ t1.join(); + t2.join(); + } + + #[test] + fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + do run_in_newsched_task { + let (port, _chan) = Chan::<int>::new(); + { let _p = port; } + } + } + + #[test] + fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + do run_in_newsched_task { + let (_port, chan) = Chan::<int>::new(); + { let _c = chan; } + } + } + + #[test] #[should_fail] + fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (port, chan) = Chan::<~int>::new(); + { let _p = port; } + chan.send(~0); + } + + #[test] + fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will fail + do run_in_newsched_task { + let res = do spawntask_try { + let (port, chan) = Chan::<~int>::new(); + { let _c = chan; } + port.recv(); + }; + // What is our res? + assert!(res.is_err()); + } + } + + #[test] + fn oneshot_single_thread_send_then_recv() { + do run_in_newsched_task { + let (port, chan) = Chan::<~int>::new(); + chan.send(~10); + assert!(port.recv() == ~10); + } + } + + #[test] + fn oneshot_single_thread_try_send_open() { + do run_in_newsched_task { + let (port, chan) = Chan::<int>::new(); + assert!(chan.try_send(10)); + assert!(port.recv() == 10); + } + } + + #[test] + fn oneshot_single_thread_try_send_closed() { + do run_in_newsched_task { + let (port, chan) = Chan::<int>::new(); + { let _p = port; } + assert!(!chan.try_send(10)); + } + } + + #[test] + fn oneshot_single_thread_try_recv_open() { + do run_in_newsched_task { + let (port, chan) = Chan::<int>::new(); + chan.send(10); + assert!(port.try_recv() == Some(10)); + } + } + + #[test] + fn oneshot_single_thread_try_recv_closed() { + do run_in_newsched_task { + let (port, chan) = Chan::<int>::new(); + { let _c = chan; } + assert!(port.recv_opt() == None); + } + } + + #[test] + fn oneshot_single_thread_peek_data() { + do run_in_newsched_task { + 
let (port, chan) = Chan::<int>::new(); + assert!(port.try_recv().is_none()); + chan.send(10); + assert!(port.try_recv().is_some()); + } + } + + #[test] + fn oneshot_single_thread_peek_close() { + do run_in_newsched_task { + let (port, chan) = Chan::<int>::new(); + { let _c = chan; } + assert!(port.try_recv().is_none()); + assert!(port.try_recv().is_none()); + } + } + + #[test] + fn oneshot_single_thread_peek_open() { + do run_in_newsched_task { + let (port, _) = Chan::<int>::new(); + assert!(port.try_recv().is_none()); + } + } + + #[test] + fn oneshot_multi_task_recv_then_send() { + do run_in_newsched_task { + let (port, chan) = Chan::<~int>::new(); + do spawntask { + assert!(port.recv() == ~10); + } + + chan.send(~10); + } + } + + #[test] + fn oneshot_multi_task_recv_then_close() { + do run_in_newsched_task { + let (port, chan) = Chan::<~int>::new(); + do spawntask_later { + let _chan = chan; + } + let res = do spawntask_try { + assert!(port.recv() == ~10); + }; + assert!(res.is_err()); + } + } + + #[test] + fn oneshot_multi_thread_close_stress() { + stress_factor().times(|| { + do run_in_newsched_task { + let (port, chan) = Chan::<int>::new(); + let thread = do spawntask_thread { + let _p = port; + }; + let _chan = chan; + thread.join(); + } + }) + } + + #[test] + fn oneshot_multi_thread_send_close_stress() { + stress_factor().times(|| { + let (port, chan) = Chan::<int>::new(); + do spawn { + let _p = port; + } + do task::try { + chan.send(1); + }; + }) + } + + #[test] + fn oneshot_multi_thread_recv_close_stress() { + stress_factor().times(|| { + let (port, chan) = Chan::<int>::new(); + do spawn { + let port = port; + let res = do task::try { + port.recv(); + }; + assert!(res.is_err()); + }; + do spawn { + let chan = chan; + do spawn { + let _chan = chan; + } + }; + }) + } + + #[test] + fn oneshot_multi_thread_send_recv_stress() { + stress_factor().times(|| { + let (port, chan) = Chan::<~int>::new(); + do spawn { + chan.send(~10); + } + do spawn { + 
assert!(port.recv() == ~10); + } + }) + } + + #[test] + fn stream_send_recv_stress() { + stress_factor().times(|| { + let (port, chan) = Chan::<~int>::new(); + + send(chan, 0); + recv(port, 0); + + fn send(chan: Chan<~int>, i: int) { + if i == 10 { return } + + do spawntask_random { + chan.send(~i); + send(chan, i + 1); + } + } + + fn recv(port: Port<~int>, i: int) { + if i == 10 { return } + + do spawntask_random { + assert!(port.recv() == ~i); + recv(port, i + 1); + }; + } + }) + } + + #[test] + fn recv_a_lot() { + // Regression test that we don't run out of stack in scheduler context + do run_in_newsched_task { + let (port, chan) = Chan::new(); + 10000.times(|| { chan.send(()) }); + 10000.times(|| { port.recv() }); + } + } + + #[test] + fn shared_chan_stress() { + do run_in_mt_newsched_task { + let (port, chan) = SharedChan::new(); + let total = stress_factor() + 100; + total.times(|| { + let chan_clone = chan.clone(); + do spawntask_random { + chan_clone.send(()); + } + }); + + total.times(|| { + port.recv(); + }); + } + } + + #[test] + fn test_nested_recv_iter() { + let (port, chan) = Chan::<int>::new(); + let (total_port, total_chan) = Chan::<int>::new(); + + do spawn { + let mut acc = 0; + for x in port.iter() { + acc += x; + } + total_chan.send(acc); + } + + chan.send(3); + chan.send(1); + chan.send(2); + drop(chan); + assert_eq!(total_port.recv(), 6); + } + + #[test] + fn test_recv_iter_break() { + let (port, chan) = Chan::<int>::new(); + let (count_port, count_chan) = Chan::<int>::new(); + + do spawn { + let mut count = 0; + for x in port.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_chan.send(count); + } + + chan.send(2); + chan.send(2); + chan.send(2); + chan.try_send(2); + drop(chan); + assert_eq!(count_port.recv(), 4); + } +} diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs new file mode 100644 index 00000000000..81a77000bad --- /dev/null +++ b/src/libstd/comm/select.rs @@ -0,0 +1,498 @@ +// Copyright 
2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Selection over an array of ports +//! +//! This module contains the implementation machinery necessary for selecting +//! over a number of ports. One large goal of this module is to provide an +//! efficient interface to selecting over any port of any type. +//! +//! This is achieved through an architecture of a "port set" in which ports are +//! added to a set and then the entire set is waited on at once. The set can be +//! waited on multiple times to prevent re-adding each port to the set. +//! +//! Usage of this module is currently encouraged to go through the use of the +//! `select!` macro. This macro allows natural binding of variables to the +//! received values of ports in a much more natural syntax than usage of the +//! `Select` structure directly. +//! +//! # Example +//! +//! ```rust +//! let (mut p1, c1) = Chan::new(); +//! let (mut p2, c2) = Chan::new(); +//! +//! c1.send(1); +//! c2.send(2); +//! +//! select! ( +//! val = p1.recv() => { +//! assert_eq!(val, 1); +//! } +//! val = p2.recv() => { +//! assert_eq!(val, 2); +//! } +//! ) +//! ``` + +use cast; +use iter::Iterator; +use kinds::Send; +use ops::Drop; +use option::{Some, None, Option}; +use ptr::RawPtr; +use super::imp::BlockingContext; +use super::{Packet, Port, imp}; +use uint; +use unstable::atomics::{Relaxed, SeqCst}; + +macro_rules!
select { + ( + $name1:pat = $port1:ident.$meth1:ident() => $code1:expr, + $($name:pat = $port:ident.$meth:ident() => $code:expr),* + ) => ({ + use std::comm::Select; + let sel = Select::new(); + let mut $port1 = sel.add(&mut $port1); + $( let mut $port = sel.add(&mut $port); )* + let ret = sel.wait(); + if ret == $port1.id { let $name1 = $port1.$meth1(); $code1 } + $( else if ret == $port.id { let $name = $port.$meth(); $code } )* + else { unreachable!() } + }) +} + +/// The "port set" of the select interface. This structure is used to manage a +/// set of ports which are being selected over. +#[no_freeze] +#[no_send] +pub struct Select { + priv head: *mut Packet, + priv tail: *mut Packet, + priv next_id: uint, +} + +/// A handle to a port which is currently a member of a `Select` set of ports. +/// This handle is used to keep the port in the set as well as interact with the +/// underlying port. +pub struct Handle<'self, T> { + id: uint, + priv selector: &'self Select, + priv port: &'self mut Port<T>, +} + +struct PacketIterator { priv cur: *mut Packet } + +impl Select { + /// Creates a new selection structure. This set is initially empty and + /// `wait` will fail!() if called. + /// + /// Usage of this struct directly can sometimes be burdensome, and usage is + /// rather much easier through the `select!` macro. + pub fn new() -> Select { + Select { + head: 0 as *mut Packet, + tail: 0 as *mut Packet, + next_id: 1, + } + } + + /// Adds a new port to this set, returning a handle which is then used to + /// receive on the port. + /// + /// Note that this port parameter takes `&mut Port` instead of `&Port`. None + /// of the methods of receiving on a port require `&mut self`, but `&mut` is + /// used here in order to have the compiler guarantee that the same port is + /// not added to this set more than once. + /// + /// When the returned handle falls out of scope, the port will be removed + /// from this set. 
While the handle is in this set, usage of the port can be + /// done through the `Handle`'s receiving methods. + pub fn add<'a, T: Send>(&'a self, port: &'a mut Port<T>) -> Handle<'a, T> { + let this = unsafe { cast::transmute_mut(self) }; + let id = this.next_id; + this.next_id += 1; + unsafe { + let packet = port.queue.packet(); + assert!(!(*packet).selecting.load(Relaxed)); + assert_eq!((*packet).selection_id, 0); + (*packet).selection_id = id; + if this.head.is_null() { + this.head = packet; + this.tail = packet; + } else { + (*packet).select_prev = this.tail; + assert!((*packet).select_next.is_null()); + (*this.tail).select_next = packet; + this.tail = packet; + } + } + Handle { id: id, selector: this, port: port } + } + + /// Waits for an event on this port set. The returned value is *not* an + /// index, but rather an id. This id can be queried against any active + /// `Handle` structures (each one has a public `id` field). The handle with + /// the matching `id` will have some sort of event available on it. The + /// event could either be that data is available or the corresponding + /// channel has been closed. + pub fn wait(&self) -> uint { + // Note that this is currently an inefficient implementation. We in + // theory have knowledge about all ports in the set ahead of time, so + // this method shouldn't really have to iterate over all of them yet + // again. The idea with this "port set" interface is to get the + // interface right this time around, and later this implementation can + // be optimized. + // + // This implementation can be summarized by: + // + // fn select(ports) { + // if any port ready { return ready index } + // deschedule { + // block on all ports + // } + // unblock on all ports + // return ready index + // } + // + // Most notably, the iterations over all of the ports shouldn't be + // necessary.
+ unsafe { + let mut amt = 0; + for p in self.iter() { + assert!(!(*p).selecting.load(Relaxed)); + amt += 1; + if (*p).can_recv() { + return (*p).selection_id; + } + } + assert!(amt > 0); + + let mut ready_index = amt; + let mut ready_id = uint::max_value; + let mut iter = self.iter().enumerate(); + + // Acquire a number of blocking contexts, and block on each one + // sequentially until one fails. If one fails, then abort + // immediately so we can go unblock on all the other ports. + BlockingContext::many(amt, |ctx| { + let (i, packet) = iter.next().unwrap(); + (*packet).selecting.store(true, SeqCst); + if !ctx.block(&mut (*packet).data, + &mut (*packet).to_wake, + || (*packet).decrement()) { + (*packet).abort_selection(false); + (*packet).selecting.store(false, SeqCst); + ready_index = i; + ready_id = (*packet).selection_id; + false + } else { + true + } + }); + + // Abort the selection process on each port. If the abort process + // returns `true`, then that means that the port is ready to receive + // some data. Note that this also means that the port may have yet + // to have fully read the `to_wake` field and woken us up (although + // the wakeup is guaranteed to fail). + // + // This situation happens in the window of where a sender invokes + // increment(), sees -1, and then decides to wake up the task. After + // all this is done, the sending thread will set `selecting` to + // `false`. Until this is done, we cannot return. If we were to + // return, then a sender could wake up a port which has gone back to + // sleep after this call to `select`. + // + // Note that it is a "fairly small window" in which an increment() + // views that it should wake a thread up until the `selecting` bit + // is set to false. For now, the implementation currently just spins + // in a yield loop. This is very distasteful, but this + // implementation is already nowhere near what it should ideally be. 
+ // A rewrite should focus on avoiding a yield loop, and for now this + // implementation is tying us over to a more efficient "don't + // iterate over everything every time" implementation. + for packet in self.iter().take(ready_index) { + if (*packet).abort_selection(true) { + ready_id = (*packet).selection_id; + while (*packet).selecting.load(Relaxed) { + imp::yield_now(); + } + } + } + + // Sanity check for now to make sure that everyone is turned off. + for packet in self.iter() { + assert!(!(*packet).selecting.load(Relaxed)); + } + + return ready_id; + } + } + + unsafe fn remove(&self, packet: *mut Packet) { + let this = cast::transmute_mut(self); + assert!(!(*packet).selecting.load(Relaxed)); + if (*packet).select_prev.is_null() { + assert_eq!(packet, this.head); + this.head = (*packet).select_next; + } else { + (*(*packet).select_prev).select_next = (*packet).select_next; + } + if (*packet).select_next.is_null() { + assert_eq!(packet, this.tail); + this.tail = (*packet).select_prev; + } else { + (*(*packet).select_next).select_prev = (*packet).select_prev; + } + (*packet).select_next = 0 as *mut Packet; + (*packet).select_prev = 0 as *mut Packet; + (*packet).selection_id = 0; + } + + fn iter(&self) -> PacketIterator { PacketIterator { cur: self.head } } +} + +impl<'self, T: Send> Handle<'self, T> { + /// Receive a value on the underlying port. Has the same semantics as + /// `Port.recv` + pub fn recv(&mut self) -> T { self.port.recv() } + /// Block to receive a value on the underlying port, returning `Some` on + /// success or `None` if the channel disconnects. This function has the same + /// semantics as `Port.recv_opt` + pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() } + /// Immediately attempt to receive a value on a port, this function will + /// never block. Has the same semantics as `Port.try_recv`. 
+ pub fn try_recv(&mut self) -> Option<T> { self.port.try_recv() } +} + +#[unsafe_destructor] +impl Drop for Select { + fn drop(&mut self) { + assert!(self.head.is_null()); + assert!(self.tail.is_null()); + } +} + +#[unsafe_destructor] +impl<'self, T: Send> Drop for Handle<'self, T> { + fn drop(&mut self) { + unsafe { self.selector.remove(self.port.queue.packet()) } + } +} + +impl Iterator<*mut Packet> for PacketIterator { + fn next(&mut self) -> Option<*mut Packet> { + if self.cur.is_null() { + None + } else { + let ret = Some(self.cur); + unsafe { self.cur = (*self.cur).select_next; } + ret + } + } +} + +#[cfg(test)] +mod test { + use super::super::*; + use prelude::*; + + test!(fn smoke() { + let (mut p1, c1) = Chan::<int>::new(); + let (mut p2, c2) = Chan::<int>::new(); + c1.send(1); + select! ( + foo = p1.recv() => { assert_eq!(foo, 1); }, + _bar = p2.recv() => { fail!() } + ) + c2.send(2); + select! ( + _foo = p1.recv() => { fail!() }, + bar = p2.recv() => { assert_eq!(bar, 2) } + ) + drop(c1); + select! ( + foo = p1.recv_opt() => { assert_eq!(foo, None); }, + _bar = p2.recv() => { fail!() } + ) + drop(c2); + select! ( + bar = p2.recv_opt() => { assert_eq!(bar, None); }, + ) + }) + + test!(fn smoke2() { + let (mut p1, _c1) = Chan::<int>::new(); + let (mut p2, _c2) = Chan::<int>::new(); + let (mut p3, _c3) = Chan::<int>::new(); + let (mut p4, _c4) = Chan::<int>::new(); + let (mut p5, c5) = Chan::<int>::new(); + c5.send(4); + select! ( + _foo = p1.recv() => { fail!("1") }, + _foo = p2.recv() => { fail!("2") }, + _foo = p3.recv() => { fail!("3") }, + _foo = p4.recv() => { fail!("4") }, + foo = p5.recv() => { assert_eq!(foo, 4); } + ) + }) + + test!(fn closed() { + let (mut p1, _c1) = Chan::<int>::new(); + let (mut p2, c2) = Chan::<int>::new(); + drop(c2); + + select! 
( + _a1 = p1.recv_opt() => { fail!() }, + a2 = p2.recv_opt() => { assert_eq!(a2, None); } + ) + }) + + #[test] + fn unblocks() { + use std::io::timer; + + let (mut p1, c1) = Chan::<int>::new(); + let (mut p2, _c2) = Chan::<int>::new(); + let (p3, c3) = Chan::<int>::new(); + + do spawn { + timer::sleep(3); + c1.send(1); + p3.recv(); + timer::sleep(3); + } + + select! ( + a = p1.recv() => { assert_eq!(a, 1); }, + _b = p2.recv() => { fail!() } + ) + c3.send(1); + select! ( + a = p1.recv_opt() => { assert_eq!(a, None); }, + _b = p2.recv() => { fail!() } + ) + } + + #[test] + fn both_ready() { + use std::io::timer; + + let (mut p1, c1) = Chan::<int>::new(); + let (mut p2, c2) = Chan::<int>::new(); + let (p3, c3) = Chan::<()>::new(); + + do spawn { + timer::sleep(3); + c1.send(1); + c2.send(2); + p3.recv(); + } + + select! ( + a = p1.recv() => { assert_eq!(a, 1); }, + a = p2.recv() => { assert_eq!(a, 2); } + ) + select! ( + a = p1.recv() => { assert_eq!(a, 1); }, + a = p2.recv() => { assert_eq!(a, 2); } + ) + c3.send(()); + } + + #[test] + fn stress() { + static AMT: int = 10000; + let (mut p1, c1) = Chan::<int>::new(); + let (mut p2, c2) = Chan::<int>::new(); + let (p3, c3) = Chan::<()>::new(); + + do spawn { + for i in range(0, AMT) { + if i % 2 == 0 { + c1.send(i); + } else { + c2.send(i); + } + p3.recv(); + } + } + + for i in range(0, AMT) { + select! ( + i1 = p1.recv() => { assert!(i % 2 == 0 && i == i1); }, + i2 = p2.recv() => { assert!(i % 2 == 1 && i == i2); } + ) + c3.send(()); + } + } + + #[test] + fn stress_native() { + use std::rt::thread::Thread; + use std::unstable::run_in_bare_thread; + static AMT: int = 10000; + + do run_in_bare_thread { + let (mut p1, c1) = Chan::<int>::new(); + let (mut p2, c2) = Chan::<int>::new(); + let (p3, c3) = Chan::<()>::new(); + + let t = do Thread::start { + for i in range(0, AMT) { + if i % 2 == 0 { + c1.send(i); + } else { + c2.send(i); + } + p3.recv(); + } + }; + + for i in range(0, AMT) { + select! 
( + i1 = p1.recv() => { assert!(i % 2 == 0 && i == i1); }, + i2 = p2.recv() => { assert!(i % 2 == 1 && i == i2); } + ) + c3.send(()); + } + t.join(); + } + } + + #[test] + fn native_both_ready() { + use std::rt::thread::Thread; + use std::unstable::run_in_bare_thread; + + do run_in_bare_thread { + let (mut p1, c1) = Chan::<int>::new(); + let (mut p2, c2) = Chan::<int>::new(); + let (p3, c3) = Chan::<()>::new(); + + let t = do Thread::start { + c1.send(1); + c2.send(2); + p3.recv(); + }; + + select! ( + a = p1.recv() => { assert_eq!(a, 1); }, + b = p2.recv() => { assert_eq!(b, 2); } + ) + select! ( + a = p1.recv() => { assert_eq!(a, 1); }, + b = p2.recv() => { assert_eq!(b, 2); } + ) + c3.send(()); + t.join(); + } + } +} diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs index 3ac91a67d7e..6948eb60b1f 100644 --- a/src/libstd/lib.rs +++ b/src/libstd/lib.rs @@ -203,15 +203,16 @@ pub mod rt; mod std { pub use clone; pub use cmp; + pub use comm; pub use condition; pub use fmt; + pub use io; pub use kinds; pub use local_data; pub use logging; pub use logging; pub use option; pub use os; - pub use io; pub use rt; pub use str; pub use to_bytes; diff --git a/src/libstd/rt/mpmc_bounded_queue.rs b/src/libstd/rt/mpmc_bounded_queue.rs index 7f607fcf12a..1e04e5eb78d 100644 --- a/src/libstd/rt/mpmc_bounded_queue.rs +++ b/src/libstd/rt/mpmc_bounded_queue.rs @@ -1,5 +1,4 @@ -/* Multi-producer/multi-consumer bounded queue - * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. 
* Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * @@ -163,7 +162,6 @@ mod tests { use prelude::*; use option::*; use task; - use comm; use super::Queue; #[test] @@ -174,10 +172,9 @@ mod tests { assert_eq!(None, q.pop()); for _ in range(0, nthreads) { - let (port, chan) = comm::stream(); - chan.send(q.clone()); + let q = q.clone(); do task::spawn_sched(task::SingleThreaded) { - let mut q = port.recv(); + let mut q = q; for i in range(0, nmsgs) { assert!(q.push(i)); } @@ -186,12 +183,11 @@ mod tests { let mut completion_ports = ~[]; for _ in range(0, nthreads) { - let (completion_port, completion_chan) = comm::stream(); + let (completion_port, completion_chan) = Chan::new(); completion_ports.push(completion_port); - let (port, chan) = comm::stream(); - chan.send(q.clone()); + let q = q.clone(); do task::spawn_sched(task::SingleThreaded) { - let mut q = port.recv(); + let mut q = q; let mut i = 0u; loop { match q.pop() { @@ -206,7 +202,7 @@ mod tests { } } - for completion_port in completion_ports.iter() { + for completion_port in completion_ports.mut_iter() { assert_eq!(nmsgs, completion_port.recv()); } } diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs index 4f39a1df4fa..d575028af70 100644 --- a/src/libstd/rt/mpsc_queue.rs +++ b/src/libstd/rt/mpsc_queue.rs @@ -1,5 +1,4 @@ -/* Multi-producer/single-consumer queue - * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * @@ -27,163 +26,177 @@ */ //! A mostly lock-free multi-producer, single consumer queue. 
-// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue -use unstable::sync::UnsafeArc; -use unstable::atomics::{AtomicPtr,Relaxed,Release,Acquire}; -use ptr::{mut_null, to_mut_unsafe_ptr}; +// http://www.1024cores.net/home/lock-free-algorithms +// /queues/non-intrusive-mpsc-node-based-queue + use cast; -use option::*; use clone::Clone; use kinds::Send; +use ops::Drop; +use option::{Option, None, Some}; +use unstable::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; +use unstable::sync::UnsafeArc; + +pub enum PopResult<T> { + /// Some data has been popped + Data(T), + /// The queue is empty + Empty, + /// The queue is in an inconsistent state. Popping data should succeed, but + /// some pushers have yet to make enough progress in order to allow a pop to + /// succeed. It is recommended that a pop() occur "in the near future" in + /// order to see if the sender has made progress or not + Inconsistent, +} struct Node<T> { next: AtomicPtr<Node<T>>, value: Option<T>, } -impl<T> Node<T> { - fn empty() -> Node<T> { - Node{next: AtomicPtr::new(mut_null()), value: None} - } - - fn with_value(value: T) -> Node<T> { - Node{next: AtomicPtr::new(mut_null()), value: Some(value)} - } -} - -struct State<T> { - pad0: [u8, ..64], +struct State<T, P> { head: AtomicPtr<Node<T>>, - pad1: [u8, ..64], - stub: Node<T>, - pad2: [u8, ..64], tail: *mut Node<T>, - pad3: [u8, ..64], + packet: P, } -struct Queue<T> { - priv state: UnsafeArc<State<T>>, +pub struct Consumer<T, P> { + priv state: UnsafeArc<State<T, P>>, } -impl<T: Send> Clone for Queue<T> { - fn clone(&self) -> Queue<T> { - Queue { - state: self.state.clone() - } - } +pub struct Producer<T, P> { + priv state: UnsafeArc<State<T, P>>, } -impl<T: Send> State<T> { - pub fn new() -> State<T> { - State{ - pad0: [0, ..64], - head: AtomicPtr::new(mut_null()), - pad1: [0, ..64], - stub: Node::<T>::empty(), - pad2: [0, ..64], - tail: mut_null(), - pad3: [0, ..64], - } +impl<T: Send, P: Send> Clone
for Producer<T, P> { + fn clone(&self) -> Producer<T, P> { + Producer { state: self.state.clone() } } +} - fn init(&mut self) { - let stub = self.get_stub_unsafe(); - self.head.store(stub, Relaxed); - self.tail = stub; +pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) { + unsafe { + let (a, b) = UnsafeArc::new2(State::new(p)); + (Consumer { state: a }, Producer { state: b }) } +} - fn get_stub_unsafe(&mut self) -> *mut Node<T> { - to_mut_unsafe_ptr(&mut self.stub) +impl<T> Node<T> { + unsafe fn new(v: Option<T>) -> *mut Node<T> { + cast::transmute(~Node { + next: AtomicPtr::new(0 as *mut Node<T>), + value: v, + }) } +} - fn push(&mut self, value: T) { - unsafe { - let node = cast::transmute(~Node::with_value(value)); - self.push_node(node); +impl<T: Send, P: Send> State<T, P> { + pub unsafe fn new(p: P) -> State<T, P> { + let stub = Node::new(None); + State { + head: AtomicPtr::new(stub), + tail: stub, + packet: p, } } - fn push_node(&mut self, node: *mut Node<T>) { - unsafe { - (*node).next.store(mut_null(), Release); - let prev = self.head.swap(node, Relaxed); - (*prev).next.store(node, Release); - } + unsafe fn push(&mut self, t: T) { + let n = Node::new(Some(t)); + let prev = self.head.swap(n, AcqRel); + (*prev).next.store(n, Release); } - fn pop(&mut self) -> Option<T> { - unsafe { - let mut tail = self.tail; - let mut next = (*tail).next.load(Acquire); - let stub = self.get_stub_unsafe(); - if tail == stub { - if mut_null() == next { - return None - } - self.tail = next; - tail = next; - next = (*next).next.load(Acquire); - } - if next != mut_null() { - let tail: ~Node<T> = cast::transmute(tail); - self.tail = next; - return tail.value - } - let head = self.head.load(Relaxed); - if tail != head { - return None - } - self.push_node(stub); - next = (*tail).next.load(Acquire); - if next != mut_null() { - let tail: ~Node<T> = cast::transmute(tail); - self.tail = next; - return tail.value - } + unsafe fn pop(&mut self) -> PopResult<T> { + 
let tail = self.tail; + let next = (*tail).next.load(Acquire); + + if !next.is_null() { + self.tail = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take_unwrap(); + let _: ~Node<T> = cast::transmute(tail); + return Data(ret); } - None + + if self.head.load(Acquire) == tail {Empty} else {Inconsistent} + } + + unsafe fn is_empty(&mut self) -> bool { + return (*self.tail).next.load(Acquire).is_null(); } } -impl<T: Send> Queue<T> { - pub fn new() -> Queue<T> { +#[unsafe_destructor] +impl<T: Send, P: Send> Drop for State<T, P> { + fn drop(&mut self) { unsafe { - let q = Queue{state: UnsafeArc::new(State::new())}; - (*q.state.get()).init(); - q + let mut cur = self.tail; + while !cur.is_null() { + let next = (*cur).next.load(Relaxed); + let _: ~Node<T> = cast::transmute(cur); + cur = next; + } } } +} +impl<T: Send, P: Send> Producer<T, P> { pub fn push(&mut self, value: T) { unsafe { (*self.state.get()).push(value) } } + pub fn is_empty(&self) -> bool { + unsafe{ (*self.state.get()).is_empty() } + } + pub unsafe fn packet(&self) -> *mut P { + &mut (*self.state.get()).packet as *mut P + } +} - pub fn pop(&mut self) -> Option<T> { - unsafe{ (*self.state.get()).pop() } +impl<T: Send, P: Send> Consumer<T, P> { + pub fn pop(&mut self) -> PopResult<T> { + unsafe { (*self.state.get()).pop() } + } + pub fn casual_pop(&mut self) -> Option<T> { + match self.pop() { + Data(t) => Some(t), + Empty | Inconsistent => None, + } + } + pub unsafe fn packet(&self) -> *mut P { + &mut (*self.state.get()).packet as *mut P } } #[cfg(test)] mod tests { use prelude::*; - use option::*; + use task; - use comm; - use super::Queue; + use super::{queue, Data, Empty, Inconsistent}; + + #[test] + fn test_full() { + let (_, mut p) = queue(()); + p.push(~1); + p.push(~2); + } #[test] fn test() { let nthreads = 8u; let nmsgs = 1000u; - let mut q = Queue::new(); - assert_eq!(None, q.pop()); + let (mut c, p) = queue(()); + match c.pop() { + Empty => 
{} + Inconsistent | Data(..) => fail!() + } for _ in range(0, nthreads) { - let (port, chan) = comm::stream(); - chan.send(q.clone()); + let q = p.clone(); do task::spawn_sched(task::SingleThreaded) { - let mut q = port.recv(); + let mut q = q; for i in range(0, nmsgs) { q.push(i); } @@ -191,13 +204,10 @@ mod tests { } let mut i = 0u; - loop { - match q.pop() { - None => {}, - Some(_) => { - i += 1; - if i == nthreads*nmsgs { break } - } + while i < nthreads * nmsgs { + match c.pop() { + Empty | Inconsistent => {}, + Data(_) => { i += 1 } } } } diff --git a/src/libstd/rt/spsc_queue.rs b/src/libstd/rt/spsc_queue.rs new file mode 100644 index 00000000000..f14533d726a --- /dev/null +++ b/src/libstd/rt/spsc_queue.rs @@ -0,0 +1,296 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue +use cast; +use kinds::Send; +use ops::Drop; +use option::{Some, None, Option}; +use unstable::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; +use unstable::sync::UnsafeArc; + +// Node within the linked list queue of messages to send +struct Node<T> { + // XXX: this could be an uninitialized T if we're careful enough, and + // that would reduce memory usage (and be a bit faster). + // is it worth it? + value: Option<T>, // nullable for re-use of nodes + next: AtomicPtr<Node<T>>, // next node in the queue +} + +// The producer/consumer halves both need access to the `tail` field, and if +// they both have access to that we may as well just give them both access +// to this whole structure. +struct State<T, P> { + // consumer fields + tail: *mut Node<T>, // where to pop from + tail_prev: AtomicPtr<Node<T>>, // where to pop from + + // producer fields + head: *mut Node<T>, // where to push to + first: *mut Node<T>, // where to get new nodes from + tail_copy: *mut Node<T>, // between first/tail + + // Cache maintenance fields. 
Additions and subtractions are stored + // separately in order to allow them to use nonatomic addition/subtraction. + cache_bound: uint, + cache_additions: AtomicUint, + cache_subtractions: AtomicUint, + + packet: P, +} + +pub struct Producer<T, P> { + priv state: UnsafeArc<State<T, P>>, +} + +pub struct Consumer<T, P> { + priv state: UnsafeArc<State<T, P>>, +} + +pub fn queue<T: Send, P: Send>(bound: uint, + p: P) -> (Consumer<T, P>, Producer<T, P>) +{ + let n1 = Node::new(); + let n2 = Node::new(); + unsafe { (*n1).next.store(n2, Relaxed) } + let state = State { + tail: n2, + tail_prev: AtomicPtr::new(n1), + head: n2, + first: n1, + tail_copy: n1, + cache_bound: bound, + cache_additions: AtomicUint::new(0), + cache_subtractions: AtomicUint::new(0), + packet: p, + }; + let (arc1, arc2) = UnsafeArc::new2(state); + (Consumer { state: arc1 }, Producer { state: arc2 }) +} + +impl<T: Send> Node<T> { + fn new() -> *mut Node<T> { + unsafe { + cast::transmute(~Node { + value: None, + next: AtomicPtr::new(0 as *mut Node<T>), + }) + } + } +} + +impl<T: Send, P: Send> Producer<T, P> { + pub fn push(&mut self, t: T) { + unsafe { (*self.state.get()).push(t) } + } + pub fn is_empty(&self) -> bool { + unsafe { (*self.state.get()).is_empty() } + } + pub unsafe fn packet(&self) -> *mut P { + &mut (*self.state.get()).packet as *mut P + } +} + +impl<T: Send, P: Send> Consumer<T, P> { + pub fn pop(&mut self) -> Option<T> { + unsafe { (*self.state.get()).pop() } + } + pub unsafe fn packet(&self) -> *mut P { + &mut (*self.state.get()).packet as *mut P + } +} + +impl<T: Send, P: Send> State<T, P> { + // remember that there is only one thread executing `push` (and only one + // thread executing `pop`) + unsafe fn push(&mut self, t: T) { + // Acquire a node (which either uses a cached one or allocates a new + // one), and then append this to the 'head' node. 
+ let n = self.alloc(); + assert!((*n).value.is_none()); + (*n).value = Some(t); + (*n).next.store(0 as *mut Node<T>, Relaxed); + (*self.head).next.store(n, Release); + self.head = n; + } + + unsafe fn alloc(&mut self) -> *mut Node<T> { + // First try to see if we can consume the 'first' node for our uses. + // We try to avoid as many atomic instructions as possible here, so + // the addition to cache_subtractions is not atomic (plus we're the + // only one subtracting from the cache). + if self.first != self.tail_copy { + if self.cache_bound > 0 { + let b = self.cache_subtractions.load(Relaxed); + self.cache_subtractions.store(b + 1, Relaxed); + } + let ret = self.first; + self.first = (*ret).next.load(Relaxed); + return ret; + } + // If the above fails, then update our copy of the tail and try + // again. + self.tail_copy = self.tail_prev.load(Acquire); + if self.first != self.tail_copy { + if self.cache_bound > 0 { + let b = self.cache_subtractions.load(Relaxed); + self.cache_subtractions.store(b + 1, Relaxed); + } + let ret = self.first; + self.first = (*ret).next.load(Relaxed); + return ret; + } + // If all of that fails, then we have to allocate a new node + // (there's nothing in the node cache). + Node::new() + } + + // remember that there is only one thread executing `pop` (and only one + // thread executing `push`) + unsafe fn pop(&mut self) -> Option<T> { + // The `tail` node is not actually a used node, but rather a + // sentinel from where we should start popping from. Hence, look at + // tail's next field and see if we can use it. If we do a pop, then + // the current tail node is a candidate for going into the cache. + let tail = self.tail; + let next = (*tail).next.load(Acquire); + if next.is_null() { return None } + assert!((*next).value.is_some()); + let ret = (*next).value.take(); + + self.tail = next; + if self.cache_bound == 0 { + self.tail_prev.store(tail, Release); + } else { + // XXX: this is dubious with overflow. 
+ let additions = self.cache_additions.load(Relaxed); + let subtractions = self.cache_subtractions.load(Relaxed); + let size = additions - subtractions; + + if size < self.cache_bound { + self.tail_prev.store(tail, Release); + self.cache_additions.store(additions + 1, Relaxed); + } else { + (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); + // We have successfully erased all references to 'tail', so + // now we can safely drop it. + let _: ~Node<T> = cast::transmute(tail); + } + } + return ret; + } + + unsafe fn is_empty(&self) -> bool { + let tail = self.tail; + let next = (*tail).next.load(Acquire); + return next.is_null(); + } +} + +#[unsafe_destructor] +impl<T: Send, P: Send> Drop for State<T, P> { + fn drop(&mut self) { + unsafe { + let mut cur = self.first; + while !cur.is_null() { + let next = (*cur).next.load(Relaxed); + let _n: ~Node<T> = cast::transmute(cur); + cur = next; + } + } + } +} + +#[cfg(test)] +mod test { + use prelude::*; + use super::queue; + use task; + + #[test] + fn smoke() { + let (mut c, mut p) = queue(0, ()); + p.push(1); + p.push(2); + assert_eq!(c.pop(), Some(1)); + assert_eq!(c.pop(), Some(2)); + assert_eq!(c.pop(), None); + p.push(3); + p.push(4); + assert_eq!(c.pop(), Some(3)); + assert_eq!(c.pop(), Some(4)); + assert_eq!(c.pop(), None); + } + + #[test] + fn drop_full() { + let (_, mut p) = queue(0, ()); + p.push(~1); + p.push(~2); + } + + #[test] + fn smoke_bound() { + let (mut c, mut p) = queue(1, ()); + p.push(1); + p.push(2); + assert_eq!(c.pop(), Some(1)); + assert_eq!(c.pop(), Some(2)); + assert_eq!(c.pop(), None); + p.push(3); + p.push(4); + assert_eq!(c.pop(), Some(3)); + assert_eq!(c.pop(), Some(4)); + assert_eq!(c.pop(), None); + } + + #[test] + fn stress() { + stress_bound(0); + stress_bound(1); + + fn stress_bound(bound: uint) { + let (c, mut p) = queue(bound, ()); + do task::spawn_sched(task::SingleThreaded) { + let mut c = c; + for _ in range(0, 100000) { + loop { + match c.pop() { + Some(1) => break, + 
Some(_) => fail!(), + None => {} + } + } + } + } + for _ in range(0, 100000) { + p.push(1); + } + } + } +} diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 86cc895eb27..2adc32f33fb 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -26,7 +26,6 @@ use option::{Option, Some, None}; use rt::borrowck::BorrowRecord; use rt::borrowck; use rt::context::Context; -use rt::context; use rt::env; use io::Writer; use rt::kill::Death; |
