about summary refs log tree commit diff
path: root/src/libstd/comm/select.rs
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-12-05 17:56:17 -0800
committerAlex Crichton <alex@alexcrichton.com>2013-12-16 17:47:11 -0800
commitbfa9064ba2687eb1d95708f72f41ddd9729a6ba1 (patch)
treeb10aeff181eff3a8654df495d2ad8826490f6533 /src/libstd/comm/select.rs
parent000cda611f8224ac780fa37432f869f425cd2bb7 (diff)
downloadrust-bfa9064ba2687eb1d95708f72f41ddd9729a6ba1.tar.gz
rust-bfa9064ba2687eb1d95708f72f41ddd9729a6ba1.zip
Rewrite std::comm
* Streams are now ~3x faster than before (fewer allocations and more optimized)
    * Based on a single-producer single-consumer lock-free queue that doesn't
      always have to allocate on every send.
    * Blocking via mutexes/cond vars outside the runtime
* Streams work in/out of the runtime seamlessly
* Select now works in/out of the runtime seamlessly
* Streams will now fail!() on send() if the other end has hung up
    * try_send() will not fail
* PortOne/ChanOne removed
* SharedPort removed
* MegaPipe removed
* Generic select removed (only one kind of port now)
* API redesign
    * try_recv == never block
    * recv_opt == block, don't fail
    * iter() == Iterator<T> for Port<T>
    * removed peek
    * Type::new
* Removed rt::comm
Diffstat (limited to 'src/libstd/comm/select.rs')
-rw-r--r--src/libstd/comm/select.rs498
1 files changed, 498 insertions, 0 deletions
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs
new file mode 100644
index 00000000000..81a77000bad
--- /dev/null
+++ b/src/libstd/comm/select.rs
@@ -0,0 +1,498 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Selection over an array of ports
+//!
+//! This module contains the implementation machinery necessary for selecting
+//! over a number of ports. One large goal of this module is to provide an
+//! efficient interface to selecting over any port of any type.
+//!
+//! This is achieved through an architecture of a "port set" in which ports are
+//! added to a set and then the entire set is waited on at once. The set can be
+//! waited on multiple times to prevent re-adding each port to the set.
+//!
+//! Usage of this module is currently encouraged to go through the use of the
+//! `select!` macro. This macro allows naturally binding of variables to the
+//! received values of ports in a much more natural syntax then usage of the
+//! `Select` structure directly.
+//!
+//! # Example
+//!
+//! ```rust
+//! let (mut p1, c1) = Chan::new();
+//! let (mut p2, c2) = Chan::new();
+//!
+//! c1.send(1);
+//! c2.send(2);
+//!
+//! select! (
+//!     val = p1.recv() => {
+//!         assert_eq!(val, 1);
+//!     }
+//!     val = p2.recv() => {
+//!         assert_eq!(val, 2);
+//!     }
+//! )
+
+use cast;
+use iter::Iterator;
+use kinds::Send;
+use ops::Drop;
+use option::{Some, None, Option};
+use ptr::RawPtr;
+use super::imp::BlockingContext;
+use super::{Packet, Port, imp};
+use uint;
+use unstable::atomics::{Relaxed, SeqCst};
+
+macro_rules! select {
+    (
+        $name1:pat = $port1:ident.$meth1:ident() => $code1:expr,
+        $($name:pat = $port:ident.$meth:ident() => $code:expr),*
+    ) => ({
+        use std::comm::Select;
+        let sel = Select::new();
+        let mut $port1 = sel.add(&mut $port1);
+        $( let mut $port = sel.add(&mut $port); )*
+        let ret = sel.wait();
+        if ret == $port1.id { let $name1 = $port1.$meth1(); $code1 }
+        $( else if ret == $port.id { let $name = $port.$meth(); $code } )*
+        else { unreachable!() }
+    })
+}
+
+/// The "port set" of the select interface. This structure is used to manage a
+/// set of ports which are being selected over.
+#[no_freeze]
+#[no_send]
+pub struct Select {
+    priv head: *mut Packet,
+    priv tail: *mut Packet,
+    priv next_id: uint,
+}
+
+/// A handle to a port which is currently a member of a `Select` set of ports.
+/// This handle is used to keep the port in the set as well as interact with the
+/// underlying port.
+pub struct Handle<'self, T> {
+    id: uint,
+    priv selector: &'self Select,
+    priv port: &'self mut Port<T>,
+}
+
+struct PacketIterator { priv cur: *mut Packet }
+
+impl Select {
+    /// Creates a new selection structure. This set is initially empty and
+    /// `wait` will fail!() if called.
+    ///
+    /// Usage of this struct directly can sometimes be burdensome, and usage is
+    /// rather much easier through the `select!` macro.
+    pub fn new() -> Select {
+        Select {
+            head: 0 as *mut Packet,
+            tail: 0 as *mut Packet,
+            next_id: 1,
+        }
+    }
+
+    /// Adds a new port to this set, returning a handle which is then used to
+    /// receive on the port.
+    ///
+    /// Note that this port parameter takes `&mut Port` instead of `&Port`. None
+    /// of the methods of receiving on a port require `&mut self`, but `&mut` is
+    /// used here in order to have the compiler guarantee that the same port is
+    /// not added to this set more than once.
+    ///
+    /// When the returned handle falls out of scope, the port will be removed
+    /// from this set. While the handle is in this set, usage of the port can be
+    /// done through the `Handle`'s receiving methods.
+    pub fn add<'a, T: Send>(&'a self, port: &'a mut Port<T>) -> Handle<'a, T> {
+        let this = unsafe { cast::transmute_mut(self) };
+        let id = this.next_id;
+        this.next_id += 1;
+        unsafe {
+            let packet = port.queue.packet();
+            assert!(!(*packet).selecting.load(Relaxed));
+            assert_eq!((*packet).selection_id, 0);
+            (*packet).selection_id = id;
+            if this.head.is_null() {
+                this.head = packet;
+                this.tail = packet;
+            } else {
+                (*packet).select_prev = this.tail;
+                assert!((*packet).select_next.is_null());
+                (*this.tail).select_next = packet;
+                this.tail = packet;
+            }
+        }
+        Handle { id: id, selector: this, port: port }
+    }
+
+    /// Waits for an event on this port set. The returned valus is *not* and
+    /// index, but rather an id. This id can be queried against any active
+    /// `Handle` structures (each one has a public `id` field). The handle with
+    /// the matching `id` will have some sort of event available on it. The
+    /// event could either be that data is available or the corresponding
+    /// channel has been closed.
+    pub fn wait(&self) -> uint {
+        // Note that this is currently an inefficient implementation. We in
+        // theory have knowledge about all ports in the set ahead of time, so
+        // this method shouldn't really have to iterate over all of them yet
+        // again. The idea with this "port set" interface is to get the
+        // interface right this time around, and later this implementation can
+        // be optimized.
+        //
+        // This implementation can be summarized by:
+        //
+        //      fn select(ports) {
+        //          if any port ready { return ready index }
+        //          deschedule {
+        //              block on all ports
+        //          }
+        //          unblock on all ports
+        //          return ready index
+        //      }
+        //
+        // Most notably, the iterations over all of the ports shouldn't be
+        // necessary.
+        unsafe {
+            let mut amt = 0;
+            for p in self.iter() {
+                assert!(!(*p).selecting.load(Relaxed));
+                amt += 1;
+                if (*p).can_recv() {
+                    return (*p).selection_id;
+                }
+            }
+            assert!(amt > 0);
+
+            let mut ready_index = amt;
+            let mut ready_id = uint::max_value;
+            let mut iter = self.iter().enumerate();
+
+            // Acquire a number of blocking contexts, and block on each one
+            // sequentially until one fails. If one fails, then abort
+            // immediately so we can go unblock on all the other ports.
+            BlockingContext::many(amt, |ctx| {
+                let (i, packet) = iter.next().unwrap();
+                (*packet).selecting.store(true, SeqCst);
+                if !ctx.block(&mut (*packet).data,
+                              &mut (*packet).to_wake,
+                              || (*packet).decrement()) {
+                    (*packet).abort_selection(false);
+                    (*packet).selecting.store(false, SeqCst);
+                    ready_index = i;
+                    ready_id = (*packet).selection_id;
+                    false
+                } else {
+                    true
+                }
+            });
+
+            // Abort the selection process on each port. If the abort process
+            // returns `true`, then that means that the port is ready to receive
+            // some data. Note that this also means that the port may have yet
+            // to have fully read the `to_wake` field and woken us up (although
+            // the wakeup is guaranteed to fail).
+            //
+            // This situation happens in the window of where a sender invokes
+            // increment(), sees -1, and then decides to wake up the task. After
+            // all this is done, the sending thread will set `selecting` to
+            // `false`. Until this is done, we cannot return. If we were to
+            // return, then a sender could wake up a port which has gone back to
+            // sleep after this call to `select`.
+            //
+            // Note that it is a "fairly small window" in which an increment()
+            // views that it should wake a thread up until the `selecting` bit
+            // is set to false. For now, the implementation currently just spins
+            // in a yield loop. This is very distasteful, but this
+            // implementation is already nowhere near what it should ideally be.
+            // A rewrite should focus on avoiding a yield loop, and for now this
+            // implementation is tying us over to a more efficient "don't
+            // iterate over everything every time" implementation.
+            for packet in self.iter().take(ready_index) {
+                if (*packet).abort_selection(true) {
+                    ready_id = (*packet).selection_id;
+                    while (*packet).selecting.load(Relaxed) {
+                        imp::yield_now();
+                    }
+                }
+            }
+
+            // Sanity check for now to make sure that everyone is turned off.
+            for packet in self.iter() {
+                assert!(!(*packet).selecting.load(Relaxed));
+            }
+
+            return ready_id;
+        }
+    }
+
+    unsafe fn remove(&self, packet: *mut Packet) {
+        let this = cast::transmute_mut(self);
+        assert!(!(*packet).selecting.load(Relaxed));
+        if (*packet).select_prev.is_null() {
+            assert_eq!(packet, this.head);
+            this.head = (*packet).select_next;
+        } else {
+            (*(*packet).select_prev).select_next = (*packet).select_next;
+        }
+        if (*packet).select_next.is_null() {
+            assert_eq!(packet, this.tail);
+            this.tail = (*packet).select_prev;
+        } else {
+            (*(*packet).select_next).select_prev = (*packet).select_prev;
+        }
+        (*packet).select_next = 0 as *mut Packet;
+        (*packet).select_prev = 0 as *mut Packet;
+        (*packet).selection_id = 0;
+    }
+
+    fn iter(&self) -> PacketIterator { PacketIterator { cur: self.head } }
+}
+
+impl<'self, T: Send> Handle<'self, T> {
+    /// Receive a value on the underlying port. Has the same semantics as
+    /// `Port.recv`
+    pub fn recv(&mut self) -> T { self.port.recv() }
+    /// Block to receive a value on the underlying port, returning `Some` on
+    /// success or `None` if the channel disconnects. This function has the same
+    /// semantics as `Port.recv_opt`
+    pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() }
+    /// Immediately attempt to receive a value on a port, this function will
+    /// never block. Has the same semantics as `Port.try_recv`.
+    pub fn try_recv(&mut self) -> Option<T> { self.port.try_recv() }
+}
+
+#[unsafe_destructor]
+impl Drop for Select {
+    fn drop(&mut self) {
+        assert!(self.head.is_null());
+        assert!(self.tail.is_null());
+    }
+}
+
+#[unsafe_destructor]
+impl<'self, T: Send> Drop for Handle<'self, T> {
+    fn drop(&mut self) {
+        unsafe { self.selector.remove(self.port.queue.packet()) }
+    }
+}
+
+impl Iterator<*mut Packet> for PacketIterator {
+    fn next(&mut self) -> Option<*mut Packet> {
+        if self.cur.is_null() {
+            None
+        } else {
+            let ret = Some(self.cur);
+            unsafe { self.cur = (*self.cur).select_next; }
+            ret
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::super::*;
+    use prelude::*;
+
+    test!(fn smoke() {
+        let (mut p1, c1) = Chan::<int>::new();
+        let (mut p2, c2) = Chan::<int>::new();
+        c1.send(1);
+        select! (
+            foo = p1.recv() => { assert_eq!(foo, 1); },
+            _bar = p2.recv() => { fail!() }
+        )
+        c2.send(2);
+        select! (
+            _foo = p1.recv() => { fail!() },
+            bar = p2.recv() => { assert_eq!(bar, 2) }
+        )
+        drop(c1);
+        select! (
+            foo = p1.recv_opt() => { assert_eq!(foo, None); },
+            _bar = p2.recv() => { fail!() }
+        )
+        drop(c2);
+        select! (
+            bar = p2.recv_opt() => { assert_eq!(bar, None); },
+        )
+    })
+
+    test!(fn smoke2() {
+        let (mut p1, _c1) = Chan::<int>::new();
+        let (mut p2, _c2) = Chan::<int>::new();
+        let (mut p3, _c3) = Chan::<int>::new();
+        let (mut p4, _c4) = Chan::<int>::new();
+        let (mut p5, c5) = Chan::<int>::new();
+        c5.send(4);
+        select! (
+            _foo = p1.recv() => { fail!("1") },
+            _foo = p2.recv() => { fail!("2") },
+            _foo = p3.recv() => { fail!("3") },
+            _foo = p4.recv() => { fail!("4") },
+            foo = p5.recv() => { assert_eq!(foo, 4); }
+        )
+    })
+
+    test!(fn closed() {
+        let (mut p1, _c1) = Chan::<int>::new();
+        let (mut p2, c2) = Chan::<int>::new();
+        drop(c2);
+
+        select! (
+            _a1 = p1.recv_opt() => { fail!() },
+            a2 = p2.recv_opt() => { assert_eq!(a2, None); }
+        )
+    })
+
+    #[test]
+    fn unblocks() {
+        use std::io::timer;
+
+        let (mut p1, c1) = Chan::<int>::new();
+        let (mut p2, _c2) = Chan::<int>::new();
+        let (p3, c3) = Chan::<int>::new();
+
+        do spawn {
+            timer::sleep(3);
+            c1.send(1);
+            p3.recv();
+            timer::sleep(3);
+        }
+
+        select! (
+            a = p1.recv() => { assert_eq!(a, 1); },
+            _b = p2.recv() => { fail!() }
+        )
+        c3.send(1);
+        select! (
+            a = p1.recv_opt() => { assert_eq!(a, None); },
+            _b = p2.recv() => { fail!() }
+        )
+    }
+
+    #[test]
+    fn both_ready() {
+        use std::io::timer;
+
+        let (mut p1, c1) = Chan::<int>::new();
+        let (mut p2, c2) = Chan::<int>::new();
+        let (p3, c3) = Chan::<()>::new();
+
+        do spawn {
+            timer::sleep(3);
+            c1.send(1);
+            c2.send(2);
+            p3.recv();
+        }
+
+        select! (
+            a = p1.recv() => { assert_eq!(a, 1); },
+            a = p2.recv() => { assert_eq!(a, 2); }
+        )
+        select! (
+            a = p1.recv() => { assert_eq!(a, 1); },
+            a = p2.recv() => { assert_eq!(a, 2); }
+        )
+        c3.send(());
+    }
+
+    #[test]
+    fn stress() {
+        static AMT: int = 10000;
+        let (mut p1, c1) = Chan::<int>::new();
+        let (mut p2, c2) = Chan::<int>::new();
+        let (p3, c3) = Chan::<()>::new();
+
+        do spawn {
+            for i in range(0, AMT) {
+                if i % 2 == 0 {
+                    c1.send(i);
+                } else {
+                    c2.send(i);
+                }
+                p3.recv();
+            }
+        }
+
+        for i in range(0, AMT) {
+            select! (
+                i1 = p1.recv() => { assert!(i % 2 == 0 && i == i1); },
+                i2 = p2.recv() => { assert!(i % 2 == 1 && i == i2); }
+            )
+            c3.send(());
+        }
+    }
+
+    #[test]
+    fn stress_native() {
+        use std::rt::thread::Thread;
+        use std::unstable::run_in_bare_thread;
+        static AMT: int = 10000;
+
+        do run_in_bare_thread {
+            let (mut p1, c1) = Chan::<int>::new();
+            let (mut p2, c2) = Chan::<int>::new();
+            let (p3, c3) = Chan::<()>::new();
+
+            let t = do Thread::start {
+                for i in range(0, AMT) {
+                    if i % 2 == 0 {
+                        c1.send(i);
+                    } else {
+                        c2.send(i);
+                    }
+                    p3.recv();
+                }
+            };
+
+            for i in range(0, AMT) {
+                select! (
+                    i1 = p1.recv() => { assert!(i % 2 == 0 && i == i1); },
+                    i2 = p2.recv() => { assert!(i % 2 == 1 && i == i2); }
+                )
+                c3.send(());
+            }
+            t.join();
+        }
+    }
+
+    #[test]
+    fn native_both_ready() {
+        use std::rt::thread::Thread;
+        use std::unstable::run_in_bare_thread;
+
+        do run_in_bare_thread {
+            let (mut p1, c1) = Chan::<int>::new();
+            let (mut p2, c2) = Chan::<int>::new();
+            let (p3, c3) = Chan::<()>::new();
+
+            let t = do Thread::start {
+                c1.send(1);
+                c2.send(2);
+                p3.recv();
+            };
+
+            select! (
+                a = p1.recv() => { assert_eq!(a, 1); },
+                b = p2.recv() => { assert_eq!(b, 2); }
+            )
+            select! (
+                a = p1.recv() => { assert_eq!(a, 1); },
+                b = p2.recv() => { assert_eq!(b, 2); }
+            )
+            c3.send(());
+            t.join();
+        }
+    }
+}