about summary refs log tree commit diff
path: root/src/rt
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2012-02-14 22:23:16 -0800
committerBrian Anderson <banderson@mozilla.com>2012-02-16 11:12:22 -0800
commitb2cfb7ef8262ebe47514f016f59054ebcfe15d61 (patch)
tree871bc0980b26b11d700761cad9a8ac245f5fcedc /src/rt
parente62ddf48988087d19934e1fdc6abb6de5f7a6a02 (diff)
downloadrust-b2cfb7ef8262ebe47514f016f59054ebcfe15d61.tar.gz
rust-b2cfb7ef8262ebe47514f016f59054ebcfe15d61.zip
rt: Add rust_port_select function
Diffstat (limited to 'src/rt')
-rw-r--r--src/rt/rust_builtin.cpp8
-rw-r--r--src/rt/rust_port.cpp34
-rw-r--r--src/rt/rust_port.h2
-rw-r--r--src/rt/rust_port_selector.cpp83
-rw-r--r--src/rt/rust_port_selector.h27
-rw-r--r--src/rt/rust_task.h5
-rw-r--r--src/rt/rustrt.def.in1
7 files changed, 151 insertions, 9 deletions
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index 628389f10ce..ecc73204f29 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -594,6 +594,14 @@ port_recv(uintptr_t *dptr, rust_port *port,
 }
 
 extern "C" CDECL void
+rust_port_select(rust_port **dptr, rust_port **ports,
+                 size_t n_ports, uintptr_t *yield) {
+    rust_task *task = rust_task_thread::get_task();
+    rust_port_selector *selector = task->get_port_selector();
+    selector->select(task, dptr, ports, n_ports, yield);
+}
+
+extern "C" CDECL void
 rust_set_exit_status(intptr_t code) {
     rust_task *task = rust_task_thread::get_task();
     task->kernel->set_exit_status((int)code);
diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp
index a917c12e151..5f46b9c4ca0 100644
--- a/src/rt/rust_port.cpp
+++ b/src/rt/rust_port.cpp
@@ -30,18 +30,34 @@ void rust_port::detach() {
 
 void rust_port::send(void *sptr) {
     I(task->thread, !lock.lock_held_by_current_thread());
-    scoped_lock with(lock);
+    bool did_rendezvous = false;
+    {
+        scoped_lock with(lock);
+
+        buffer.enqueue(sptr);
 
-    buffer.enqueue(sptr);
+        A(kernel, !buffer.is_empty(),
+          "rust_chan::transmit with nothing to send.");
+
+        if (task->blocked_on(this)) {
+            KLOG(kernel, comm, "dequeued in rendezvous_ptr");
+            buffer.dequeue(task->rendezvous_ptr);
+            task->rendezvous_ptr = 0;
+            task->wakeup(this);
+            did_rendezvous = true;
+        }
+    }
 
-    A(kernel, !buffer.is_empty(),
-      "rust_chan::transmit with nothing to send.");
+    if (!did_rendezvous) {
+        // If the task wasn't waiting specifically on this port,
+        // it may be waiting on a group of ports
 
-    if (task->blocked_on(this)) {
-        KLOG(kernel, comm, "dequeued in rendezvous_ptr");
-        buffer.dequeue(task->rendezvous_ptr);
-        task->rendezvous_ptr = 0;
-        task->wakeup(this);
+        rust_port_selector *port_selector = task->get_port_selector();
+        // This check is not definitive. The port selector will take a lock
+        // and check again whether the task is still blocked.
+        if (task->blocked_on(port_selector)) {
+            port_selector->msg_sent_on(this);
+        }
     }
 }
 
diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h
index 44bd686650d..92ece8a7841 100644
--- a/src/rt/rust_port.h
+++ b/src/rt/rust_port.h
@@ -1,6 +1,8 @@
 #ifndef RUST_PORT_H
 #define RUST_PORT_H
 
+#include "rust_internal.h"
+
 class rust_port : public kernel_owned<rust_port>, public rust_cond {
 public:
     RUST_REFCOUNTED(rust_port)
diff --git a/src/rt/rust_port_selector.cpp b/src/rt/rust_port_selector.cpp
new file mode 100644
index 00000000000..e9c351738f7
--- /dev/null
+++ b/src/rt/rust_port_selector.cpp
@@ -0,0 +1,83 @@
+#include "rust_port.h"
+#include "rust_port_selector.h"
+
+rust_port_selector::rust_port_selector()
+    : ports(NULL), n_ports(0) {
+}
+
+void
+rust_port_selector::select(rust_task *task, rust_port **dptr,
+			   rust_port **ports,
+			   size_t n_ports, uintptr_t *yield) {
+
+    I(task->thread, this->ports == NULL);
+    I(task->thread, this->n_ports == 0);
+    I(task->thread, dptr != NULL);
+    I(task->thread, ports != NULL);
+    I(task->thread, n_ports != 0);
+    I(task->thread, yield != NULL);
+
+    *yield = false;
+    size_t locks_taken = 0;
+    bool found_msg = false;
+
+    // Take each port's lock as we iterate through them because
+    // if none of them contain a usable message then we need to
+    // block the task before any of them can try to send another
+    // message.
+
+    for (size_t i = 0; i < n_ports; i++) {
+	rust_port *port = ports[i];
+	I(task->thread, port != NULL);
+
+	port->lock.lock();
+	locks_taken++;
+
+	if (port->buffer.size() > 0) {
+	    *dptr = port;
+	    found_msg = true;
+	    break;
+	}
+    }
+
+    if (!found_msg) {
+	this->ports = ports;
+	this->n_ports = n_ports;
+	I(task->thread, task->rendezvous_ptr == NULL);
+	task->rendezvous_ptr = (uintptr_t*)dptr;
+	*yield = true;
+	task->block(this, "waiting for select rendezvous");
+    }
+
+    for (size_t i = 0; i < locks_taken; i++) {
+	rust_port *port = ports[i];
+	port->lock.unlock();
+    }
+}
+
+void
+rust_port_selector::msg_sent_on(rust_port *port) {
+    rust_task *task = port->task;
+
+    I(task->thread, !task->lock.lock_held_by_current_thread());
+    I(task->thread, !port->lock.lock_held_by_current_thread());
+    I(task->thread, !rendezvous_lock.lock_held_by_current_thread());
+
+    // Prevent two ports from trying to wake up the task
+    // simultaneously
+    scoped_lock with(rendezvous_lock);
+
+    if (task->blocked_on(this)) {
+	for (size_t i = 0; i < n_ports; i++) {
+	    if (port == ports[i]) {
+		// This was one of the ports we were waiting on
+		ports = NULL;
+		n_ports = 0;
+		*task->rendezvous_ptr = (uintptr_t) port;
+		task->rendezvous_ptr = NULL;
+		task->wakeup(this);
+		return;
+	    }
+	}
+    }
+}
diff --git a/src/rt/rust_port_selector.h b/src/rt/rust_port_selector.h
new file mode 100644
index 00000000000..8b4d902a249
--- /dev/null
+++ b/src/rt/rust_port_selector.h
@@ -0,0 +1,27 @@
+#ifndef RUST_PORT_SELECTOR_H
+#define RUST_PORT_SELECTOR_H
+
+#include "rust_internal.h"
+
+struct rust_task;
+class rust_port;
+
+class rust_port_selector : public rust_cond {
+ private:
+    rust_port **ports;
+    size_t n_ports;
+    lock_and_signal rendezvous_lock;
+
+ public:
+    rust_port_selector();
+
+    void select(rust_task *task,
+		rust_port **dptr,
+		rust_port **ports,
+		size_t n_ports,
+		uintptr_t *yield);
+
+    void msg_sent_on(rust_port *port);
+};
+
+#endif /* RUST_PORT_SELECTOR_H */
diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h
index 7594e677bb0..fe1b94d6ea5 100644
--- a/src/rt/rust_task.h
+++ b/src/rt/rust_task.h
@@ -16,6 +16,7 @@
 #include "rust_obstack.h"
 #include "boxed_region.h"
 #include "rust_stack.h"
+#include "rust_port_selector.h"
 
 // Corresponds to the rust chan (currently _chan) type.
 struct chan_handle {
@@ -116,6 +117,8 @@ private:
     uintptr_t next_c_sp;
     uintptr_t next_rust_sp;
 
+    rust_port_selector port_selector;
+
     // Called when the atomic refcount reaches zero
     void delete_this();
 
@@ -206,6 +209,8 @@ public:
     void call_on_c_stack(void *args, void *fn_ptr);
     void call_on_rust_stack(void *args, void *fn_ptr);
     bool have_c_stack() { return c_stack != NULL; }
+
+    rust_port_selector *get_port_selector() { return &port_selector; }
 };
 
 // This stuff is on the stack-switching fast path
diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in
index bcdb2079d97..2030f320706 100644
--- a/src/rt/rustrt.def.in
+++ b/src/rt/rustrt.def.in
@@ -17,6 +17,7 @@ nano_time
 new_port
 new_task
 port_recv
+rust_port_select
 rand_free
 rand_new
 rand_next