diff options
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/comm.rs | 23 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 3 | ||||
| -rw-r--r-- | src/libstd/rt/select.rs | 310 |
3 files changed, 23 insertions, 313 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 793e244bec7..f43d1c74eae 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -18,7 +18,8 @@ use kinds::Send; use rt; use rt::sched::Scheduler; use rt::local::Local; -use rt::select::{Select, SelectPort}; +use rt::select::{SelectInner, SelectPortInner}; +use select::{Select, SelectPort}; use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst}; use unstable::sync::UnsafeAtomicRcBox; use util::Void; @@ -215,7 +216,7 @@ impl<T> PortOne<T> { } } -impl<T> Select for PortOne<T> { +impl<T> SelectInner for PortOne<T> { #[inline] #[cfg(not(test))] fn optimistic_check(&mut self) -> bool { unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE } @@ -318,7 +319,9 @@ impl<T> Select for PortOne<T> { } } -impl<T> SelectPort<T> for PortOne<T> { +impl<T> Select for PortOne<T> { } + +impl<T> SelectPortInner<T> for PortOne<T> { fn recv_ready(self) -> Option<T> { let mut this = self; let packet = this.packet(); @@ -349,6 +352,8 @@ impl<T> SelectPort<T> for PortOne<T> { } } +impl<T> SelectPort<T> for PortOne<T> { } + impl<T> Peekable<T> for PortOne<T> { fn peek(&self) -> bool { unsafe { @@ -513,7 +518,7 @@ impl<T> Peekable<T> for Port<T> { // of them, but a &Port<T> should also be selectable so you can select2 on it // alongside a PortOne<U> without passing the port by value in recv_ready. -impl<'self, T> Select for &'self Port<T> { +impl<'self, T> SelectInner for &'self Port<T> { #[inline] fn optimistic_check(&mut self) -> bool { do self.next.with_mut_ref |pone| { pone.optimistic_check() } @@ -531,7 +536,9 @@ impl<'self, T> Select for &'self Port<T> { } } -impl<T> Select for Port<T> { +impl<'self, T> Select for &'self Port<T> { } + +impl<T> SelectInner for Port<T> { #[inline] fn optimistic_check(&mut self) -> bool { (&*self).optimistic_check() @@ -548,7 +555,9 @@ impl<T> Select for Port<T> { } } -impl<'self, T> SelectPort<T> for &'self Port<T> { +impl<T> Select for Port<T> { } + +impl<'self, T> SelectPortInner<T> for &'self Port<T> { fn recv_ready(self) -> Option<T> { match self.next.take().recv_ready() { Some(StreamPayload { val, next }) => { @@ -560,6 +569,8 @@ impl<'self, T> SelectPort<T> for &'self Port<T> { } } +impl<'self, T> SelectPort<T> for &'self Port<T> { } + pub struct SharedChan<T> { // Just like Chan, but a shared AtomicOption instead of Cell priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>> diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 1b9f28b95fb..3a991e92b0b 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -142,8 +142,7 @@ pub mod tube; /// Simple reimplementation of core::comm pub mod comm; -/// Routines for select()ing on pipes. -pub mod select; +mod select; // FIXME #5248 shouldn't be pub /// The runtime needs to be able to put a pointer into thread-local storage. diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs index bde703af315..19a4948af3c 100644 --- a/src/libstd/rt/select.rs +++ b/src/libstd/rt/select.rs @@ -8,14 +8,13 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use option::*; -// use either::{Either, Left, Right}; +//! Module for private, abstraction-leaking select traits. Wrapped in std::select. + use rt::kill::BlockedTask; use rt::sched::Scheduler; -use rt::local::Local; +use option::Option; -/// Trait for message-passing primitives that can be select()ed on. -pub trait Select { +pub trait SelectInner { // Returns true if data was available. fn optimistic_check(&mut self) -> bool; // Returns true if data was available. If so, shall also wake() the task. @@ -24,305 +23,6 @@ pub trait Select { fn unblock_from(&mut self) -> bool; } -/// Trait for message-passing primitives that can use the select2() convenience wrapper. -// (This is separate from the above trait to enable heterogeneous lists of ports -// that implement Select on different types to use select().) -pub trait SelectPort<T> : Select { +pub trait SelectPortInner<T> { fn recv_ready(self) -> Option<T>; } - -/// Receive a message from any one of many ports at once. -pub fn select<A: Select>(ports: &mut [A]) -> uint { - if ports.is_empty() { - fail!("can't select on an empty list"); - } - - for (index, port) in ports.mut_iter().enumerate() { - if port.optimistic_check() { - return index; - } - } - - // If one of the ports already contains data when we go to block on it, we - // don't bother enqueueing on the rest of them, so we shouldn't bother - // unblocking from it either. This is just for efficiency, not correctness. - // (If not, we need to unblock from all of them. Length is a placeholder.) - let mut ready_index = ports.len(); - - let sched = Local::take::<Scheduler>(); - do sched.deschedule_running_task_and_then |sched, task| { - let task_handles = task.make_selectable(ports.len()); - - for (index, (port, task_handle)) in - ports.mut_iter().zip(task_handles.move_iter()).enumerate() { - // If one of the ports has data by now, it will wake the handle. - if port.block_on(sched, task_handle) { - ready_index = index; - break; - } - } - } - - // Task resumes. Now unblock ourselves from all the ports we blocked on. - // If the success index wasn't reset, 'take' will just take all of them. - // Iterate in reverse so the 'earliest' index that's ready gets returned. - for (index, port) in ports.mut_slice(0, ready_index).mut_rev_iter().enumerate() { - if port.unblock_from() { - ready_index = index; - } - } - - assert!(ready_index < ports.len()); - return ready_index; -} - -/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet. - -impl <'self> Select for &'self mut Select { - fn optimistic_check(&mut self) -> bool { self.optimistic_check() } - fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { - self.block_on(sched, task) - } - fn unblock_from(&mut self) -> bool { self.unblock_from() } -} - -pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B) - -> Either<(Option<TA>, B), (A, Option<TB>)> { - let result = { - let mut ports = [&mut a as &mut Select, &mut b as &mut Select]; - select(ports) - }; - match result { - 0 => Left ((a.recv_ready(), b)), - 1 => Right((a, b.recv_ready())), - x => fail!("impossible case in select2: %?", x) - } -} - -*/ - -#[cfg(test)] -mod test { - use super::*; - use option::*; - use rt::comm::*; - use rt::test::*; - use vec::*; - use comm::GenericChan; - use task; - use cell::Cell; - use iterator::{Iterator, range}; - - #[test] #[ignore(cfg(windows))] #[should_fail] - fn select_doesnt_get_trolled() { - select::<PortOne<()>>([]); - } - - /* non-blocking select tests */ - - #[cfg(test)] - fn select_helper(num_ports: uint, send_on_chans: &[uint]) { - // Unfortunately this does not actually test the block_on early-break - // codepath in select -- racing between the sender and the receiver in - // separate tasks is necessary to get around the optimistic check. - let (ports, chans) = unzip(from_fn(num_ports, |_| oneshot::<()>())); - let mut dead_chans = ~[]; - let mut ports = ports; - for (i, chan) in chans.move_iter().enumerate() { - if send_on_chans.contains(&i) { - chan.send(()); - } else { - dead_chans.push(chan); - } - } - let ready_index = select(ports); - assert!(send_on_chans.contains(&ready_index)); - assert!(ports.swap_remove(ready_index).recv_ready().is_some()); - let _ = dead_chans; - - // Same thing with streams instead. - // FIXME(#7971): This should be in a macro but borrowck isn't smart enough. - let (ports, chans) = unzip(from_fn(num_ports, |_| stream::<()>())); - let mut dead_chans = ~[]; - let mut ports = ports; - for (i, chan) in chans.move_iter().enumerate() { - if send_on_chans.contains(&i) { - chan.send(()); - } else { - dead_chans.push(chan); - } - } - let ready_index = select(ports); - assert!(send_on_chans.contains(&ready_index)); - assert!(ports.swap_remove(ready_index).recv_ready().is_some()); - let _ = dead_chans; - } - - #[test] - fn select_one() { - do run_in_newsched_task { select_helper(1, [0]) } - } - - #[test] - fn select_two() { - // NB. I would like to have a test that tests the first one that is - // ready is the one that's returned, but that can't be reliably tested - // with the randomized behaviour of optimistic_check. - do run_in_newsched_task { select_helper(2, [1]) } - do run_in_newsched_task { select_helper(2, [0]) } - do run_in_newsched_task { select_helper(2, [1,0]) } - } - - #[test] - fn select_a_lot() { - do run_in_newsched_task { select_helper(12, [7,8,9]) } - } - - #[test] - fn select_stream() { - use util; - use comm::GenericChan; - use iter::Times; - - // Sends 10 buffered packets, and uses select to retrieve them all. - // Puts the port in a different spot in the vector each time. - do run_in_newsched_task { - let (ports, _) = unzip(from_fn(10, |_| stream())); - let (port, chan) = stream(); - do 10.times { chan.send(31337); } - let mut ports = ports; - let mut port = Some(port); - let order = [5u,0,4,3,2,6,9,8,7,1]; - for &index in order.iter() { - // put the port in the vector at any index - util::swap(port.get_mut_ref(), &mut ports[index]); - assert!(select(ports) == index); - // get it back out - util::swap(port.get_mut_ref(), &mut ports[index]); - // NB. Not recv(), because optimistic_check randomly fails. - assert!(port.get_ref().recv_ready().unwrap() == 31337); - } - } - } - - #[test] - fn select_unkillable() { - do run_in_newsched_task { - do task::unkillable { select_helper(2, [1]) } - } - } - - /* blocking select tests */ - - #[test] - fn select_blocking() { - select_blocking_helper(true); - select_blocking_helper(false); - - fn select_blocking_helper(killable: bool) { - do run_in_newsched_task { - let (p1,_c) = oneshot(); - let (p2,c2) = oneshot(); - let mut ports = [p1,p2]; - - let (p3,c3) = oneshot(); - let (p4,c4) = oneshot(); - - let x = Cell::new((c2, p3, c4)); - do task::spawn { - let (c2, p3, c4) = x.take(); - p3.recv(); // handshake parent - c4.send(()); // normal receive - task::yield(); - c2.send(()); // select receive - } - - // Try to block before child sends on c2. - c3.send(()); - p4.recv(); - if killable { - assert!(select(ports) == 1); - } else { - do task::unkillable { assert!(select(ports) == 1); } - } - } - } - } - - #[test] - fn select_racing_senders() { - static NUM_CHANS: uint = 10; - - select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]); - select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]); - select_racing_senders_helper(true, ~[0,1,2]); - select_racing_senders_helper(false, ~[0,1,2]); - select_racing_senders_helper(true, ~[3,4,5,6]); - select_racing_senders_helper(false, ~[3,4,5,6]); - select_racing_senders_helper(true, ~[7,8,9]); - select_racing_senders_helper(false, ~[7,8,9]); - - fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) { - use rt::test::spawntask_random; - use iter::Times; - - do run_in_newsched_task { - // A bit of stress, since ordinarily this is just smoke and mirrors. - do 4.times { - let send_on_chans = send_on_chans.clone(); - do task::spawn { - let mut ports = ~[]; - for i in range(0u, NUM_CHANS) { - let (p,c) = oneshot(); - ports.push(p); - if send_on_chans.contains(&i) { - let c = Cell::new(c); - do spawntask_random { - task::yield(); - c.take().send(()); - } - } - } - // nondeterministic result, but should succeed - if killable { - select(ports); - } else { - do task::unkillable { select(ports); } - } - } - } - } - } - } - - #[test] #[ignore(cfg(windows))] - fn select_killed() { - do run_in_newsched_task { - let (success_p, success_c) = oneshot::<bool>(); - let success_c = Cell::new(success_c); - do task::try { - let success_c = Cell::new(success_c.take()); - do task::unkillable { - let (p,c) = oneshot(); - let c = Cell::new(c); - do task::spawn { - let (dead_ps, dead_cs) = unzip(from_fn(5, |_| oneshot::<()>())); - let mut ports = dead_ps; - select(ports); // should get killed; nothing should leak - c.take().send(()); // must not happen - // Make sure dead_cs doesn't get closed until after select. - let _ = dead_cs; - } - do task::spawn { - fail!(); // should kill sibling awake - } - - // wait for killed selector to close (NOT send on) its c. - // hope to send 'true'. - success_c.take().send(p.try_recv().is_none()); - } - }; - assert!(success_p.recv()); - } - } -} |
