about summary refs log tree commit diff
path: root/src/libstd/sync/mpsc/sync.rs
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-12-23 11:53:35 -0800
committerAlex Crichton <alex@alexcrichton.com>2014-12-29 12:16:49 -0800
commitbc83a009f655dd3896be4a7cd33cac8032a605f2 (patch)
tree3acc8533031219690fe14fa56f4427cfa9297296 /src/libstd/sync/mpsc/sync.rs
parentbb8f4fc3b73918abd19d67be702f78e8f73d1874 (diff)
downloadrust-bc83a009f655dd3896be4a7cd33cac8032a605f2.tar.gz
rust-bc83a009f655dd3896be4a7cd33cac8032a605f2.zip
std: Second pass stabilization for `comm`
This commit is a second pass stabilization for the `std::comm` module,
performing the following actions:

* The entire `std::comm` module was moved under `std::sync::mpsc`. This movement
  reflects that channels are just yet another synchronization primitive, and
  they don't necessarily deserve a special place outside of the other
  concurrency primitives that the standard library offers.
* The `send` and `recv` methods have all been removed.
* The `send_opt` and `recv_opt` methods have been renamed to `send` and `recv`.
  This means that all send/receive operations return a `Result` now indicating
  whether the operation was successful or not.
* The error type of `send` is now a `SendError` to implement a custom error
  message and allow for `unwrap()`. The error type contains an `into_inner`
  method to extract the value.
* The error type of `recv` is now `RecvError` for the same reasons as `send`.
* The `TryRecvError` and `TrySendError` types have had public reexports removed
  of their variants and the variant names have been tweaked with enum
  namespacing rules.
* The `Messages` iterator is renamed to `Iter`

This functionality is now all `#[stable]`:

* `Sender`
* `SyncSender`
* `Receiver`
* `std::sync::mpsc`
* `channel`
* `sync_channel`
* `Iter`
* `Sender::send`
* `Sender::clone`
* `SyncSender::send`
* `SyncSender::try_send`
* `SyncSender::clone`
* `Receiver::recv`
* `Receiver::try_recv`
* `Receiver::iter`
* `SendError`
* `RecvError`
* `TrySendError::{mod, Full, Disconnected}`
* `TryRecvError::{mod, Empty, Disconnected}`
* `SendError::into_inner`
* `TrySendError::into_inner`

This is a breaking change due to the modification of where this module is
located, as well as the changing of the semantics of `send` and `recv`. Most
programs just need to rename imports of `std::comm` to `std::sync::mpsc` and
add calls to `unwrap` after a send or a receive operation.

[breaking-change]
Diffstat (limited to 'src/libstd/sync/mpsc/sync.rs')
-rw-r--r--src/libstd/sync/mpsc/sync.rs483
1 files changed, 483 insertions, 0 deletions
diff --git a/src/libstd/sync/mpsc/sync.rs b/src/libstd/sync/mpsc/sync.rs
new file mode 100644
index 00000000000..28005831d4f
--- /dev/null
+++ b/src/libstd/sync/mpsc/sync.rs
@@ -0,0 +1,483 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Synchronous channels/ports
+///
+/// This channel implementation differs significantly from the asynchronous
+/// implementations found next to it (oneshot/stream/share). This is an
+/// implementation of a synchronous, bounded buffer channel.
+///
+/// Each channel is created with some amount of backing buffer, and sends will
+/// *block* until buffer space becomes available. A buffer size of 0 is valid,
+/// which means that every successful send is paired with a successful recv.
+///
+/// This flavor of channels defines a new `send_opt` method for channels which
+/// is the method by which a message is sent but the task does not panic if it
+/// cannot be delivered.
+///
+/// Another major difference is that send() will *always* return back the data
+/// if it couldn't be sent. This is because it is deterministically known when
+/// the data is received and when it is not received.
+///
+/// Implementation-wise, it can all be summed up with "use a mutex plus some
+/// logic". The mutex used here is an OS native mutex, meaning that no user code
+/// is run inside of the mutex (to prevent context switching). This
+/// implementation shares almost all code for the buffered and unbuffered cases
+/// of a synchronous channel. There are a few branches for the unbuffered case,
+/// but they're mostly just relevant to blocking senders.
+
+use core::prelude::*;
+
+pub use self::Failure::*;
+use self::Blocker::*;
+
+use vec::Vec;
+use core::mem;
+
+use sync::{atomic, Mutex, MutexGuard};
+use sync::mpsc::blocking::{mod, WaitToken, SignalToken};
+use sync::mpsc::select::StartResult::{mod, Installed, Abort};
+
+pub struct Packet<T> {
+    /// Only field outside of the mutex. Just done for kicks, but mainly because
+    /// the other shared channel already had the code implemented
+    channels: atomic::AtomicUint,
+
+    lock: Mutex<State<T>>,
+}
+
+unsafe impl<T:Send> Send for Packet<T> { }
+
+unsafe impl<T:Send> Sync for Packet<T> { }
+
+struct State<T> {
+    disconnected: bool, // Is the channel disconnected yet?
+    queue: Queue,       // queue of senders waiting to send data
+    blocker: Blocker,   // currently blocked task on this channel
+    buf: Buffer<T>,     // storage for buffered messages
+    cap: uint,          // capacity of this channel
+
+    /// A curious flag used to indicate whether a sender failed or succeeded in
+    /// blocking. This is used to transmit information back to the task that it
+    /// must dequeue its message from the buffer because it was not received.
+    /// This is only relevant in the 0-buffer case. This obviously cannot be
+    /// safely constructed, but it's guaranteed to always have a valid pointer
+    /// value.
+    canceled: Option<&'static mut bool>,
+}
+
+unsafe impl<T: Send> Send for State<T> {}
+
+/// Possible flavors of threads who can be blocked on this channel.
+enum Blocker {
+    BlockedSender(SignalToken),
+    BlockedReceiver(SignalToken),
+    NoneBlocked
+}
+
+/// Simple queue for threading tasks together. Nodes are stack-allocated, so
+/// this structure is not safe at all
+struct Queue {
+    head: *mut Node,
+    tail: *mut Node,
+}
+
+struct Node {
+    token: Option<SignalToken>,
+    next: *mut Node,
+}
+
+unsafe impl Send for Node {}
+
+/// A simple ring-buffer
+struct Buffer<T> {
+    buf: Vec<Option<T>>,
+    start: uint,
+    size: uint,
+}
+
+#[deriving(Show)]
+pub enum Failure {
+    Empty,
+    Disconnected,
+}
+
+/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
+/// in the meantime. This re-locks the mutex upon returning.
+fn wait<'a, 'b, T: Send>(lock: &'a Mutex<State<T>>,
+                         mut guard: MutexGuard<'b, State<T>>,
+                         f: fn(SignalToken) -> Blocker)
+                         -> MutexGuard<'a, State<T>>
+{
+    let (wait_token, signal_token) = blocking::tokens();
+    match mem::replace(&mut guard.blocker, f(signal_token)) {
+        NoneBlocked => {}
+        _ => unreachable!(),
+    }
+    drop(guard);        // unlock
+    wait_token.wait();  // block
+    lock.lock()         // relock
+}
+
+/// Wakes up a thread, dropping the lock at the correct time
+fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) {
+    // We need to be careful to wake up the waiting task *outside* of the mutex
+    // in case it incurs a context switch.
+    drop(guard);
+    token.signal();
+}
+
+impl<T: Send> Packet<T> {
+    pub fn new(cap: uint) -> Packet<T> {
+        Packet {
+            channels: atomic::AtomicUint::new(1),
+            lock: Mutex::new(State {
+                disconnected: false,
+                blocker: NoneBlocked,
+                cap: cap,
+                canceled: None,
+                queue: Queue {
+                    head: 0 as *mut Node,
+                    tail: 0 as *mut Node,
+                },
+                buf: Buffer {
+                    buf: Vec::from_fn(cap + if cap == 0 {1} else {0}, |_| None),
+                    start: 0,
+                    size: 0,
+                },
+            }),
+        }
+    }
+
+    // wait until a send slot is available, returning locked access to
+    // the channel state.
+    fn acquire_send_slot(&self) -> MutexGuard<State<T>> {
+        let mut node = Node { token: None, next: 0 as *mut Node };
+        loop {
+            let mut guard = self.lock.lock();
+            // are we ready to go?
+            if guard.disconnected || guard.buf.size() < guard.buf.cap() {
+                return guard;
+            }
+            // no room; actually block
+            let wait_token = guard.queue.enqueue(&mut node);
+            drop(guard);
+            wait_token.wait();
+        }
+    }
+
+    pub fn send(&self, t: T) -> Result<(), T> {
+        let mut guard = self.acquire_send_slot();
+        if guard.disconnected { return Err(t) }
+        guard.buf.enqueue(t);
+
+        match mem::replace(&mut guard.blocker, NoneBlocked) {
+            // if our capacity is 0, then we need to wait for a receiver to be
+            // available to take our data. After waiting, we check again to make
+            // sure the port didn't go away in the meantime. If it did, we need
+            // to hand back our data.
+            NoneBlocked if guard.cap == 0 => {
+                let mut canceled = false;
+                assert!(guard.canceled.is_none());
+                guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
+                let mut guard = wait(&self.lock, guard, BlockedSender);
+                if canceled {Err(guard.buf.dequeue())} else {Ok(())}
+            }
+
+            // success, we buffered some data
+            NoneBlocked => Ok(()),
+
+            // success, someone's about to receive our buffered data.
+            BlockedReceiver(token) => { wakeup(token, guard); Ok(()) }
+
+            BlockedSender(..) => panic!("lolwut"),
+        }
+    }
+
+    pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
+        let mut guard = self.lock.lock();
+        if guard.disconnected {
+            Err(super::TrySendError::Disconnected(t))
+        } else if guard.buf.size() == guard.buf.cap() {
+            Err(super::TrySendError::Full(t))
+        } else if guard.cap == 0 {
+            // With capacity 0, even though we have buffer space we can't
+            // transfer the data unless there's a receiver waiting.
+            match mem::replace(&mut guard.blocker, NoneBlocked) {
+                NoneBlocked => Err(super::TrySendError::Full(t)),
+                BlockedSender(..) => unreachable!(),
+                BlockedReceiver(token) => {
+                    guard.buf.enqueue(t);
+                    wakeup(token, guard);
+                    Ok(())
+                }
+            }
+        } else {
+            // If the buffer has some space and the capacity isn't 0, then we
+            // just enqueue the data for later retrieval, ensuring to wake up
+            // any blocked receiver if there is one.
+            assert!(guard.buf.size() < guard.buf.cap());
+            guard.buf.enqueue(t);
+            match mem::replace(&mut guard.blocker, NoneBlocked) {
+                BlockedReceiver(token) => wakeup(token, guard),
+                NoneBlocked => {}
+                BlockedSender(..) => unreachable!(),
+            }
+            Ok(())
+        }
+    }
+
+    // Receives a message from this channel
+    //
+    // When reading this, remember that there can only ever be one receiver at
+    // time.
+    pub fn recv(&self) -> Result<T, ()> {
+        let mut guard = self.lock.lock();
+
+        // Wait for the buffer to have something in it. No need for a while loop
+        // because we're the only receiver.
+        let mut waited = false;
+        if !guard.disconnected && guard.buf.size() == 0 {
+            guard = wait(&self.lock, guard, BlockedReceiver);
+            waited = true;
+        }
+        if guard.disconnected && guard.buf.size() == 0 { return Err(()) }
+
+        // Pick up the data, wake up our neighbors, and carry on
+        assert!(guard.buf.size() > 0);
+        let ret = guard.buf.dequeue();
+        self.wakeup_senders(waited, guard);
+        return Ok(ret);
+    }
+
+    pub fn try_recv(&self) -> Result<T, Failure> {
+        let mut guard = self.lock.lock();
+
+        // Easy cases first
+        if guard.disconnected { return Err(Disconnected) }
+        if guard.buf.size() == 0 { return Err(Empty) }
+
+        // Be sure to wake up neighbors
+        let ret = Ok(guard.buf.dequeue());
+        self.wakeup_senders(false, guard);
+
+        return ret;
+    }
+
+    // Wake up pending senders after some data has been received
+    //
+    // * `waited` - flag if the receiver blocked to receive some data, or if it
+    //              just picked up some data on the way out
+    // * `guard` - the lock guard that is held over this channel's lock
+    fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<State<T>>) {
+        let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
+
+        // If this is a no-buffer channel (cap == 0), then if we didn't wait we
+        // need to ACK the sender. If we waited, then the sender waking us up
+        // was already the ACK.
+        let pending_sender2 = if guard.cap == 0 && !waited {
+            match mem::replace(&mut guard.blocker, NoneBlocked) {
+                NoneBlocked => None,
+                BlockedReceiver(..) => unreachable!(),
+                BlockedSender(token) => {
+                    guard.canceled.take();
+                    Some(token)
+                }
+            }
+        } else {
+            None
+        };
+        mem::drop(guard);
+
+        // only outside of the lock do we wake up the pending tasks
+        pending_sender1.map(|t| t.signal());
+        pending_sender2.map(|t| t.signal());
+    }
+
+    // Prepares this shared packet for a channel clone, essentially just bumping
+    // a refcount.
+    pub fn clone_chan(&self) {
+        self.channels.fetch_add(1, atomic::SeqCst);
+    }
+
+    pub fn drop_chan(&self) {
+        // Only flag the channel as disconnected if we're the last channel
+        match self.channels.fetch_sub(1, atomic::SeqCst) {
+            1 => {}
+            _ => return
+        }
+
+        // Not much to do other than wake up a receiver if one's there
+        let mut guard = self.lock.lock();
+        if guard.disconnected { return }
+        guard.disconnected = true;
+        match mem::replace(&mut guard.blocker, NoneBlocked) {
+            NoneBlocked => {}
+            BlockedSender(..) => unreachable!(),
+            BlockedReceiver(token) => wakeup(token, guard),
+        }
+    }
+
+    pub fn drop_port(&self) {
+        let mut guard = self.lock.lock();
+
+        if guard.disconnected { return }
+        guard.disconnected = true;
+
+        // If the capacity is 0, then the sender may want its data back after
+        // we're disconnected. Otherwise it's now our responsibility to destroy
+        // the buffered data. As with many other portions of this code, this
+        // needs to be careful to destroy the data *outside* of the lock to
+        // prevent deadlock.
+        let _data = if guard.cap != 0 {
+            mem::replace(&mut guard.buf.buf, Vec::new())
+        } else {
+            Vec::new()
+        };
+        let mut queue = mem::replace(&mut guard.queue, Queue {
+            head: 0 as *mut Node,
+            tail: 0 as *mut Node,
+        });
+
+        let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
+            NoneBlocked => None,
+            BlockedSender(token) => {
+                *guard.canceled.take().unwrap() = true;
+                Some(token)
+            }
+            BlockedReceiver(..) => unreachable!(),
+        };
+        mem::drop(guard);
+
+        loop {
+            match queue.dequeue() {
+                Some(token) => { token.signal(); }
+                None => break,
+            }
+        }
+        waiter.map(|t| t.signal());
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // select implementation
+    ////////////////////////////////////////////////////////////////////////////
+
+    // If Ok, the value is whether this port has data, if Err, then the upgraded
+    // port needs to be checked instead of this one.
+    pub fn can_recv(&self) -> bool {
+        let guard = self.lock.lock();
+        guard.disconnected || guard.buf.size() > 0
+    }
+
+    // Attempts to start selection on this port. This can either succeed or fail
+    // because there is data waiting.
+    pub fn start_selection(&self, token: SignalToken) -> StartResult {
+        let mut guard = self.lock.lock();
+        if guard.disconnected || guard.buf.size() > 0 {
+            Abort
+        } else {
+            match mem::replace(&mut guard.blocker, BlockedReceiver(token)) {
+                NoneBlocked => {}
+                BlockedSender(..) => unreachable!(),
+                BlockedReceiver(..) => unreachable!(),
+            }
+            Installed
+        }
+    }
+
+    // Remove a previous selecting task from this port. This ensures that the
+    // blocked task will no longer be visible to any other threads.
+    //
+    // The return value indicates whether there's data on this port.
+    pub fn abort_selection(&self) -> bool {
+        let mut guard = self.lock.lock();
+        match mem::replace(&mut guard.blocker, NoneBlocked) {
+            NoneBlocked => true,
+            BlockedSender(token) => {
+                guard.blocker = BlockedSender(token);
+                true
+            }
+            BlockedReceiver(token) => { drop(token); false }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Packet<T> {
+    fn drop(&mut self) {
+        assert_eq!(self.channels.load(atomic::SeqCst), 0);
+        let mut guard = self.lock.lock();
+        assert!(guard.queue.dequeue().is_none());
+        assert!(guard.canceled.is_none());
+    }
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// Buffer, a simple ring buffer backed by Vec<T>
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T> Buffer<T> {
+    fn enqueue(&mut self, t: T) {
+        let pos = (self.start + self.size) % self.buf.len();
+        self.size += 1;
+        let prev = mem::replace(&mut self.buf[pos], Some(t));
+        assert!(prev.is_none());
+    }
+
+    fn dequeue(&mut self) -> T {
+        let start = self.start;
+        self.size -= 1;
+        self.start = (self.start + 1) % self.buf.len();
+        self.buf[start].take().unwrap()
+    }
+
+    fn size(&self) -> uint { self.size }
+    fn cap(&self) -> uint { self.buf.len() }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Queue, a simple queue to enqueue tasks with (stack-allocated nodes)
+////////////////////////////////////////////////////////////////////////////////
+
+impl Queue {
+    fn enqueue(&mut self, node: &mut Node) -> WaitToken {
+        let (wait_token, signal_token) = blocking::tokens();
+        node.token = Some(signal_token);
+        node.next = 0 as *mut Node;
+
+        if self.tail.is_null() {
+            self.head = node as *mut Node;
+            self.tail = node as *mut Node;
+        } else {
+            unsafe {
+                (*self.tail).next = node as *mut Node;
+                self.tail = node as *mut Node;
+            }
+        }
+
+        wait_token
+    }
+
+    fn dequeue(&mut self) -> Option<SignalToken> {
+        if self.head.is_null() {
+            return None
+        }
+        let node = self.head;
+        self.head = unsafe { (*node).next };
+        if self.head.is_null() {
+            self.tail = 0 as *mut Node;
+        }
+        unsafe {
+            (*node).next = 0 as *mut Node;
+            Some((*node).token.take().unwrap())
+        }
+    }
+}