diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-01-08 18:31:48 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-02-11 16:32:00 -0800 |
| commit | 0a6b9219d180503254b55cfd14cdaf072fb35ac4 (patch) | |
| tree | 11ee47384b4ecaba8004ec5804c97db8782110e2 /src/libstd/comm/select.rs | |
| parent | 47ef20014c32443b12a122c0371a87f513830807 (diff) | |
| download | rust-0a6b9219d180503254b55cfd14cdaf072fb35ac4.tar.gz rust-0a6b9219d180503254b55cfd14cdaf072fb35ac4.zip | |
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind channels, but rather their API usage. In the past, we had the distinction between oneshot, stream, and shared channels, but the most recent rewrite dropped oneshots in favor of streams and shared channels. This distinction of stream vs shared has shown that it's not quite what we'd like either, and this moves the `std::comm` module in the direction of "one channel to rule them all". There now remains only one Chan and one Port. This new channel is actually a hybrid oneshot/stream/shared channel under the hood in order to optimize for the use cases in question. Additionally, this also reduces the cognitive burden of having to choose between a Chan or a SharedChan in an API. My simple benchmarks show no reduction in efficiency over the existing channels today, and a 3x improvement in the oneshot case. I sadly don't have a pre-last-rewrite compiler to test out the old old oneshots, but I would imagine that the performance is comparable, but slightly slower (due to atomic reference counting). This commit also brings the bonus bugfix to channels that the pending queue of messages are all dropped when a Port disappears rather then when both the Port and the Chan disappear.
Diffstat (limited to 'src/libstd/comm/select.rs')
| -rw-r--r-- | src/libstd/comm/select.rs | 312 |
1 files changed, 192 insertions, 120 deletions
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index cf8df863817..b6b35ccc357 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -45,19 +45,17 @@ #[allow(dead_code)]; use cast; -use comm; +use cell::Cell; use iter::Iterator; use kinds::marker; use kinds::Send; use ops::Drop; use option::{Some, None, Option}; use ptr::RawPtr; -use result::{Ok, Err}; +use result::{Ok, Err, Result}; use rt::local::Local; -use rt::task::Task; -use super::{Packet, Port}; -use sync::atomics::{Relaxed, SeqCst}; -use task; +use rt::task::{Task, BlockedTask}; +use super::Port; use uint; macro_rules! select { @@ -67,8 +65,12 @@ macro_rules! select { ) => ({ use std::comm::Select; let sel = Select::new(); - let mut $port1 = sel.add(&mut $port1); - $( let mut $port = sel.add(&mut $port); )* + let mut $port1 = sel.handle(&$port1); + $( let mut $port = sel.handle(&$port); )* + unsafe { + $port1.add(); + $( $port.add(); )* + } let ret = sel.wait(); if ret == $port1.id { let $name1 = $port1.$meth1(); $code1 } $( else if ret == $port.id { let $name = $port.$meth(); $code } )* @@ -79,9 +81,9 @@ macro_rules! select { /// The "port set" of the select interface. This structure is used to manage a /// set of ports which are being selected over. pub struct Select { - priv head: *mut Packet, - priv tail: *mut Packet, - priv next_id: uint, + priv head: *mut Handle<'static, ()>, + priv tail: *mut Handle<'static, ()>, + priv next_id: Cell<uint>, priv marker1: marker::NoSend, priv marker2: marker::NoFreeze, } @@ -90,13 +92,28 @@ pub struct Select { /// This handle is used to keep the port in the set as well as interact with the /// underlying port. pub struct Handle<'port, T> { - /// A unique ID for this Handle. + /// The ID of this handle, used to compare against the return value of + /// `Select::wait()` id: uint, priv selector: &'port Select, - priv port: &'port mut Port<T>, + priv next: *mut Handle<'static, ()>, + priv prev: *mut Handle<'static, ()>, + priv added: bool, + priv packet: &'port Packet, + + // due to our fun transmutes, we be sure to place this at the end. (nothing + // previous relies on T) + priv port: &'port Port<T>, } -struct Packets { cur: *mut Packet } +struct Packets { cur: *mut Handle<'static, ()> } + +#[doc(hidden)] +pub trait Packet { + fn can_recv(&self) -> bool; + fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>; + fn abort_selection(&self) -> bool; +} impl Select { /// Creates a new selection structure. This set is initially empty and @@ -106,45 +123,29 @@ impl Select { /// rather much easier through the `select!` macro. pub fn new() -> Select { Select { - head: 0 as *mut Packet, - tail: 0 as *mut Packet, - next_id: 1, marker1: marker::NoSend, marker2: marker::NoFreeze, + head: 0 as *mut Handle<'static, ()>, + tail: 0 as *mut Handle<'static, ()>, + next_id: Cell::new(1), } } - /// Adds a new port to this set, returning a handle which is then used to - /// receive on the port. - /// - /// Note that this port parameter takes `&mut Port` instead of `&Port`. None - /// of the methods of receiving on a port require `&mut self`, but `&mut` is - /// used here in order to have the compiler guarantee that the same port is - /// not added to this set more than once. - /// - /// When the returned handle falls out of scope, the port will be removed - /// from this set. While the handle is in this set, usage of the port can be - /// done through the `Handle`'s receiving methods. - pub fn add<'a, T: Send>(&'a self, port: &'a mut Port<T>) -> Handle<'a, T> { - let this = unsafe { cast::transmute_mut(self) }; - let id = this.next_id; - this.next_id += 1; - unsafe { - let packet = port.inner.packet(); - assert!(!(*packet).selecting.load(Relaxed)); - assert_eq!((*packet).selection_id, 0); - (*packet).selection_id = id; - if this.head.is_null() { - this.head = packet as *mut Packet; - this.tail = packet as *mut Packet; - } else { - (*packet).select_prev = this.tail; - assert!((*packet).select_next.is_null()); - (*this.tail).select_next = packet as *mut Packet; - this.tail = packet as *mut Packet; - } + /// Creates a new handle into this port set for a new port. Note that this + /// does *not* add the port to the port set, for that you must call the + /// `add` method on the handle itself. + pub fn handle<'a, T: Send>(&'a self, port: &'a Port<T>) -> Handle<'a, T> { + let id = self.next_id.get(); + self.next_id.set(id + 1); + Handle { + id: id, + selector: self, + next: 0 as *mut Handle<'static, ()>, + prev: 0 as *mut Handle<'static, ()>, + added: false, + port: port, + packet: port, } - Handle { id: id, selector: this, port: port } } /// Waits for an event on this port set. The returned valus is *not* and @@ -177,10 +178,9 @@ impl Select { unsafe { let mut amt = 0; for p in self.iter() { - assert!(!(*p).selecting.load(Relaxed)); amt += 1; - if (*p).can_recv() { - return (*p).selection_id; + if (*p).packet.can_recv() { + return (*p).id; } } assert!(amt > 0); @@ -195,22 +195,14 @@ impl Select { let task: ~Task = Local::take(); task.deschedule(amt, |task| { // Prepare for the block - let (i, packet) = iter.next().unwrap(); - assert!((*packet).to_wake.is_none()); - (*packet).to_wake = Some(task); - (*packet).selecting.store(true, SeqCst); - - if (*packet).decrement() { - Ok(()) - } else { - // Empty to_wake first to avoid tripping an assertion in - // abort_selection in the disconnected case. - let task = (*packet).to_wake.take_unwrap(); - (*packet).abort_selection(false); - (*packet).selecting.store(false, SeqCst); - ready_index = i; - ready_id = (*packet).selection_id; - Err(task) + let (i, handle) = iter.next().unwrap(); + match (*handle).packet.start_selection(task) { + Ok(()) => Ok(()), + Err(task) => { + ready_index = i; + ready_id = (*handle).id; + Err(task) + } } }); @@ -235,45 +227,17 @@ impl Select { // A rewrite should focus on avoiding a yield loop, and for now this // implementation is tying us over to a more efficient "don't // iterate over everything every time" implementation. - for packet in self.iter().take(ready_index) { - if (*packet).abort_selection(true) { - ready_id = (*packet).selection_id; - while (*packet).selecting.load(Relaxed) { - task::deschedule(); - } + for handle in self.iter().take(ready_index) { + if (*handle).packet.abort_selection() { + ready_id = (*handle).id; } } - // Sanity check for now to make sure that everyone is turned off. - for packet in self.iter() { - assert!(!(*packet).selecting.load(Relaxed)); - } - assert!(ready_id != uint::MAX); return ready_id; } } - unsafe fn remove(&self, packet: *mut Packet) { - let this = cast::transmute_mut(self); - assert!(!(*packet).selecting.load(Relaxed)); - if (*packet).select_prev.is_null() { - assert_eq!(packet, this.head); - this.head = (*packet).select_next; - } else { - (*(*packet).select_prev).select_next = (*packet).select_next; - } - if (*packet).select_next.is_null() { - assert_eq!(packet, this.tail); - this.tail = (*packet).select_prev; - } else { - (*(*packet).select_next).select_prev = (*packet).select_prev; - } - (*packet).select_next = 0 as *mut Packet; - (*packet).select_prev = 0 as *mut Packet; - (*packet).selection_id = 0; - } - fn iter(&self) -> Packets { Packets { cur: self.head } } } @@ -285,10 +249,56 @@ impl<'port, T: Send> Handle<'port, T> { /// success or `None` if the channel disconnects. This function has the same /// semantics as `Port.recv_opt` pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() } - /// Immediately attempt to receive a value on a port, this function will - /// never block. Has the same semantics as `Port.try_recv`. - pub fn try_recv(&mut self) -> comm::TryRecvResult<T> { - self.port.try_recv() + + /// Adds this handle to the port set that the handle was created from. This + /// method can be called multiple times, but it has no effect if `add` was + /// called previously. + /// + /// This method is unsafe because it requires that the `Handle` is not moved + /// while it is added to the `Select` set. + pub unsafe fn add(&mut self) { + if self.added { return } + let selector: &mut Select = cast::transmute(&*self.selector); + let me: *mut Handle<'static, ()> = cast::transmute(&*self); + + if selector.head.is_null() { + selector.head = me; + selector.tail = me; + } else { + (*me).prev = selector.tail; + assert!((*me).next.is_null()); + (*selector.tail).next = me; + selector.tail = me; + } + self.added = true; + } + + /// Removes this handle from the `Select` set. This method is unsafe because + /// it has no guarantee that the `Handle` was not moved since `add` was + /// called. + pub unsafe fn remove(&mut self) { + if !self.added { return } + + let selector: &mut Select = cast::transmute(&*self.selector); + let me: *mut Handle<'static, ()> = cast::transmute(&*self); + + if self.prev.is_null() { + assert_eq!(selector.head, me); + selector.head = self.next; + } else { + (*self.prev).next = self.next; + } + if self.next.is_null() { + assert_eq!(selector.tail, me); + selector.tail = self.prev; + } else { + (*self.next).prev = self.prev; + } + + self.next = 0 as *mut Handle<'static, ()>; + self.prev = 0 as *mut Handle<'static, ()>; + + self.added = false; } } @@ -303,17 +313,17 @@ impl Drop for Select { #[unsafe_destructor] impl<'port, T: Send> Drop for Handle<'port, T> { fn drop(&mut self) { - unsafe { self.selector.remove(self.port.inner.packet()) } + unsafe { self.remove() } } } -impl Iterator<*mut Packet> for Packets { - fn next(&mut self) -> Option<*mut Packet> { +impl Iterator<*mut Handle<'static, ()>> for Packets { + fn next(&mut self) -> Option<*mut Handle<'static, ()>> { if self.cur.is_null() { None } else { let ret = Some(self.cur); - unsafe { self.cur = (*self.cur).select_next; } + unsafe { self.cur = (*self.cur).next; } ret } } @@ -326,8 +336,8 @@ mod test { use prelude::*; test!(fn smoke() { - let (mut p1, c1) = Chan::<int>::new(); - let (mut p2, c2) = Chan::<int>::new(); + let (p1, c1) = Chan::<int>::new(); + let (p2, c2) = Chan::<int>::new(); c1.send(1); select! ( foo = p1.recv() => { assert_eq!(foo, 1); }, @@ -350,11 +360,11 @@ mod test { }) test!(fn smoke2() { - let (mut p1, _c1) = Chan::<int>::new(); - let (mut p2, _c2) = Chan::<int>::new(); - let (mut p3, _c3) = Chan::<int>::new(); - let (mut p4, _c4) = Chan::<int>::new(); - let (mut p5, c5) = Chan::<int>::new(); + let (p1, _c1) = Chan::<int>::new(); + let (p2, _c2) = Chan::<int>::new(); + let (p3, _c3) = Chan::<int>::new(); + let (p4, _c4) = Chan::<int>::new(); + let (p5, c5) = Chan::<int>::new(); c5.send(4); select! ( _foo = p1.recv() => { fail!("1") }, @@ -366,8 +376,8 @@ mod test { }) test!(fn closed() { - let (mut p1, _c1) = Chan::<int>::new(); - let (mut p2, c2) = Chan::<int>::new(); + let (p1, _c1) = Chan::<int>::new(); + let (p2, c2) = Chan::<int>::new(); drop(c2); select! ( @@ -377,8 +387,8 @@ mod test { }) test!(fn unblocks() { - let (mut p1, c1) = Chan::<int>::new(); - let (mut p2, _c2) = Chan::<int>::new(); + let (p1, c1) = Chan::<int>::new(); + let (p2, _c2) = Chan::<int>::new(); let (p3, c3) = Chan::<int>::new(); spawn(proc() { @@ -400,8 +410,8 @@ mod test { }) test!(fn both_ready() { - let (mut p1, c1) = Chan::<int>::new(); - let (mut p2, c2) = Chan::<int>::new(); + let (p1, c1) = Chan::<int>::new(); + let (p2, c2) = Chan::<int>::new(); let (p3, c3) = Chan::<()>::new(); spawn(proc() { @@ -426,8 +436,8 @@ mod test { test!(fn stress() { static AMT: int = 10000; - let (mut p1, c1) = Chan::<int>::new(); - let (mut p2, c2) = Chan::<int>::new(); + let (p1, c1) = Chan::<int>::new(); + let (p2, c2) = Chan::<int>::new(); let (p3, c3) = Chan::<()>::new(); spawn(proc() { @@ -449,4 +459,66 @@ mod test { c3.send(()); } }) + + test!(fn cloning() { + let (p1, c1) = Chan::<int>::new(); + let (p2, _c2) = Chan::<int>::new(); + let (p3, c3) = Chan::<()>::new(); + + spawn(proc() { + p3.recv(); + c1.clone(); + assert_eq!(p3.try_recv(), Empty); + c1.send(2); + p3.recv(); + }); + + c3.send(()); + select!( + _i1 = p1.recv() => {}, + _i2 = p2.recv() => fail!() + ) + c3.send(()); + }) + + test!(fn cloning2() { + let (p1, c1) = Chan::<int>::new(); + let (p2, _c2) = Chan::<int>::new(); + let (p3, c3) = Chan::<()>::new(); + + spawn(proc() { + p3.recv(); + c1.clone(); + assert_eq!(p3.try_recv(), Empty); + c1.send(2); + p3.recv(); + }); + + c3.send(()); + select!( + _i1 = p1.recv() => {}, + _i2 = p2.recv() => fail!() + ) + c3.send(()); + }) + + test!(fn cloning3() { + let (p1, c1) = Chan::<()>::new(); + let (p2, c2) = Chan::<()>::new(); + let (p, c) = Chan::new(); + spawn(proc() { + let mut s = Select::new(); + let mut h1 = s.handle(&p1); + let mut h2 = s.handle(&p2); + unsafe { h2.add(); } + unsafe { h1.add(); } + assert_eq!(s.wait(), h2.id); + c.send(()); + }); + + for _ in range(0, 1000) { task::deschedule(); } + drop(c1.clone()); + c2.send(()); + p.recv(); + }) } |
