diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-12-21 22:15:04 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-01-15 11:21:56 -0800 |
| commit | adb895a34f6d0b925b8ef877289ca6e3c4d854d4 (patch) | |
| tree | fc5a2b0a6930d08c9b96e4ef24bdf5ff31adc3ec /src/libstd | |
| parent | 900893112570eea5a01c0573ae1fa1e3a72397e9 (diff) | |
| download | rust-adb895a34f6d0b925b8ef877289ca6e3c4d854d4.tar.gz rust-adb895a34f6d0b925b8ef877289ca6e3c4d854d4.zip | |
Allow more "error" values in try_recv()
This should allow callers to know whether the channel was empty or disconnected without having to block. Closes #11087
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/comm/mod.rs | 104 | ||||
| -rw-r--r-- | src/libstd/comm/select.rs | 9 | ||||
| -rw-r--r-- | src/libstd/io/signal.rs | 3 | ||||
| -rw-r--r-- | src/libstd/io/timer.rs | 5 |
4 files changed, 98 insertions, 23 deletions
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index bf37e5fca6a..bf9e28f3e97 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -251,6 +251,7 @@ macro_rules! test ( #[allow(unused_imports)]; use native; + use comm::*; use prelude::*; use super::*; use super::super::*; @@ -323,6 +324,20 @@ pub struct SharedChan<T> { priv queue: mpsc::Producer<T, Packet>, } +/// This enumeration is the list of the possible reasons that try_recv could not +/// return data when called. +#[deriving(Eq, Clone)] +pub enum TryRecvResult<T> { + /// This channel is currently empty, but the sender(s) have not yet + /// disconnected, so data may yet become available. + Empty, + /// This channel's sending half has become disconnected, and there will + /// never be any more data received on this channel + Disconnected, + /// The channel had some data and we successfully popped it + Data(T), +} + /////////////////////////////////////////////////////////////////////////////// // Internal struct definitions /////////////////////////////////////////////////////////////////////////////// @@ -739,11 +754,11 @@ impl<T: Send> Port<T> { /// block on a port. /// /// This function cannot fail. - pub fn try_recv(&self) -> Option<T> { + pub fn try_recv(&self) -> TryRecvResult<T> { self.try_recv_inc(true) } - fn try_recv_inc(&self, increment: bool) -> Option<T> { + fn try_recv_inc(&self, increment: bool) -> TryRecvResult<T> { // This is a "best effort" situation, so if a queue is inconsistent just // don't worry about it. let this = unsafe { cast::transmute_mut(self) }; @@ -807,7 +822,35 @@ impl<T: Send> Port<T> { if increment && ret.is_some() { unsafe { (*this.queue.packet()).steals += 1; } } - return ret; + match ret { + Some(t) => Data(t), + None => { + // It's possible that between the time that we saw the queue was + // empty and here the other side disconnected. It's also + // possible for us to see the disconnection here while there is + // data in the queue. It's pretty backwards-thinking to return + // Disconnected when there's actually data on the queue, so if + // we see a disconnected state be sure to check again to be 100% + // sure that there's no data in the queue. + let cnt = unsafe { (*this.queue.packet()).cnt.load(Relaxed) }; + if cnt != DISCONNECTED { return Empty } + + let ret = match this.queue { + SPSC(ref mut queue) => queue.pop(), + MPSC(ref mut queue) => match queue.pop() { + mpsc::Data(t) => Some(t), + mpsc::Empty => None, + mpsc::Inconsistent => { + fail!("inconsistent with no senders?!"); + } + } + }; + match ret { + Some(data) => Data(data), + None => Disconnected, + } + } + } } /// Attempt to wait for a value on this port, but does not fail if the @@ -824,7 +867,11 @@ impl<T: Send> Port<T> { /// the value found on the port is returned. pub fn recv_opt(&self) -> Option<T> { // optimistic preflight check (scheduling is expensive) - match self.try_recv() { None => {}, data => return data } + match self.try_recv() { + Empty => {}, + Disconnected => return None, + Data(t) => return Some(t), + } let packet; let this; @@ -843,12 +890,11 @@ impl<T: Send> Port<T> { }); } - let data = self.try_recv_inc(false); - if data.is_none() && - unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED { - fail!("bug: woke up too soon {}", unsafe { (*packet).cnt.load(SeqCst) }); + match self.try_recv_inc(false) { + Data(t) => Some(t), + Empty => fail!("bug: woke up too soon"), + Disconnected => None, } - return data; } /// Returns an iterator which will block waiting for messages, but never @@ -1005,7 +1051,10 @@ mod test { for _ in range(0, AMT * NTHREADS) { assert_eq!(p.recv(), 1); } - assert_eq!(p.try_recv(), None); + match p.try_recv() { + Data(..) => fail!(), + _ => {} + } c1.send(()); } @@ -1129,7 +1178,7 @@ mod test { test!(fn oneshot_single_thread_try_recv_open() { let (port, chan) = Chan::<int>::new(); chan.send(10); - assert!(port.try_recv() == Some(10)); + assert!(port.recv_opt() == Some(10)); }) test!(fn oneshot_single_thread_try_recv_closed() { @@ -1140,21 +1189,21 @@ mod test { test!(fn oneshot_single_thread_peek_data() { let (port, chan) = Chan::<int>::new(); - assert!(port.try_recv().is_none()); + assert_eq!(port.try_recv(), Empty) chan.send(10); - assert!(port.try_recv().is_some()); + assert_eq!(port.try_recv(), Data(10)); }) test!(fn oneshot_single_thread_peek_close() { let (port, chan) = Chan::<int>::new(); { let _c = chan; } - assert!(port.try_recv().is_none()); - assert!(port.try_recv().is_none()); + assert_eq!(port.try_recv(), Disconnected); + assert_eq!(port.try_recv(), Disconnected); }) test!(fn oneshot_single_thread_peek_open() { let (port, _) = Chan::<int>::new(); - assert!(port.try_recv().is_none()); + assert_eq!(port.try_recv(), Empty); }) test!(fn oneshot_multi_task_recv_then_send() { @@ -1321,4 +1370,27 @@ mod test { drop(chan); assert_eq!(count_port.recv(), 4); }) + + test!(fn try_recv_states() { + let (p, c) = Chan::<int>::new(); + let (p1, c1) = Chan::<()>::new(); + let (p2, c2) = Chan::<()>::new(); + do spawn { + p1.recv(); + c.send(1); + c2.send(()); + p1.recv(); + drop(c); + c2.send(()); + } + + assert_eq!(p.try_recv(), Empty); + c1.send(()); + p2.recv(); + assert_eq!(p.try_recv(), Data(1)); + assert_eq!(p.try_recv(), Empty); + c1.send(()); + p2.recv(); + assert_eq!(p.try_recv(), Disconnected); + }) } diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index 302c9d9ea46..fa5ec1d3e30 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -45,6 +45,7 @@ #[allow(dead_code)]; use cast; +use comm; use iter::Iterator; use kinds::Send; use ops::Drop; @@ -279,7 +280,9 @@ impl<'port, T: Send> Handle<'port, T> { pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() } /// Immediately attempt to receive a value on a port, this function will /// never block. Has the same semantics as `Port.try_recv`. - pub fn try_recv(&mut self) -> Option<T> { self.port.try_recv() } + pub fn try_recv(&mut self) -> comm::TryRecvResult<T> { + self.port.try_recv() + } } #[unsafe_destructor] @@ -409,8 +412,8 @@ mod test { a = p1.recv() => { assert_eq!(a, 1); }, a = p2.recv() => { assert_eq!(a, 2); } ) - assert_eq!(p1.try_recv(), None); - assert_eq!(p2.try_recv(), None); + assert_eq!(p1.try_recv(), Empty); + assert_eq!(p2.try_recv(), Empty); c3.send(()); }) diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs index 34b4ed5e1ef..0f05254b034 100644 --- a/src/libstd/io/signal.rs +++ b/src/libstd/io/signal.rs @@ -144,6 +144,7 @@ impl Listener { #[cfg(test)] mod test { use libc; + use comm::Empty; use io::timer; use super::{Listener, Interrupt}; @@ -194,7 +195,7 @@ mod test { s2.unregister(Interrupt); sigint(); timer::sleep(10); - assert!(s2.port.try_recv().is_none()); + assert_eq!(s2.port.try_recv(), Empty); } #[cfg(windows)] diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs index 7c9aa28bfe9..d156a7460e1 100644 --- a/src/libstd/io/timer.rs +++ b/src/libstd/io/timer.rs @@ -123,7 +123,7 @@ mod test { let port1 = timer.oneshot(10000); let port = timer.oneshot(1); port.recv(); - assert_eq!(port1.try_recv(), None); + assert!(port1.recv_opt().is_none()); } #[test] @@ -131,8 +131,7 @@ mod test { let mut timer = Timer::new().unwrap(); let port = timer.oneshot(100000000000); timer.sleep(1); // this should invalidate the port - - assert_eq!(port.try_recv(), None); + assert!(port.recv_opt().is_none()); } #[test] |
