about summary refs log tree commit diff
path: root/src/libsync/comm/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/libsync/comm/stream.rs')
-rw-r--r--src/libsync/comm/stream.rs486
1 files changed, 0 insertions, 486 deletions
diff --git a/src/libsync/comm/stream.rs b/src/libsync/comm/stream.rs
deleted file mode 100644
index 67878e3ba5a..00000000000
--- a/src/libsync/comm/stream.rs
+++ /dev/null
@@ -1,486 +0,0 @@
-// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-/// Stream channels
-///
-/// This is the flavor of channels which are optimized for one sender and one
-/// receiver. The sender will be upgraded to a shared channel if the channel is
-/// cloned.
-///
-/// High level implementation details can be found in the comment of the parent
-/// module.
-
-pub use self::Failure::*;
-pub use self::UpgradeResult::*;
-pub use self::SelectionResult::*;
-use self::Message::*;
-
-use core::prelude::*;
-
-use alloc::boxed::Box;
-use core::cmp;
-use core::int;
-use rustrt::local::Local;
-use rustrt::task::{Task, BlockedTask};
-use rustrt::thread::Thread;
-
-use atomic;
-use comm::Receiver;
-use spsc_queue as spsc;
-
-const DISCONNECTED: int = int::MIN;
-#[cfg(test)]
-const MAX_STEALS: int = 5;
-#[cfg(not(test))]
-const MAX_STEALS: int = 1 << 20;
-
-pub struct Packet<T> {
-    queue: spsc::Queue<Message<T>>, // internal queue for all message
-
-    cnt: atomic::AtomicInt, // How many items are on this channel
-    steals: int, // How many times has a port received without blocking?
-    to_wake: atomic::AtomicUint, // Task to wake up
-
-    port_dropped: atomic::AtomicBool, // flag if the channel has been destroyed.
-}
-
-pub enum Failure<T> {
-    Empty,
-    Disconnected,
-    Upgraded(Receiver<T>),
-}
-
-pub enum UpgradeResult {
-    UpSuccess,
-    UpDisconnected,
-    UpWoke(BlockedTask),
-}
-
-pub enum SelectionResult<T> {
-    SelSuccess,
-    SelCanceled(BlockedTask),
-    SelUpgraded(BlockedTask, Receiver<T>),
-}
-
-// Any message could contain an "upgrade request" to a new shared port, so the
-// internal queue it's a queue of T, but rather Message<T>
-enum Message<T> {
-    Data(T),
-    GoUp(Receiver<T>),
-}
-
-impl<T: Send> Packet<T> {
-    pub fn new() -> Packet<T> {
-        Packet {
-            queue: unsafe { spsc::Queue::new(128) },
-
-            cnt: atomic::AtomicInt::new(0),
-            steals: 0,
-            to_wake: atomic::AtomicUint::new(0),
-
-            port_dropped: atomic::AtomicBool::new(false),
-        }
-    }
-
-
-    pub fn send(&mut self, t: T) -> Result<(), T> {
-        // If the other port has deterministically gone away, then definitely
-        // must return the data back up the stack. Otherwise, the data is
-        // considered as being sent.
-        if self.port_dropped.load(atomic::SeqCst) { return Err(t) }
-
-        match self.do_send(Data(t)) {
-            UpSuccess | UpDisconnected => {},
-            UpWoke(task) => { task.wake().map(|t| t.reawaken()); }
-        }
-        Ok(())
-    }
-    pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
-        // If the port has gone away, then there's no need to proceed any
-        // further.
-        if self.port_dropped.load(atomic::SeqCst) { return UpDisconnected }
-
-        self.do_send(GoUp(up))
-    }
-
-    fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
-        self.queue.push(t);
-        match self.cnt.fetch_add(1, atomic::SeqCst) {
-            // As described in the mod's doc comment, -1 == wakeup
-            -1 => UpWoke(self.take_to_wake()),
-            // As as described before, SPSC queues must be >= -2
-            -2 => UpSuccess,
-
-            // Be sure to preserve the disconnected state, and the return value
-            // in this case is going to be whether our data was received or not.
-            // This manifests itself on whether we have an empty queue or not.
-            //
-            // Primarily, are required to drain the queue here because the port
-            // will never remove this data. We can only have at most one item to
-            // drain (the port drains the rest).
-            DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, atomic::SeqCst);
-                let first = self.queue.pop();
-                let second = self.queue.pop();
-                assert!(second.is_none());
-
-                match first {
-                    Some(..) => UpSuccess,  // we failed to send the data
-                    None => UpDisconnected, // we successfully sent data
-                }
-            }
-
-            // Otherwise we just sent some data on a non-waiting queue, so just
-            // make sure the world is sane and carry on!
-            n => { assert!(n >= 0); UpSuccess }
-        }
-    }
-
-    // Consumes ownership of the 'to_wake' field.
-    fn take_to_wake(&mut self) -> BlockedTask {
-        let task = self.to_wake.load(atomic::SeqCst);
-        self.to_wake.store(0, atomic::SeqCst);
-        assert!(task != 0);
-        unsafe { BlockedTask::cast_from_uint(task) }
-    }
-
-    // Decrements the count on the channel for a sleeper, returning the sleeper
-    // back if it shouldn't sleep. Note that this is the location where we take
-    // steals into account.
-    fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> {
-        assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-        let n = unsafe { task.cast_to_uint() };
-        self.to_wake.store(n, atomic::SeqCst);
-
-        let steals = self.steals;
-        self.steals = 0;
-
-        match self.cnt.fetch_sub(1 + steals, atomic::SeqCst) {
-            DISCONNECTED => { self.cnt.store(DISCONNECTED, atomic::SeqCst); }
-            // If we factor in our steals and notice that the channel has no
-            // data, we successfully sleep
-            n => {
-                assert!(n >= 0);
-                if n - steals <= 0 { return Ok(()) }
-            }
-        }
-
-        self.to_wake.store(0, atomic::SeqCst);
-        Err(unsafe { BlockedTask::cast_from_uint(n) })
-    }
-
-    pub fn recv(&mut self) -> Result<T, Failure<T>> {
-        // Optimistic preflight check (scheduling is expensive).
-        match self.try_recv() {
-            Err(Empty) => {}
-            data => return data,
-        }
-
-        // Welp, our channel has no data. Deschedule the current task and
-        // initiate the blocking protocol.
-        let task: Box<Task> = Local::take();
-        task.deschedule(1, |task| {
-            self.decrement(task)
-        });
-
-        match self.try_recv() {
-            // Messages which actually popped from the queue shouldn't count as
-            // a steal, so offset the decrement here (we already have our
-            // "steal" factored into the channel count above).
-            data @ Ok(..) |
-            data @ Err(Upgraded(..)) => {
-                self.steals -= 1;
-                data
-            }
-
-            data => data,
-        }
-    }
-
-    pub fn try_recv(&mut self) -> Result<T, Failure<T>> {
-        match self.queue.pop() {
-            // If we stole some data, record to that effect (this will be
-            // factored into cnt later on).
-            //
-            // Note that we don't allow steals to grow without bound in order to
-            // prevent eventual overflow of either steals or cnt as an overflow
-            // would have catastrophic results. Sometimes, steals > cnt, but
-            // other times cnt > steals, so we don't know the relation between
-            // steals and cnt. This code path is executed only rarely, so we do
-            // a pretty slow operation, of swapping 0 into cnt, taking steals
-            // down as much as possible (without going negative), and then
-            // adding back in whatever we couldn't factor into steals.
-            Some(data) => {
-                if self.steals > MAX_STEALS {
-                    match self.cnt.swap(0, atomic::SeqCst) {
-                        DISCONNECTED => {
-                            self.cnt.store(DISCONNECTED, atomic::SeqCst);
-                        }
-                        n => {
-                            let m = cmp::min(n, self.steals);
-                            self.steals -= m;
-                            self.bump(n - m);
-                        }
-                    }
-                    assert!(self.steals >= 0);
-                }
-                self.steals += 1;
-                match data {
-                    Data(t) => Ok(t),
-                    GoUp(up) => Err(Upgraded(up)),
-                }
-            }
-
-            None => {
-                match self.cnt.load(atomic::SeqCst) {
-                    n if n != DISCONNECTED => Err(Empty),
-
-                    // This is a little bit of a tricky case. We failed to pop
-                    // data above, and then we have viewed that the channel is
-                    // disconnected. In this window more data could have been
-                    // sent on the channel. It doesn't really make sense to
-                    // return that the channel is disconnected when there's
-                    // actually data on it, so be extra sure there's no data by
-                    // popping one more time.
-                    //
-                    // We can ignore steals because the other end is
-                    // disconnected and we'll never need to really factor in our
-                    // steals again.
-                    _ => {
-                        match self.queue.pop() {
-                            Some(Data(t)) => Ok(t),
-                            Some(GoUp(up)) => Err(Upgraded(up)),
-                            None => Err(Disconnected),
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    pub fn drop_chan(&mut self) {
-        // Dropping a channel is pretty simple, we just flag it as disconnected
-        // and then wakeup a blocker if there is one.
-        match self.cnt.swap(DISCONNECTED, atomic::SeqCst) {
-            -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); }
-            DISCONNECTED => {}
-            n => { assert!(n >= 0); }
-        }
-    }
-
-    pub fn drop_port(&mut self) {
-        // Dropping a port seems like a fairly trivial thing. In theory all we
-        // need to do is flag that we're disconnected and then everything else
-        // can take over (we don't have anyone to wake up).
-        //
-        // The catch for Ports is that we want to drop the entire contents of
-        // the queue. There are multiple reasons for having this property, the
-        // largest of which is that if another chan is waiting in this channel
-        // (but not received yet), then waiting on that port will cause a
-        // deadlock.
-        //
-        // So if we accept that we must now destroy the entire contents of the
-        // queue, this code may make a bit more sense. The tricky part is that
-        // we can't let any in-flight sends go un-dropped, we have to make sure
-        // *everything* is dropped and nothing new will come onto the channel.
-
-        // The first thing we do is set a flag saying that we're done for. All
-        // sends are gated on this flag, so we're immediately guaranteed that
-        // there are a bounded number of active sends that we'll have to deal
-        // with.
-        self.port_dropped.store(true, atomic::SeqCst);
-
-        // Now that we're guaranteed to deal with a bounded number of senders,
-        // we need to drain the queue. This draining process happens atomically
-        // with respect to the "count" of the channel. If the count is nonzero
-        // (with steals taken into account), then there must be data on the
-        // channel. In this case we drain everything and then try again. We will
-        // continue to fail while active senders send data while we're dropping
-        // data, but eventually we're guaranteed to break out of this loop
-        // (because there is a bounded number of senders).
-        let mut steals = self.steals;
-        while {
-            let cnt = self.cnt.compare_and_swap(
-                            steals, DISCONNECTED, atomic::SeqCst);
-            cnt != DISCONNECTED && cnt != steals
-        } {
-            loop {
-                match self.queue.pop() {
-                    Some(..) => { steals += 1; }
-                    None => break
-                }
-            }
-        }
-
-        // At this point in time, we have gated all future senders from sending,
-        // and we have flagged the channel as being disconnected. The senders
-        // still have some responsibility, however, because some sends may not
-        // complete until after we flag the disconnection. There are more
-        // details in the sending methods that see DISCONNECTED
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // select implementation
-    ////////////////////////////////////////////////////////////////////////////
-
-    // Tests to see whether this port can receive without blocking. If Ok is
-    // returned, then that's the answer. If Err is returned, then the returned
-    // port needs to be queried instead (an upgrade happened)
-    pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> {
-        // We peek at the queue to see if there's anything on it, and we use
-        // this return value to determine if we should pop from the queue and
-        // upgrade this channel immediately. If it looks like we've got an
-        // upgrade pending, then go through the whole recv rigamarole to update
-        // the internal state.
-        match self.queue.peek() {
-            Some(&GoUp(..)) => {
-                match self.recv() {
-                    Err(Upgraded(port)) => Err(port),
-                    _ => unreachable!(),
-                }
-            }
-            Some(..) => Ok(true),
-            None => Ok(false)
-        }
-    }
-
-    // increment the count on the channel (used for selection)
-    fn bump(&mut self, amt: int) -> int {
-        match self.cnt.fetch_add(amt, atomic::SeqCst) {
-            DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, atomic::SeqCst);
-                DISCONNECTED
-            }
-            n => n
-        }
-    }
-
-    // Attempts to start selecting on this port. Like a oneshot, this can fail
-    // immediately because of an upgrade.
-    pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult<T> {
-        match self.decrement(task) {
-            Ok(()) => SelSuccess,
-            Err(task) => {
-                let ret = match self.queue.peek() {
-                    Some(&GoUp(..)) => {
-                        match self.queue.pop() {
-                            Some(GoUp(port)) => SelUpgraded(task, port),
-                            _ => unreachable!(),
-                        }
-                    }
-                    Some(..) => SelCanceled(task),
-                    None => SelCanceled(task),
-                };
-                // Undo our decrement above, and we should be guaranteed that the
-                // previous value is positive because we're not going to sleep
-                let prev = self.bump(1);
-                assert!(prev == DISCONNECTED || prev >= 0);
-                return ret;
-            }
-        }
-    }
-
-    // Removes a previous task from being blocked in this port
-    pub fn abort_selection(&mut self,
-                           was_upgrade: bool) -> Result<bool, Receiver<T>> {
-        // If we're aborting selection after upgrading from a oneshot, then
-        // we're guarantee that no one is waiting. The only way that we could
-        // have seen the upgrade is if data was actually sent on the channel
-        // half again. For us, this means that there is guaranteed to be data on
-        // this channel. Furthermore, we're guaranteed that there was no
-        // start_selection previously, so there's no need to modify `self.cnt`
-        // at all.
-        //
-        // Hence, because of these invariants, we immediately return `Ok(true)`.
-        // Note that the data may not actually be sent on the channel just yet.
-        // The other end could have flagged the upgrade but not sent data to
-        // this end. This is fine because we know it's a small bounded windows
-        // of time until the data is actually sent.
-        if was_upgrade {
-            assert_eq!(self.steals, 0);
-            assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-            return Ok(true)
-        }
-
-        // We want to make sure that the count on the channel goes non-negative,
-        // and in the stream case we can have at most one steal, so just assume
-        // that we had one steal.
-        let steals = 1;
-        let prev = self.bump(steals + 1);
-
-        // If we were previously disconnected, then we know for sure that there
-        // is no task in to_wake, so just keep going
-        let has_data = if prev == DISCONNECTED {
-            assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-            true // there is data, that data is that we're disconnected
-        } else {
-            let cur = prev + steals + 1;
-            assert!(cur >= 0);
-
-            // If the previous count was negative, then we just made things go
-            // positive, hence we passed the -1 boundary and we're responsible
-            // for removing the to_wake() field and trashing it.
-            //
-            // If the previous count was positive then we're in a tougher
-            // situation. A possible race is that a sender just incremented
-            // through -1 (meaning it's going to try to wake a task up), but it
-            // hasn't yet read the to_wake. In order to prevent a future recv()
-            // from waking up too early (this sender picking up the plastered
-            // over to_wake), we spin loop here waiting for to_wake to be 0.
-            // Note that this entire select() implementation needs an overhaul,
-            // and this is *not* the worst part of it, so this is not done as a
-            // final solution but rather out of necessity for now to get
-            // something working.
-            if prev < 0 {
-                self.take_to_wake().trash();
-            } else {
-                while self.to_wake.load(atomic::SeqCst) != 0 {
-                    Thread::yield_now();
-                }
-            }
-            assert_eq!(self.steals, 0);
-            self.steals = steals;
-
-            // if we were previously positive, then there's surely data to
-            // receive
-            prev >= 0
-        };
-
-        // Now that we've determined that this queue "has data", we peek at the
-        // queue to see if the data is an upgrade or not. If it's an upgrade,
-        // then we need to destroy this port and abort selection on the
-        // upgraded port.
-        if has_data {
-            match self.queue.peek() {
-                Some(&GoUp(..)) => {
-                    match self.queue.pop() {
-                        Some(GoUp(port)) => Err(port),
-                        _ => unreachable!(),
-                    }
-                }
-                _ => Ok(true),
-            }
-        } else {
-            Ok(false)
-        }
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for Packet<T> {
-    fn drop(&mut self) {
-        // Note that this load is not only an assert for correctness about
-        // disconnection, but also a proper fence before the read of
-        // `to_wake`, so this assert cannot be removed with also removing
-        // the `to_wake` assert.
-        assert_eq!(self.cnt.load(atomic::SeqCst), DISCONNECTED);
-        assert_eq!(self.to_wake.load(atomic::SeqCst), 0);
-    }
-}