about summary refs log tree commit diff
path: root/src/libstd/comm/shared.rs
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-12-23 11:53:35 -0800
committerAlex Crichton <alex@alexcrichton.com>2014-12-29 12:16:49 -0800
commitbc83a009f655dd3896be4a7cd33cac8032a605f2 (patch)
tree3acc8533031219690fe14fa56f4427cfa9297296 /src/libstd/comm/shared.rs
parentbb8f4fc3b73918abd19d67be702f78e8f73d1874 (diff)
downloadrust-bc83a009f655dd3896be4a7cd33cac8032a605f2.tar.gz
rust-bc83a009f655dd3896be4a7cd33cac8032a605f2.zip
std: Second pass stabilization for `comm`
This commit is a second pass stabilization for the `std::comm` module,
performing the following actions:

* The entire `std::comm` module was moved under `std::sync::mpsc`. This movement
  reflects that channels are just yet another synchronization primitive, and
  they don't necessarily deserve a special place outside of the other
  concurrency primitives that the standard library offers.
* The `send` and `recv` methods have all been removed.
* The `send_opt` and `recv_opt` methods have been renamed to `send` and `recv`.
  This means that all send/receive operations return a `Result` now indicating
  whether the operation was successful or not.
* The error type of `send` is now a `SendError` to implement a custom error
  message and allow for `unwrap()`. The error type contains an `into_inner`
  method to extract the value.
* The error type of `recv` is now `RecvError` for the same reasons as `send`.
* The `TryRecvError` and `TrySendError` types have had public reexports removed
  of their variants and the variant names have been tweaked with enum
  namespacing rules.
* The `Messages` iterator is renamed to `Iter`

This functionality is now all `#[stable]`:

* `Sender`
* `SyncSender`
* `Receiver`
* `std::sync::mpsc`
* `channel`
* `sync_channel`
* `Iter`
* `Sender::send`
* `Sender::clone`
* `SyncSender::send`
* `SyncSender::try_send`
* `SyncSender::clone`
* `Receiver::recv`
* `Receiver::try_recv`
* `Receiver::iter`
* `SendError`
* `RecvError`
* `TrySendError::{mod, Full, Disconnected}`
* `TryRecvError::{mod, Empty, Disconnected}`
* `SendError::into_inner`
* `TrySendError::into_inner`

This is a breaking change due to the modification of where this module is
located, as well as the changing of the semantics of `send` and `recv`. Most
programs just need to rename imports of `std::comm` to `std::sync::mpsc` and
add calls to `unwrap` after a send or a receive operation.

[breaking-change]
Diffstat (limited to 'src/libstd/comm/shared.rs')
-rw-r--r--src/libstd/comm/shared.rs486
1 files changed, 0 insertions, 486 deletions
diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs
deleted file mode 100644
index 1022694e634..00000000000
--- a/src/libstd/comm/shared.rs
+++ /dev/null
@@ -1,486 +0,0 @@
-// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-/// Shared channels
-///
-/// This is the flavor of channels which are not necessarily optimized for any
-/// particular use case, but are the most general in how they are used. Shared
-/// channels are cloneable allowing for multiple senders.
-///
-/// High level implementation details can be found in the comment of the parent
-/// module. You'll also note that the implementation of the shared and stream
-/// channels are quite similar, and this is no coincidence!
-
-pub use self::Failure::*;
-
-use core::prelude::*;
-
-use core::cmp;
-use core::int;
-
-use sync::{atomic, Mutex, MutexGuard};
-use comm::mpsc_queue as mpsc;
-use comm::blocking::{mod, SignalToken};
-use comm::select::StartResult;
-use comm::select::StartResult::*;
-use thread::Thread;
-
-const DISCONNECTED: int = int::MIN;
-const FUDGE: int = 1024;
-#[cfg(test)]
-const MAX_STEALS: int = 5;
-#[cfg(not(test))]
-const MAX_STEALS: int = 1 << 20;
-
-pub struct Packet<T> {
-    queue: mpsc::Queue<T>,
-    cnt: atomic::AtomicInt, // How many items are on this channel
-    steals: int, // How many times has a port received without blocking?
-    to_wake: atomic::AtomicUint, // SignalToken for wake up
-
-    // The number of channels which are currently using this packet.
-    channels: atomic::AtomicInt,
-
-    // See the discussion in Port::drop and the channel send methods for what
-    // these are used for
-    port_dropped: atomic::AtomicBool,
-    sender_drain: atomic::AtomicInt,
-
-    // this lock protects various portions of this implementation during
-    // select()
-    select_lock: Mutex<()>,
-}
-
-pub enum Failure {
-    Empty,
-    Disconnected,
-}
-
-impl<T: Send> Packet<T> {
-    // Creation of a packet *must* be followed by a call to postinit_lock
-    // and later by inherit_blocker
-    pub fn new() -> Packet<T> {
-        let p = Packet {
-            queue: mpsc::Queue::new(),
-            cnt: atomic::AtomicInt::new(0),
-            steals: 0,
-            to_wake: atomic::AtomicUint::new(0),
-            channels: atomic::AtomicInt::new(2),
-            port_dropped: atomic::AtomicBool::new(false),
-            sender_drain: atomic::AtomicInt::new(0),
-            select_lock: Mutex::new(()),
-        };
-        return p;
-    }
-
-    // This function should be used after newly created Packet
-    // was wrapped with an Arc
-    // In other case mutex data will be duplicated while cloning
-    // and that could cause problems on platforms where it is
-    // represented by opaque data structure
-    pub fn postinit_lock(&self) -> MutexGuard<()> {
-        self.select_lock.lock()
-    }
-
-    // This function is used at the creation of a shared packet to inherit a
-    // previously blocked task. This is done to prevent spurious wakeups of
-    // tasks in select().
-    //
-    // This can only be called at channel-creation time
-    pub fn inherit_blocker(&mut self,
-                           token: Option<SignalToken>,
-                           guard: MutexGuard<()>) {
-        token.map(|token| {
-            assert_eq!(self.cnt.load(atomic::SeqCst), 0);
-            assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-            self.to_wake.store(unsafe { token.cast_to_uint() }, atomic::SeqCst);
-            self.cnt.store(-1, atomic::SeqCst);
-
-            // This store is a little sketchy. What's happening here is that
-            // we're transferring a blocker from a oneshot or stream channel to
-            // this shared channel. In doing so, we never spuriously wake them
-            // up and rather only wake them up at the appropriate time. This
-            // implementation of shared channels assumes that any blocking
-            // recv() will undo the increment of steals performed in try_recv()
-            // once the recv is complete.  This thread that we're inheriting,
-            // however, is not in the middle of recv. Hence, the first time we
-            // wake them up, they're going to wake up from their old port, move
-            // on to the upgraded port, and then call the block recv() function.
-            //
-            // When calling this function, they'll find there's data immediately
-            // available, counting it as a steal. This in fact wasn't a steal
-            // because we appropriately blocked them waiting for data.
-            //
-            // To offset this bad increment, we initially set the steal count to
-            // -1. You'll find some special code in abort_selection() as well to
-            // ensure that this -1 steal count doesn't escape too far.
-            self.steals = -1;
-        });
-
-        // When the shared packet is constructed, we grabbed this lock. The
-        // purpose of this lock is to ensure that abort_selection() doesn't
-        // interfere with this method. After we unlock this lock, we're
-        // signifying that we're done modifying self.cnt and self.to_wake and
-        // the port is ready for the world to continue using it.
-        drop(guard);
-    }
-
-    pub fn send(&mut self, t: T) -> Result<(), T> {
-        // See Port::drop for what's going on
-        if self.port_dropped.load(atomic::SeqCst) { return Err(t) }
-
-        // Note that the multiple sender case is a little trickier
-        // semantically than the single sender case. The logic for
-        // incrementing is "add and if disconnected store disconnected".
-        // This could end up leading some senders to believe that there
-        // wasn't a disconnect if in fact there was a disconnect. This means
-        // that while one thread is attempting to re-store the disconnected
-        // states, other threads could walk through merrily incrementing
-        // this very-negative disconnected count. To prevent senders from
-        // spuriously attempting to send when the channels is actually
-        // disconnected, the count has a ranged check here.
-        //
-        // This is also done for another reason. Remember that the return
-        // value of this function is:
-        //
-        //  `true` == the data *may* be received, this essentially has no
-        //            meaning
-        //  `false` == the data will *never* be received, this has a lot of
-        //             meaning
-        //
-        // In the SPSC case, we have a check of 'queue.is_empty()' to see
-        // whether the data was actually received, but this same condition
-        // means nothing in a multi-producer context. As a result, this
-        // preflight check serves as the definitive "this will never be
-        // received". Once we get beyond this check, we have permanently
-        // entered the realm of "this may be received"
-        if self.cnt.load(atomic::SeqCst) < DISCONNECTED + FUDGE {
-            return Err(t)
-        }
-
-        self.queue.push(t);
-        match self.cnt.fetch_add(1, atomic::SeqCst) {
-            -1 => {
-                self.take_to_wake().signal();
-            }
-
-            // In this case, we have possibly failed to send our data, and
-            // we need to consider re-popping the data in order to fully
-            // destroy it. We must arbitrate among the multiple senders,
-            // however, because the queues that we're using are
-            // single-consumer queues. In order to do this, all exiting
-            // pushers will use an atomic count in order to count those
-            // flowing through. Pushers who see 0 are required to drain as
-            // much as possible, and then can only exit when they are the
-            // only pusher (otherwise they must try again).
-            n if n < DISCONNECTED + FUDGE => {
-                // see the comment in 'try' for a shared channel for why this
-                // window of "not disconnected" is ok.
-                self.cnt.store(DISCONNECTED, atomic::SeqCst);
-
-                if self.sender_drain.fetch_add(1, atomic::SeqCst) == 0 {
-                    loop {
-                        // drain the queue, for info on the thread yield see the
-                        // discussion in try_recv
-                        loop {
-                            match self.queue.pop() {
-                                mpsc::Data(..) => {}
-                                mpsc::Empty => break,
-                                mpsc::Inconsistent => Thread::yield_now(),
-                            }
-                        }
-                        // maybe we're done, if we're not the last ones
-                        // here, then we need to go try again.
-                        if self.sender_drain.fetch_sub(1, atomic::SeqCst) == 1 {
-                            break
-                        }
-                    }
-
-                    // At this point, there may still be data on the queue,
-                    // but only if the count hasn't been incremented and
-                    // some other sender hasn't finished pushing data just
-                    // yet. That sender in question will drain its own data.
-                }
-            }
-
-            // Can't make any assumptions about this case like in the SPSC case.
-            _ => {}
-        }
-
-        Ok(())
-    }
-
-    pub fn recv(&mut self) -> Result<T, Failure> {
-        // This code is essentially the exact same as that found in the stream
-        // case (see stream.rs)
-        match self.try_recv() {
-            Err(Empty) => {}
-            data => return data,
-        }
-
-        let (wait_token, signal_token) = blocking::tokens();
-        if self.decrement(signal_token) == Installed {
-            wait_token.wait()
-        }
-
-        match self.try_recv() {
-            data @ Ok(..) => { self.steals -= 1; data }
-            data => data,
-        }
-    }
-
-    // Essentially the exact same thing as the stream decrement function.
-    // Returns true if blocking should proceed.
-    fn decrement(&mut self, token: SignalToken) -> StartResult {
-        assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-        let ptr = unsafe { token.cast_to_uint() };
-        self.to_wake.store(ptr, atomic::SeqCst);
-
-        let steals = self.steals;
-        self.steals = 0;
-
-        match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) {
-            DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); }
-            // If we factor in our steals and notice that the channel has no
-            // data, we successfully sleep
-            n => {
-                assert!(n >= 0);
-                if n - steals <= 0 { return Installed }
-            }
-        }
-
-        self.to_wake.store(0, atomic::SeqCst);
-        drop(unsafe { SignalToken::cast_from_uint(ptr) });
-        Abort
-    }
-
-    pub fn try_recv(&mut self) -> Result<T, Failure> {
-        let ret = match self.queue.pop() {
-            mpsc::Data(t) => Some(t),
-            mpsc::Empty => None,
-
-            // This is a bit of an interesting case. The channel is reported as
-            // having data available, but our pop() has failed due to the queue
-            // being in an inconsistent state.  This means that there is some
-            // pusher somewhere which has yet to complete, but we are guaranteed
-            // that a pop will eventually succeed. In this case, we spin in a
-            // yield loop because the remote sender should finish their enqueue
-            // operation "very quickly".
-            //
-            // Avoiding this yield loop would require a different queue
-            // abstraction which provides the guarantee that after M pushes have
-            // succeeded, at least M pops will succeed. The current queues
-            // guarantee that if there are N active pushes, you can pop N times
-            // once all N have finished.
-            mpsc::Inconsistent => {
-                let data;
-                loop {
-                    Thread::yield_now();
-                    match self.queue.pop() {
-                        mpsc::Data(t) => { data = t; break }
-                        mpsc::Empty => panic!("inconsistent => empty"),
-                        mpsc::Inconsistent => {}
-                    }
-                }
-                Some(data)
-            }
-        };
-        match ret {
-            // See the discussion in the stream implementation for why we
-            // might decrement steals.
-            Some(data) => {
-                if self.steals > MAX_STEALS {
-                    match self.cnt.swap(0, atomic::SeqCst) {
-                        DISCONNECTED => {
-                            self.cnt.store(DISCONNECTED, atomic::SeqCst);
-                        }
-                        n => {
-                            let m = cmp::min(n, self.steals);
-                            self.steals -= m;
-                            self.bump(n - m);
-                        }
-                    }
-                    assert!(self.steals >= 0);
-                }
-                self.steals += 1;
-                Ok(data)
-            }
-
-            // See the discussion in the stream implementation for why we try
-            // again.
-            None => {
-                match self.cnt.load(atomic::SeqCst) {
-                    n if n != DISCONNECTED => Err(Empty),
-                    _ => {
-                        match self.queue.pop() {
-                            mpsc::Data(t) => Ok(t),
-                            mpsc::Empty => Err(Disconnected),
-                            // with no senders, an inconsistency is impossible.
-                            mpsc::Inconsistent => unreachable!(),
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    // Prepares this shared packet for a channel clone, essentially just bumping
-    // a refcount.
-    pub fn clone_chan(&mut self) {
-        self.channels.fetch_add(1, atomic::SeqCst);
-    }
-
-    // Decrement the reference count on a channel. This is called whenever a
-    // Chan is dropped and may end up waking up a receiver. It's the receiver's
-    // responsibility on the other end to figure out that we've disconnected.
-    pub fn drop_chan(&mut self) {
-        match self.channels.fetch_sub(1, atomic::SeqCst) {
-            1 => {}
-            n if n > 1 => return,
-            n => panic!("bad number of channels left {}", n),
-        }
-
-        match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
-            -1 => { self.take_to_wake().signal(); }
-            DISCONNECTED => {}
-            n => { assert!(n >= 0); }
-        }
-    }
-
-    // See the long discussion inside of stream.rs for why the queue is drained,
-    // and why it is done in this fashion.
-    pub fn drop_port(&mut self) {
-        self.port_dropped.store(true, atomic::SeqCst);
-        let mut steals = self.steals;
-        while {
-            let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, atomic::SeqCst);
-            cnt != DISCONNECTED && cnt != steals
-        } {
-            // See the discussion in 'try_recv' for why we yield
-            // control of this thread.
-            loop {
-                match self.queue.pop() {
-                    mpsc::Data(..) => { steals += 1; }
-                    mpsc::Empty | mpsc::Inconsistent => break,
-                }
-            }
-        }
-    }
-
-    // Consumes ownership of the 'to_wake' field.
-    fn take_to_wake(&mut self) -> SignalToken {
-        let ptr = self.to_wake.load(atomic::SeqCst);
-        self.to_wake.store(0, atomic::SeqCst);
-        assert!(ptr != 0);
-        unsafe { SignalToken::cast_from_uint(ptr) }
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // select implementation
-    ////////////////////////////////////////////////////////////////////////////
-
-    // Helper function for select, tests whether this port can receive without
-    // blocking (obviously not an atomic decision).
-    //
-    // This is different than the stream version because there's no need to peek
-    // at the queue, we can just look at the local count.
-    pub fn can_recv(&mut self) -> bool {
-        let cnt = self.cnt.load(atomic::SeqCst);
-        cnt == DISCONNECTED || cnt - self.steals > 0
-    }
-
-    // increment the count on the channel (used for selection)
-    fn bump(&mut self, amt: int) -> int {
-        match self.cnt.fetch_add(amt, atomic::SeqCst) {
-            DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, atomic::SeqCst);
-                DISCONNECTED
-            }
-            n => n
-        }
-    }
-
-    // Inserts the signal token for selection on this port, returning true if
-    // blocking should proceed.
-    //
-    // The code here is the same as in stream.rs, except that it doesn't need to
-    // peek at the channel to see if an upgrade is pending.
-    pub fn start_selection(&mut self, token: SignalToken) -> StartResult {
-        match self.decrement(token) {
-            Installed => Installed,
-            Abort => {
-                let prev = self.bump(1);
-                assert!(prev == DISCONNECTED || prev >= 0);
-                Abort
-            }
-        }
-    }
-
-    // Cancels a previous task waiting on this port, returning whether there's
-    // data on the port.
-    //
-    // This is similar to the stream implementation (hence fewer comments), but
-    // uses a different value for the "steals" variable.
-    pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
-        // Before we do anything else, we bounce on this lock. The reason for
-        // doing this is to ensure that any upgrade-in-progress is gone and
-        // done with. Without this bounce, we can race with inherit_blocker
-        // about looking at and dealing with to_wake. Once we have acquired the
-        // lock, we are guaranteed that inherit_blocker is done.
-        {
-            let _guard = self.select_lock.lock();
-        }
-
-        // Like the stream implementation, we want to make sure that the count
-        // on the channel goes non-negative. We don't know how negative the
-        // stream currently is, so instead of using a steal value of 1, we load
-        // the channel count and figure out what we should do to make it
-        // positive.
-        let steals = {
-            let cnt = self.cnt.load(atomic::SeqCst);
-            if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
-        };
-        let prev = self.bump(steals + 1);
-
-        if prev == DISCONNECTED {
-            assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-            true
-        } else {
-            let cur = prev + steals + 1;
-            assert!(cur >= 0);
-            if prev < 0 {
-                drop(self.take_to_wake());
-            } else {
-                while self.to_wake.load(atomic::SeqCst) != 0 {
-                    Thread::yield_now();
-                }
-            }
-            // if the number of steals is -1, it was the pre-emptive -1 steal
-            // count from when we inherited a blocker. This is fine because
-            // we're just going to overwrite it with a real value.
-            assert!(self.steals == 0 || self.steals == -1);
-            self.steals = steals;
-            prev >= 0
-        }
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for Packet<T> {
-    fn drop(&mut self) {
-        // Note that this load is not only an assert for correctness about
-        // disconnection, but also a proper fence before the read of
-        // `to_wake`, so this assert cannot be removed with also removing
-        // the `to_wake` assert.
-        assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED);
-        assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-        assert_eq!(self.channels.load(atomic::SeqCst), 0);
-    }
-}