diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-12-05 17:56:17 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-12-16 17:47:11 -0800 |
| commit | bfa9064ba2687eb1d95708f72f41ddd9729a6ba1 (patch) | |
| tree | b10aeff181eff3a8654df495d2ad8826490f6533 /src/libstd/comm/select.rs | |
| parent | 000cda611f8224ac780fa37432f869f425cd2bb7 (diff) | |
| download | rust-bfa9064ba2687eb1d95708f72f41ddd9729a6ba1.tar.gz rust-bfa9064ba2687eb1d95708f72f41ddd9729a6ba1.zip | |
Rewrite std::comm
* Streams are now ~3x faster than before (fewer allocations and more optimized)
* Based on a single-producer single-consumer lock-free queue that doesn't
always have to allocate on every send.
* Blocking via mutexes/cond vars outside the runtime
* Streams work in/out of the runtime seamlessly
* Select now works in/out of the runtime seamlessly
* Streams will now fail!() on send() if the other end has hung up
* try_send() will not fail
* PortOne/ChanOne removed
* SharedPort removed
* MegaPipe removed
* Generic select removed (only one kind of port now)
* API redesign
* try_recv == never block
* recv_opt == block, don't fail
* iter() == Iterator<T> for Port<T>
* removed peek
* Type::new
* Removed rt::comm
Diffstat (limited to 'src/libstd/comm/select.rs')
| -rw-r--r-- | src/libstd/comm/select.rs | 498 |
1 files changed, 498 insertions, 0 deletions
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs new file mode 100644 index 00000000000..81a77000bad --- /dev/null +++ b/src/libstd/comm/select.rs @@ -0,0 +1,498 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Selection over an array of ports +//! +//! This module contains the implementation machinery necessary for selecting +//! over a number of ports. One large goal of this module is to provide an +//! efficient interface to selecting over any port of any type. +//! +//! This is achieved through an architecture of a "port set" in which ports are +//! added to a set and then the entire set is waited on at once. The set can be +//! waited on multiple times to prevent re-adding each port to the set. +//! +//! Usage of this module is currently encouraged to go through the use of the +//! `select!` macro. This macro allows naturally binding of variables to the +//! received values of ports in a much more natural syntax then usage of the +//! `Select` structure directly. +//! +//! # Example +//! +//! ```rust +//! let (mut p1, c1) = Chan::new(); +//! let (mut p2, c2) = Chan::new(); +//! +//! c1.send(1); +//! c2.send(2); +//! +//! select! ( +//! val = p1.recv() => { +//! assert_eq!(val, 1); +//! } +//! val = p2.recv() => { +//! assert_eq!(val, 2); +//! } +//! ) + +use cast; +use iter::Iterator; +use kinds::Send; +use ops::Drop; +use option::{Some, None, Option}; +use ptr::RawPtr; +use super::imp::BlockingContext; +use super::{Packet, Port, imp}; +use uint; +use unstable::atomics::{Relaxed, SeqCst}; + +macro_rules! select { + ( + $name1:pat = $port1:ident.$meth1:ident() => $code1:expr, + $($name:pat = $port:ident.$meth:ident() => $code:expr),* + ) => ({ + use std::comm::Select; + let sel = Select::new(); + let mut $port1 = sel.add(&mut $port1); + $( let mut $port = sel.add(&mut $port); )* + let ret = sel.wait(); + if ret == $port1.id { let $name1 = $port1.$meth1(); $code1 } + $( else if ret == $port.id { let $name = $port.$meth(); $code } )* + else { unreachable!() } + }) +} + +/// The "port set" of the select interface. This structure is used to manage a +/// set of ports which are being selected over. +#[no_freeze] +#[no_send] +pub struct Select { + priv head: *mut Packet, + priv tail: *mut Packet, + priv next_id: uint, +} + +/// A handle to a port which is currently a member of a `Select` set of ports. +/// This handle is used to keep the port in the set as well as interact with the +/// underlying port. +pub struct Handle<'self, T> { + id: uint, + priv selector: &'self Select, + priv port: &'self mut Port<T>, +} + +struct PacketIterator { priv cur: *mut Packet } + +impl Select { + /// Creates a new selection structure. This set is initially empty and + /// `wait` will fail!() if called. + /// + /// Usage of this struct directly can sometimes be burdensome, and usage is + /// rather much easier through the `select!` macro. + pub fn new() -> Select { + Select { + head: 0 as *mut Packet, + tail: 0 as *mut Packet, + next_id: 1, + } + } + + /// Adds a new port to this set, returning a handle which is then used to + /// receive on the port. + /// + /// Note that this port parameter takes `&mut Port` instead of `&Port`. None + /// of the methods of receiving on a port require `&mut self`, but `&mut` is + /// used here in order to have the compiler guarantee that the same port is + /// not added to this set more than once. + /// + /// When the returned handle falls out of scope, the port will be removed + /// from this set. While the handle is in this set, usage of the port can be + /// done through the `Handle`'s receiving methods. + pub fn add<'a, T: Send>(&'a self, port: &'a mut Port<T>) -> Handle<'a, T> { + let this = unsafe { cast::transmute_mut(self) }; + let id = this.next_id; + this.next_id += 1; + unsafe { + let packet = port.queue.packet(); + assert!(!(*packet).selecting.load(Relaxed)); + assert_eq!((*packet).selection_id, 0); + (*packet).selection_id = id; + if this.head.is_null() { + this.head = packet; + this.tail = packet; + } else { + (*packet).select_prev = this.tail; + assert!((*packet).select_next.is_null()); + (*this.tail).select_next = packet; + this.tail = packet; + } + } + Handle { id: id, selector: this, port: port } + } + + /// Waits for an event on this port set. The returned valus is *not* and + /// index, but rather an id. This id can be queried against any active + /// `Handle` structures (each one has a public `id` field). The handle with + /// the matching `id` will have some sort of event available on it. The + /// event could either be that data is available or the corresponding + /// channel has been closed. + pub fn wait(&self) -> uint { + // Note that this is currently an inefficient implementation. We in + // theory have knowledge about all ports in the set ahead of time, so + // this method shouldn't really have to iterate over all of them yet + // again. The idea with this "port set" interface is to get the + // interface right this time around, and later this implementation can + // be optimized. + // + // This implementation can be summarized by: + // + // fn select(ports) { + // if any port ready { return ready index } + // deschedule { + // block on all ports + // } + // unblock on all ports + // return ready index + // } + // + // Most notably, the iterations over all of the ports shouldn't be + // necessary. + unsafe { + let mut amt = 0; + for p in self.iter() { + assert!(!(*p).selecting.load(Relaxed)); + amt += 1; + if (*p).can_recv() { + return (*p).selection_id; + } + } + assert!(amt > 0); + + let mut ready_index = amt; + let mut ready_id = uint::max_value; + let mut iter = self.iter().enumerate(); + + // Acquire a number of blocking contexts, and block on each one + // sequentially until one fails. If one fails, then abort + // immediately so we can go unblock on all the other ports. + BlockingContext::many(amt, |ctx| { + let (i, packet) = iter.next().unwrap(); + (*packet).selecting.store(true, SeqCst); + if !ctx.block(&mut (*packet).data, + &mut (*packet).to_wake, + || (*packet).decrement()) { + (*packet).abort_selection(false); + (*packet).selecting.store(false, SeqCst); + ready_index = i; + ready_id = (*packet).selection_id; + false + } else { + true + } + }); + + // Abort the selection process on each port. If the abort process + // returns `true`, then that means that the port is ready to receive + // some data. Note that this also means that the port may have yet + // to have fully read the `to_wake` field and woken us up (although + // the wakeup is guaranteed to fail). + // + // This situation happens in the window of where a sender invokes + // increment(), sees -1, and then decides to wake up the task. After + // all this is done, the sending thread will set `selecting` to + // `false`. Until this is done, we cannot return. If we were to + // return, then a sender could wake up a port which has gone back to + // sleep after this call to `select`. + // + // Note that it is a "fairly small window" in which an increment() + // views that it should wake a thread up until the `selecting` bit + // is set to false. For now, the implementation currently just spins + // in a yield loop. This is very distasteful, but this + // implementation is already nowhere near what it should ideally be. + // A rewrite should focus on avoiding a yield loop, and for now this + // implementation is tying us over to a more efficient "don't + // iterate over everything every time" implementation. + for packet in self.iter().take(ready_index) { + if (*packet).abort_selection(true) { + ready_id = (*packet).selection_id; + while (*packet).selecting.load(Relaxed) { + imp::yield_now(); + } + } + } + + // Sanity check for now to make sure that everyone is turned off. + for packet in self.iter() { + assert!(!(*packet).selecting.load(Relaxed)); + } + + return ready_id; + } + } + + unsafe fn remove(&self, packet: *mut Packet) { + let this = cast::transmute_mut(self); + assert!(!(*packet).selecting.load(Relaxed)); + if (*packet).select_prev.is_null() { + assert_eq!(packet, this.head); + this.head = (*packet).select_next; + } else { + (*(*packet).select_prev).select_next = (*packet).select_next; + } + if (*packet).select_next.is_null() { + assert_eq!(packet, this.tail); + this.tail = (*packet).select_prev; + } else { + (*(*packet).select_next).select_prev = (*packet).select_prev; + } + (*packet).select_next = 0 as *mut Packet; + (*packet).select_prev = 0 as *mut Packet; + (*packet).selection_id = 0; + } + + fn iter(&self) -> PacketIterator { PacketIterator { cur: self.head } } +} + +impl<'self, T: Send> Handle<'self, T> { + /// Receive a value on the underlying port. Has the same semantics as + /// `Port.recv` + pub fn recv(&mut self) -> T { self.port.recv() } + /// Block to receive a value on the underlying port, returning `Some` on + /// success or `None` if the channel disconnects. This function has the same + /// semantics as `Port.recv_opt` + pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() } + /// Immediately attempt to receive a value on a port, this function will + /// never block. Has the same semantics as `Port.try_recv`. + pub fn try_recv(&mut self) -> Option<T> { self.port.try_recv() } +} + +#[unsafe_destructor] +impl Drop for Select { + fn drop(&mut self) { + assert!(self.head.is_null()); + assert!(self.tail.is_null()); + } +} + +#[unsafe_destructor] +impl<'self, T: Send> Drop for Handle<'self, T> { + fn drop(&mut self) { + unsafe { self.selector.remove(self.port.queue.packet()) } + } +} + +impl Iterator<*mut Packet> for PacketIterator { + fn next(&mut self) -> Option<*mut Packet> { + if self.cur.is_null() { + None + } else { + let ret = Some(self.cur); + unsafe { self.cur = (*self.cur).select_next; } + ret + } + } +} + +#[cfg(test)] +mod test { + use super::super::*; + use prelude::*; + + test!(fn smoke() { + let (mut p1, c1) = Chan::<int>::new(); + let (mut p2, c2) = Chan::<int>::new(); + c1.send(1); + select! ( + foo = p1.recv() => { assert_eq!(foo, 1); }, + _bar = p2.recv() => { fail!() } + ) + c2.send(2); + select! ( + _foo = p1.recv() => { fail!() }, + bar = p2.recv() => { assert_eq!(bar, 2) } + ) + drop(c1); + select! ( + foo = p1.recv_opt() => { assert_eq!(foo, None); }, + _bar = p2.recv() => { fail!() } + ) + drop(c2); + select! ( + bar = p2.recv_opt() => { assert_eq!(bar, None); }, + ) + }) + + test!(fn smoke2() { + let (mut p1, _c1) = Chan::<int>::new(); + let (mut p2, _c2) = Chan::<int>::new(); + let (mut p3, _c3) = Chan::<int>::new(); + let (mut p4, _c4) = Chan::<int>::new(); + let (mut p5, c5) = Chan::<int>::new(); + c5.send(4); + select! ( + _foo = p1.recv() => { fail!("1") }, + _foo = p2.recv() => { fail!("2") }, + _foo = p3.recv() => { fail!("3") }, + _foo = p4.recv() => { fail!("4") }, + foo = p5.recv() => { assert_eq!(foo, 4); } + ) + }) + + test!(fn closed() { + let (mut p1, _c1) = Chan::<int>::new(); + let (mut p2, c2) = Chan::<int>::new(); + drop(c2); + + select! ( + _a1 = p1.recv_opt() => { fail!() }, + a2 = p2.recv_opt() => { assert_eq!(a2, None); } + ) + }) + + #[test] + fn unblocks() { + use std::io::timer; + + let (mut p1, c1) = Chan::<int>::new(); + let (mut p2, _c2) = Chan::<int>::new(); + let (p3, c3) = Chan::<int>::new(); + + do spawn { + timer::sleep(3); + c1.send(1); + p3.recv(); + timer::sleep(3); + } + + select! ( + a = p1.recv() => { assert_eq!(a, 1); }, + _b = p2.recv() => { fail!() } + ) + c3.send(1); + select! ( + a = p1.recv_opt() => { assert_eq!(a, None); }, + _b = p2.recv() => { fail!() } + ) + } + + #[test] + fn both_ready() { + use std::io::timer; + + let (mut p1, c1) = Chan::<int>::new(); + let (mut p2, c2) = Chan::<int>::new(); + let (p3, c3) = Chan::<()>::new(); + + do spawn { + timer::sleep(3); + c1.send(1); + c2.send(2); + p3.recv(); + } + + select! ( + a = p1.recv() => { assert_eq!(a, 1); }, + a = p2.recv() => { assert_eq!(a, 2); } + ) + select! ( + a = p1.recv() => { assert_eq!(a, 1); }, + a = p2.recv() => { assert_eq!(a, 2); } + ) + c3.send(()); + } + + #[test] + fn stress() { + static AMT: int = 10000; + let (mut p1, c1) = Chan::<int>::new(); + let (mut p2, c2) = Chan::<int>::new(); + let (p3, c3) = Chan::<()>::new(); + + do spawn { + for i in range(0, AMT) { + if i % 2 == 0 { + c1.send(i); + } else { + c2.send(i); + } + p3.recv(); + } + } + + for i in range(0, AMT) { + select! ( + i1 = p1.recv() => { assert!(i % 2 == 0 && i == i1); }, + i2 = p2.recv() => { assert!(i % 2 == 1 && i == i2); } + ) + c3.send(()); + } + } + + #[test] + fn stress_native() { + use std::rt::thread::Thread; + use std::unstable::run_in_bare_thread; + static AMT: int = 10000; + + do run_in_bare_thread { + let (mut p1, c1) = Chan::<int>::new(); + let (mut p2, c2) = Chan::<int>::new(); + let (p3, c3) = Chan::<()>::new(); + + let t = do Thread::start { + for i in range(0, AMT) { + if i % 2 == 0 { + c1.send(i); + } else { + c2.send(i); + } + p3.recv(); + } + }; + + for i in range(0, AMT) { + select! ( + i1 = p1.recv() => { assert!(i % 2 == 0 && i == i1); }, + i2 = p2.recv() => { assert!(i % 2 == 1 && i == i2); } + ) + c3.send(()); + } + t.join(); + } + } + + #[test] + fn native_both_ready() { + use std::rt::thread::Thread; + use std::unstable::run_in_bare_thread; + + do run_in_bare_thread { + let (mut p1, c1) = Chan::<int>::new(); + let (mut p2, c2) = Chan::<int>::new(); + let (p3, c3) = Chan::<()>::new(); + + let t = do Thread::start { + c1.send(1); + c2.send(2); + p3.recv(); + }; + + select! ( + a = p1.recv() => { assert_eq!(a, 1); }, + b = p2.recv() => { assert_eq!(b, 2); } + ) + select! ( + a = p1.recv() => { assert_eq!(a, 1); }, + b = p2.recv() => { assert_eq!(b, 2); } + ) + c3.send(()); + t.join(); + } + } +} |
