about summary refs log tree commit diff
path: root/src/libstd/comm/shared.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/comm/shared.rs')
-rw-r--r--src/libstd/comm/shared.rs483
1 files changed, 483 insertions, 0 deletions
diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs
new file mode 100644
index 00000000000..30e061bb7b9
--- /dev/null
+++ b/src/libstd/comm/shared.rs
@@ -0,0 +1,483 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Shared channels
+///
+/// This is the flavor of channels which are not necessarily optimized for any
+/// particular use case, but are the most general in how they are used. Shared
+/// channels are cloneable allowing for multiple senders.
+///
+/// High level implementation details can be found in the comment of the parent
+/// module. You'll also note that the implementation of the shared and stream
+/// channels are quite similar, and this is no coincidence!
+
+use int;
+use iter::Iterator;
+use kinds::Send;
+use ops::Drop;
+use option::{Some, None, Option};
+use result::{Ok, Err, Result};
+use rt::local::Local;
+use rt::task::{Task, BlockedTask};
+use rt::thread::Thread;
+use sync::atomics;
+use unstable::mutex::Mutex;
+use vec::OwnedVector;
+
+use mpsc = sync::mpsc_queue;
+
+static DISCONNECTED: int = int::MIN;
+static FUDGE: int = 1024;
+static MAX_STEALS: int = 1 << 20;
+
+pub struct Packet<T> {
+    queue: mpsc::Queue<T>,
+    cnt: atomics::AtomicInt, // How many items are on this channel
+    steals: int, // How many times has a port received without blocking?
+    to_wake: atomics::AtomicUint, // Task to wake up
+
+    // The number of channels which are currently using this packet.
+    channels: atomics::AtomicInt,
+
+    // See the discussion in Port::drop and the channel send methods for what
+    // these are used for
+    port_dropped: atomics::AtomicBool,
+    sender_drain: atomics::AtomicInt,
+
+    // this lock protects various portions of this implementation during
+    // select()
+    select_lock: Mutex,
+}
+
+pub enum Failure {
+    Empty,
+    Disconnected,
+}
+
+impl<T: Send> Packet<T> {
+    // Creation of a packet *must* be followed by a call to inherit_blocker
+    pub fn new() -> Packet<T> {
+        let mut p = Packet {
+            queue: mpsc::Queue::new(),
+            cnt: atomics::AtomicInt::new(0),
+            steals: 0,
+            to_wake: atomics::AtomicUint::new(0),
+            channels: atomics::AtomicInt::new(2),
+            port_dropped: atomics::AtomicBool::new(false),
+            sender_drain: atomics::AtomicInt::new(0),
+            select_lock: unsafe { Mutex::new() },
+        };
+        // see comments in inherit_blocker about why we grab this lock
+        unsafe { p.select_lock.lock() }
+        return p;
+    }
+
+    // This function is used at the creation of a shared packet to inherit a
+    // previously blocked task. This is done to prevent spurious wakeups of
+    // tasks in select().
+    //
+    // This can only be called at channel-creation time
+    pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
+        match task {
+            Some(task) => {
+                assert_eq!(self.cnt.load(atomics::SeqCst), 0);
+                assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+                self.to_wake.store(unsafe { task.cast_to_uint() },
+                                   atomics::SeqCst);
+                self.cnt.store(-1, atomics::SeqCst);
+
+                // This store is a little sketchy. What's happening here is
+                // that we're transferring a blocker from a oneshot or stream
+                // channel to this shared channel. In doing so, we never
+                // spuriously wake them up and rather only wake them up at the
+                // appropriate time. This implementation of shared channels
+                // assumes that any blocking recv() will undo the increment of
+                // steals performed in try_recv() once the recv is complete.
+                // This thread that we're inheriting, however, is not in the
+                // middle of recv. Hence, the first time we wake them up,
+                // they're going to wake up from their old port, move on to the
+                // upgraded port, and then call the block recv() function.
+                //
+                // When calling this function, they'll find there's data
+                // immediately available, counting it as a steal. This in fact
+                // wasn't a steal because we appropriately blocked them waiting
+                // for data.
+                //
+                // To offset this bad increment, we initially set the steal
+                // count to -1. You'll find some special code in
+                // abort_selection() as well to ensure that this -1 steal count
+                // doesn't escape too far.
+                self.steals = -1;
+            }
+            None => {}
+        }
+
+        // When the shared packet is constructed, we grabbed this lock. The
+        // purpose of this lock is to ensure that abort_selection() doesn't
+        // interfere with this method. After we unlock this lock, we're
+        // signifying that we're done modifying self.cnt and self.to_wake and
+        // the port is ready for the world to continue using it.
+        unsafe { self.select_lock.unlock() }
+    }
+
+    pub fn send(&mut self, t: T) -> bool {
+        // See Port::drop for what's going on
+        if self.port_dropped.load(atomics::SeqCst) { return false }
+
+        // Note that the multiple sender case is a little tricker
+        // semantically than the single sender case. The logic for
+        // incrementing is "add and if disconnected store disconnected".
+        // This could end up leading some senders to believe that there
+        // wasn't a disconnect if in fact there was a disconnect. This means
+        // that while one thread is attempting to re-store the disconnected
+        // states, other threads could walk through merrily incrementing
+        // this very-negative disconnected count. To prevent senders from
+        // spuriously attempting to send when the channels is actually
+        // disconnected, the count has a ranged check here.
+        //
+        // This is also done for another reason. Remember that the return
+        // value of this function is:
+        //
+        //  `true` == the data *may* be received, this essentially has no
+        //            meaning
+        //  `false` == the data will *never* be received, this has a lot of
+        //             meaning
+        //
+        // In the SPSC case, we have a check of 'queue.is_empty()' to see
+        // whether the data was actually received, but this same condition
+        // means nothing in a multi-producer context. As a result, this
+        // preflight check serves as the definitive "this will never be
+        // received". Once we get beyond this check, we have permanently
+        // entered the realm of "this may be received"
+        if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE {
+            return false
+        }
+
+        self.queue.push(t);
+        match self.cnt.fetch_add(1, atomics::SeqCst) {
+            -1 => {
+                self.take_to_wake().wake().map(|t| t.reawaken());
+            }
+
+            // In this case, we have possibly failed to send our data, and
+            // we need to consider re-popping the data in order to fully
+            // destroy it. We must arbitrate among the multiple senders,
+            // however, because the queues that we're using are
+            // single-consumer queues. In order to do this, all exiting
+            // pushers will use an atomic count in order to count those
+            // flowing through. Pushers who see 0 are required to drain as
+            // much as possible, and then can only exit when they are the
+            // only pusher (otherwise they must try again).
+            n if n < DISCONNECTED + FUDGE => {
+                // see the comment in 'try' for a shared channel for why this
+                // window of "not disconnected" is ok.
+                self.cnt.store(DISCONNECTED, atomics::SeqCst);
+
+                if self.sender_drain.fetch_add(1, atomics::SeqCst) == 0 {
+                    loop {
+                        // drain the queue, for info on the thread yield see the
+                        // discussion in try_recv
+                        loop {
+                            match self.queue.pop() {
+                                mpsc::Data(..) => {}
+                                mpsc::Empty => break,
+                                mpsc::Inconsistent => Thread::yield_now(),
+                            }
+                        }
+                        // maybe we're done, if we're not the last ones
+                        // here, then we need to go try again.
+                        if self.sender_drain.fetch_sub(1, atomics::SeqCst) == 1 {
+                            break
+                        }
+                    }
+
+                    // At this point, there may still be data on the queue,
+                    // but only if the count hasn't been incremented and
+                    // some other sender hasn't finished pushing data just
+                    // yet. That sender in question will drain its own data.
+                }
+            }
+
+            // Can't make any assumptions about this case like in the SPSC case.
+            _ => {}
+        }
+
+        true
+    }
+
+    pub fn recv(&mut self) -> Result<T, Failure> {
+        // This code is essentially the exact same as that found in the stream
+        // case (see stream.rs)
+        match self.try_recv() {
+            Err(Empty) => {}
+            data => return data,
+        }
+
+        let task: ~Task = Local::take();
+        task.deschedule(1, |task| {
+            self.decrement(task)
+        });
+
+        match self.try_recv() {
+            data @ Ok(..) => { self.steals -= 1; data }
+            data => data,
+        }
+    }
+
+    // Essentially the exact same thing as the stream decrement function.
+    fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
+        assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+        let n = unsafe { task.cast_to_uint() };
+        self.to_wake.store(n, atomics::SeqCst);
+
+        let steals = self.steals;
+        self.steals = 0;
+
+        match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) {
+            DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); }
+            // If we factor in our steals and notice that the channel has no
+            // data, we successfully sleep
+            n => {
+                assert!(n >= 0);
+                if n - steals <= 0 { return Ok(()) }
+            }
+        }
+
+        self.to_wake.store(0, atomics::SeqCst);
+        Err(unsafe { BlockedTask::cast_from_uint(n) })
+    }
+
+    pub fn try_recv(&mut self) -> Result<T, Failure> {
+        let ret = match self.queue.pop() {
+            mpsc::Data(t) => Some(t),
+            mpsc::Empty => None,
+
+            // This is a bit of an interesting case. The channel is
+            // reported as having data available, but our pop() has
+            // failed due to the queue being in an inconsistent state.
+            // This means that there is some pusher somewhere which has
+            // yet to complete, but we are guaranteed that a pop will
+            // eventually succeed. In this case, we spin in a yield loop
+            // because the remote sender should finish their enqueue
+            // operation "very quickly".
+            //
+            // Note that this yield loop does *not* attempt to do a green
+            // yield (regardless of the context), but *always* performs an
+            // OS-thread yield. The reasoning for this is that the pusher in
+            // question which is causing the inconsistent state is
+            // guaranteed to *not* be a blocked task (green tasks can't get
+            // pre-empted), so it must be on a different OS thread. Also,
+            // `try_recv` is normally a "guaranteed no rescheduling" context
+            // in a green-thread situation. By yielding control of the
+            // thread, we will hopefully allow time for the remote task on
+            // the other OS thread to make progress.
+            //
+            // Avoiding this yield loop would require a different queue
+            // abstraction which provides the guarantee that after M
+            // pushes have succeeded, at least M pops will succeed. The
+            // current queues guarantee that if there are N active
+            // pushes, you can pop N times once all N have finished.
+            mpsc::Inconsistent => {
+                let data;
+                loop {
+                    Thread::yield_now();
+                    match self.queue.pop() {
+                        mpsc::Data(t) => { data = t; break }
+                        mpsc::Empty => fail!("inconsistent => empty"),
+                        mpsc::Inconsistent => {}
+                    }
+                }
+                Some(data)
+            }
+        };
+        match ret {
+            // See the discussion in the stream implementation for why we we
+            // might decrement steals.
+            Some(data) => {
+                self.steals += 1;
+                if self.steals > MAX_STEALS {
+                    match self.cnt.swap(0, atomics::SeqCst) {
+                        DISCONNECTED => {
+                            self.cnt.store(DISCONNECTED, atomics::SeqCst);
+                        }
+                        n => { self.steals -= n; }
+                    }
+                    assert!(self.steals >= 0);
+                }
+                Ok(data)
+            }
+
+            // See the discussion in the stream implementation for why we try
+            // again.
+            None => {
+                match self.cnt.load(atomics::SeqCst) {
+                    n if n != DISCONNECTED => Err(Empty),
+                    _ => {
+                        match self.queue.pop() {
+                            mpsc::Data(t) => Ok(t),
+                            mpsc::Empty => Err(Disconnected),
+                            // with no senders, an inconsistency is impossible.
+                            mpsc::Inconsistent => unreachable!(),
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    // Prepares this shared packet for a channel clone, essentially just bumping
+    // a refcount.
+    pub fn clone_chan(&mut self) {
+        self.channels.fetch_add(1, atomics::SeqCst);
+    }
+
+    // Decrement the reference count on a channel. This is called whenever a
+    // Chan is dropped and may end up waking up a receiver. It's the receiver's
+    // responsibility on the other end to figure out that we've disconnected.
+    pub fn drop_chan(&mut self) {
+        match self.channels.fetch_sub(1, atomics::SeqCst) {
+            1 => {}
+            n if n > 1 => return,
+            n => fail!("bad number of channels left {}", n),
+        }
+
+        match self.cnt.swap(DISCONNECTED, atomics::SeqCst) {
+            -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
+            DISCONNECTED => {}
+            n => { assert!(n >= 0); }
+        }
+    }
+
+    // See the long discussion inside of stream.rs for why the queue is drained,
+    // and why it is done in this fashion.
+    pub fn drop_port(&mut self) {
+        self.port_dropped.store(true, atomics::SeqCst);
+        let mut steals = self.steals;
+        while {
+            let cnt = self.cnt.compare_and_swap(
+                            steals, DISCONNECTED, atomics::SeqCst);
+            cnt != DISCONNECTED && cnt != steals
+        } {
+            // See the discussion in 'try_recv' for why we yield
+            // control of this thread.
+            loop {
+                match self.queue.pop() {
+                    mpsc::Data(..) => { steals += 1; }
+                    mpsc::Empty | mpsc::Inconsistent => break,
+                }
+            }
+        }
+    }
+
+    // Consumes ownership of the 'to_wake' field.
+    fn take_to_wake(&mut self) -> BlockedTask {
+        let task = self.to_wake.load(atomics::SeqCst);
+        self.to_wake.store(0, atomics::SeqCst);
+        assert!(task != 0);
+        unsafe { BlockedTask::cast_from_uint(task) }
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // select implementation
+    ////////////////////////////////////////////////////////////////////////////
+
+    // Helper function for select, tests whether this port can receive without
+    // blocking (obviously not an atomic decision).
+    //
+    // This is different than the stream version because there's no need to peek
+    // at the queue, we can just look at the local count.
+    pub fn can_recv(&mut self) -> bool {
+        let cnt = self.cnt.load(atomics::SeqCst);
+        cnt == DISCONNECTED || cnt - self.steals > 0
+    }
+
+    // Inserts the blocked task for selection on this port, returning it back if
+    // the port already has data on it.
+    //
+    // The code here is the same as in stream.rs, except that it doesn't need to
+    // peek at the channel to see if an upgrade is pending.
+    pub fn start_selection(&mut self,
+                           task: BlockedTask) -> Result<(), BlockedTask> {
+        match self.decrement(task) {
+            Ok(()) => Ok(()),
+            Err(task) => {
+                let prev = self.cnt.fetch_add(1, atomics::SeqCst);
+                assert!(prev >= 0);
+                return Err(task);
+            }
+        }
+    }
+
+    // Cancels a previous task waiting on this port, returning whether there's
+    // data on the port.
+    //
+    // This is similar to the stream implementation (hence fewer comments), but
+    // uses a different value for the "steals" variable.
+    pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool {
+        // Before we do anything else, we bounce on this lock. The reason for
+        // doing this is to ensure that any upgrade-in-progress is gone and
+        // done with. Without this bounce, we can race with inherit_blocker
+        // about looking at and dealing with to_wake. Once we have acquired the
+        // lock, we are guaranteed that inherit_blocker is done.
+        unsafe {
+            self.select_lock.lock();
+            self.select_lock.unlock();
+        }
+
+        // Like the stream implementation, we want to make sure that the count
+        // on the channel goes non-negative. We don't know how negative the
+        // stream currently is, so instead of using a steal value of 1, we load
+        // the channel count and figure out what we should do to make it
+        // positive.
+        let steals = {
+            let cnt = self.cnt.load(atomics::SeqCst);
+            if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
+        };
+        let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst);
+
+        if prev == DISCONNECTED {
+            assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+            self.cnt.store(DISCONNECTED, atomics::SeqCst);
+            true
+        } else {
+            let cur = prev + steals + 1;
+            assert!(cur >= 0);
+            if prev < 0 {
+                self.take_to_wake().trash();
+            } else {
+                while self.to_wake.load(atomics::SeqCst) != 0 {
+                    Thread::yield_now();
+                }
+            }
+            // if the number of steals is -1, it was the pre-emptive -1 steal
+            // count from when we inherited a blocker. This is fine because
+            // we're just going to overwrite it with a real value.
+            assert!(self.steals == 0 || self.steals == -1);
+            self.steals = steals;
+            prev >= 0
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+    fn drop(&mut self) {
+        unsafe {
+            // Note that this load is not only an assert for correctness about
+            // disconnection, but also a proper fence before the read of
+            // `to_wake`, so this assert cannot be removed with also removing
+            // the `to_wake` assert.
+            assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED);
+            assert_eq!(self.to_wake.load(atomics::SeqCst), 0);
+            assert_eq!(self.channels.load(atomics::SeqCst), 0);
+            self.select_lock.destroy();
+        }
+    }
+}