about summary refs log tree commit diff
path: root/src/libstd/rt
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/rt')
-rw-r--r--src/libstd/rt/comm.rs166
-rw-r--r--src/libstd/rt/kill.rs33
-rw-r--r--src/libstd/rt/mod.rs3
-rw-r--r--src/libstd/rt/select.rs102
4 files changed, 269 insertions, 35 deletions
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index b1533237b15..87bf5a23b93 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -12,13 +12,13 @@
 
 use option::*;
 use cast;
-use util;
 use ops::Drop;
 use rt::kill::BlockedTask;
 use kinds::Send;
 use rt::sched::Scheduler;
 use rt::local::Local;
-use unstable::atomics::{AtomicUint, AtomicOption, Acquire, SeqCst};
+use rt::select::{Select, SelectPort};
+use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Release, SeqCst};
 use unstable::sync::UnsafeAtomicRcBox;
 use util::Void;
 use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
@@ -76,6 +76,7 @@ pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
 }
 
 impl<T> ChanOne<T> {
+    #[inline]
     fn packet(&self) -> *mut Packet<T> {
         unsafe {
             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
@@ -141,7 +142,6 @@ impl<T> ChanOne<T> {
     }
 }
 
-
 impl<T> PortOne<T> {
     fn packet(&self) -> *mut Packet<T> {
         unsafe {
@@ -162,46 +162,115 @@ impl<T> PortOne<T> {
 
     pub fn try_recv(self) -> Option<T> {
         let mut this = self;
-        let packet = this.packet();
 
         // Optimistic check. If data was sent already, we don't even need to block.
         // No release barrier needed here; we're not handing off our task pointer yet.
-        if unsafe { (*packet).state.load(Acquire) } != STATE_ONE {
+        if !this.optimistic_check() {
             // No data available yet.
             // Switch to the scheduler to put the ~Task into the Packet state.
             let sched = Local::take::<Scheduler>();
             do sched.deschedule_running_task_and_then |sched, task| {
-                unsafe {
-                    // Atomically swap the task pointer into the Packet state, issuing
-                    // an acquire barrier to prevent reordering of the subsequent read
-                    // of the payload. Also issues a release barrier to prevent
-                    // reordering of any previous writes to the task structure.
-                    let task_as_state = task.cast_to_uint();
-                    let oldstate = (*packet).state.swap(task_as_state, SeqCst);
-                    match oldstate {
-                        STATE_BOTH => {
-                            // Data has not been sent. Now we're blocked.
-                            rtdebug!("non-rendezvous recv");
-                            sched.metrics.non_rendezvous_recvs += 1;
-                        }
-                        STATE_ONE => {
-                            rtdebug!("rendezvous recv");
-                            sched.metrics.rendezvous_recvs += 1;
-
-                            // Channel is closed. Switch back and check the data.
-                            // NB: We have to drop back into the scheduler event loop here
-                            // instead of switching immediately back or we could end up
-                            // triggering infinite recursion on the scheduler's stack.
-                            let recvr = BlockedTask::cast_from_uint(task_as_state);
-                            sched.enqueue_blocked_task(recvr);
+                this.block_on(sched, task);
+            }
+        }
+
+        // Task resumes.
+        this.recv_ready()
+    }
+}
+
+impl<T> Select for PortOne<T> {
+    #[inline]
+    fn optimistic_check(&mut self) -> bool {
+        unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
+    }
+
+    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
+        unsafe {
+            // Atomically swap the task pointer into the Packet state, issuing
+            // an acquire barrier to prevent reordering of the subsequent read
+            // of the payload. Also issues a release barrier to prevent
+            // reordering of any previous writes to the task structure.
+            let task_as_state = task.cast_to_uint();
+            let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
+            match oldstate {
+                STATE_BOTH => {
+                    // Data has not been sent. Now we're blocked.
+                    rtdebug!("non-rendezvous recv");
+                    sched.metrics.non_rendezvous_recvs += 1;
+                    false
+                }
+                STATE_ONE => {
+                    // Re-record that we are the only owner of the packet.
+                    // Release barrier needed in case the task gets reawoken
+                    // on a different core (this is analogous to writing a
+                    // payload; a barrier in enqueueing the task protects it).
+                    // NB(#8132). This *must* occur before the enqueue below.
+                    // FIXME(#6842, #8130) This is usually only needed for the
+                    // assertion in recv_ready, except in the case of select().
+                    // This won't actually ever have cacheline contention, but
+                    // maybe should be optimized out with a cfg(test) anyway?
+                    (*self.packet()).state.store(STATE_ONE, Release);
+
+                    rtdebug!("rendezvous recv");
+                    sched.metrics.rendezvous_recvs += 1;
+
+                    // Channel is closed. Switch back and check the data.
+                    // NB: We have to drop back into the scheduler event loop here
+                    // instead of switching immediately back or we could end up
+                    // triggering infinite recursion on the scheduler's stack.
+                    let recvr = BlockedTask::cast_from_uint(task_as_state);
+                    sched.enqueue_blocked_task(recvr);
+                    true
+                }
+                _ => rtabort!("can't block_on; a task is already blocked")
+            }
+        }
+    }
+
+    // This is the only select trait function that's not also used in recv.
+    fn unblock_from(&mut self) -> bool {
+        let packet = self.packet();
+        unsafe {
+            // In case the data is available, the acquire barrier here matches
+            // the release barrier the sender used to release the payload.
+            match (*packet).state.load(Acquire) {
+                // Impossible. We removed STATE_BOTH when blocking on it, and
+                // no self-respecting sender would put it back.
+                STATE_BOTH    => rtabort!("refcount already 2 in unblock_from"),
+                // Here, a sender already tried to wake us up. Perhaps they
+                // even succeeded! Data is available.
+                STATE_ONE     => true,
+                // Still registered as blocked. Need to "unblock" the pointer.
+                task_as_state => {
+                    // In the window between the load and the CAS, a sender
+                    // might take the pointer and set the refcount to ONE. If
+                    // that happens, we shouldn't clobber that with BOTH!
+                    // Acquire barrier again for the same reason as above.
+                    match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
+                                                           Acquire) {
+                        STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
+                        STATE_ONE  => true, // Lost the race. Data available.
+                        same_ptr   => {
+                            // We successfully unblocked our task pointer.
+                            assert!(task_as_state == same_ptr);
+                            let handle = BlockedTask::cast_from_uint(task_as_state);
+                            // Because we are already awake, the handle we
+                            // gave to this port shall already be empty.
+                            handle.assert_already_awake();
+                            false
                         }
-                        _ => util::unreachable()
                     }
                 }
             }
         }
+    }
+}
 
-        // Task resumes.
+impl<T> SelectPort<T> for PortOne<T> {
+    fn recv_ready(self) -> Option<T> {
+        let mut this = self;
+        let packet = this.packet();
 
         // No further memory barrier is needed here to access the
         // payload. Some scenarios:
@@ -213,8 +282,11 @@ impl<T> PortOne<T> {
         // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
         //    is pinned to some other scheduler, so the sending task had to give us to
         //    a different scheduler for resuming. That send synchronized memory.
-
         unsafe {
+            // See corresponding store() above in block_on for rationale.
+            // FIXME(#8130) This can happen only in test builds.
+            assert!((*packet).state.load(Acquire) == STATE_ONE);
+
             let payload = (*packet).payload.take();
 
             // The sender has closed up shop. Drop the packet.
@@ -234,7 +306,7 @@ impl<T> Peekable<T> for PortOne<T> {
             match oldstate {
                 STATE_BOTH => false,
                 STATE_ONE => (*packet).payload.is_some(),
-                _ => util::unreachable()
+                _ => rtabort!("peeked on a blocked task")
             }
         }
     }
@@ -368,6 +440,36 @@ impl<T> Peekable<T> for Port<T> {
     }
 }
 
+impl<T> Select for Port<T> {
+    #[inline]
+    fn optimistic_check(&mut self) -> bool {
+        do self.next.with_mut_ref |pone| { pone.optimistic_check() }
+    }
+
+    #[inline]
+    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
+        let task = Cell::new(task);
+        do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) }
+    }
+
+    #[inline]
+    fn unblock_from(&mut self) -> bool {
+        do self.next.with_mut_ref |pone| { pone.unblock_from() }
+    }
+}
+
+impl<T> SelectPort<(T, Port<T>)> for Port<T> {
+    fn recv_ready(self) -> Option<(T, Port<T>)> {
+        match self.next.take().recv_ready() {
+            Some(StreamPayload { val, next }) => {
+                self.next.put_back(next);
+                Some((val, self))
+            }
+            None => None
+        }
+    }
+}
+
 pub struct SharedChan<T> {
     // Just like Chan, but a shared AtomicOption instead of Cell
     priv next: UnsafeAtomicRcBox<AtomicOption<StreamChanOne<T>>>
diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs
index 2bf4543df50..f7f11a402b8 100644
--- a/src/libstd/rt/kill.rs
+++ b/src/libstd/rt/kill.rs
@@ -106,8 +106,14 @@ impl Drop for KillFlag {
 // blocked task handle. So unblocking a task must restore that spare.
 unsafe fn revive_task_ptr(task_ptr: uint, spare_flag: Option<KillFlagHandle>) -> ~Task {
     let mut task: ~Task = cast::transmute(task_ptr);
-    rtassert!(task.death.spare_kill_flag.is_none());
-    task.death.spare_kill_flag = spare_flag;
+    if task.death.spare_kill_flag.is_none() {
+        task.death.spare_kill_flag = spare_flag;
+    } else {
+        // A task's spare kill flag is not used for blocking in one case:
+        // when an unkillable task blocks on select. In this case, a separate
+        // one was created, which we now discard.
+        rtassert!(task.death.unkillable > 0);
+    }
     task
 }
 
@@ -119,7 +125,7 @@ impl BlockedTask {
             Killable(flag_arc) => {
                 let flag = unsafe { &mut **flag_arc.get() };
                 match flag.swap(KILL_RUNNING, SeqCst) {
-                    KILL_RUNNING => rtabort!("tried to wake an already-running task"),
+                    KILL_RUNNING => None, // woken from select(), perhaps
                     KILL_KILLED  => None, // a killer stole it already
                     task_ptr     =>
                         Some(unsafe { revive_task_ptr(task_ptr, Some(flag_arc)) })
@@ -162,6 +168,27 @@ impl BlockedTask {
         }
     }
 
+    /// Converts one blocked task handle to a list of many handles to the same.
+    pub fn make_selectable(self, num_handles: uint) -> ~[BlockedTask] {
+        let handles = match self {
+            Unkillable(task) => {
+                let flag = unsafe { KillFlag(AtomicUint::new(cast::transmute(task))) };
+                UnsafeAtomicRcBox::newN(flag, num_handles)
+            }
+            Killable(flag_arc) => flag_arc.cloneN(num_handles),
+        };
+        // Even if the task was unkillable before, we use 'Killable' because
+        // multiple pipes will have handles. It does not really mean killable.
+        handles.consume_iter().transform(|x| Killable(x)).collect()
+    }
+
+    // This assertion has two flavours because the wake involves an atomic op.
+    // In the faster version, destructors will fail dramatically instead.
+    #[inline] #[cfg(not(test))]
+    pub fn assert_already_awake(self) { }
+    #[inline] #[cfg(test)]
+    pub fn assert_already_awake(self) { assert!(self.wake().is_none()); }
+
     /// Convert to an unsafe uint value. Useful for storing in a pipe's state flag.
     #[inline]
     pub unsafe fn cast_to_uint(self) -> uint {
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 808d07ce77d..2ca7d01da49 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -142,6 +142,9 @@ pub mod tube;
 /// Simple reimplementation of core::comm
 pub mod comm;
 
+/// Routines for select()ing on pipes.
+pub mod select;
+
 // FIXME #5248 shouldn't be pub
 /// The runtime needs to be able to put a pointer into thread-local storage.
 pub mod local_ptr;
diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs
new file mode 100644
index 00000000000..130084fd1fc
--- /dev/null
+++ b/src/libstd/rt/select.rs
@@ -0,0 +1,102 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use option::*;
+// use either::{Either, Left, Right};
+use rt::kill::BlockedTask;
+use rt::sched::Scheduler;
+use rt::local::Local;
+
+/// Trait for message-passing primitives that can be select()ed on.
+pub trait Select {
+    // Returns true if data was available.
+    fn optimistic_check(&mut self) -> bool;
+    // Returns true if data was available. If so, shall also wake() the task.
+    fn block_on(&mut self, &mut Scheduler, BlockedTask) -> bool;
+    // Returns true if data was available.
+    fn unblock_from(&mut self) -> bool;
+}
+
+/// Trait for message-passing primitives that can use the select2() convenience wrapper.
+// (This is separate from the above trait to enable heterogeneous lists of ports
+// that implement Select on different types to use select().)
+pub trait SelectPort<T> : Select {
+    fn recv_ready(self) -> Option<T>;
+}
+
+/// Receive a message from any one of many ports at once.
+pub fn select<A: Select>(ports: &mut [A]) -> uint {
+    if ports.is_empty() {
+        fail!("can't select on an empty list");
+    }
+
+    for ports.mut_iter().enumerate().advance |(index, port)| {
+        if port.optimistic_check() {
+            return index;
+        }
+    }
+
+    // If one of the ports already contains data when we go to block on it, we
+    // don't bother enqueueing on the rest of them, so we shouldn't bother
+    // unblocking from it either. This is just for efficiency, not correctness.
+    // (If not, we need to unblock from all of them. Length is a placeholder.)
+    let mut ready_index = ports.len();
+
+    let sched = Local::take::<Scheduler>();
+    do sched.deschedule_running_task_and_then |sched, task| {
+        let task_handles = task.make_selectable(ports.len());
+
+        for ports.mut_iter().zip(task_handles.consume_iter()).enumerate().advance
+                |(index, (port, task_handle))| {
+            // If one of the ports has data by now, it will wake the handle.
+            if port.block_on(sched, task_handle) {
+                ready_index = index;
+                break;
+            }
+        }
+    }
+
+    // Task resumes. Now unblock ourselves from all the ports we blocked on.
+    // If the success index wasn't reset, 'take' will just take all of them.
+    // Iterate in reverse so the 'earliest' index that's ready gets returned.
+    for ports.mut_slice(0, ready_index).mut_rev_iter().enumerate().advance |(index, port)| {
+        if port.unblock_from() {
+            ready_index = index;
+        }
+    }
+
+    assert!(ready_index < ports.len());
+    return ready_index;
+}
+
+/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
+
+impl <'self> Select for &'self mut Select {
+    fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
+    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
+        self.block_on(sched, task)
+    }
+    fn unblock_from(&mut self) -> bool { self.unblock_from() }
+}
+
+pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
+        -> Either<(Option<TA>, B), (A, Option<TB>)> {
+    let result = {
+        let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
+        select(ports)
+    };
+    match result {
+        0 => Left ((a.recv_ready(), b)),
+        1 => Right((a, b.recv_ready())),
+        x => fail!("impossible case in select2: %?", x)
+    }
+}
+
+*/