about summary refs log tree commit diff
path: root/src/rt/rust_port_selector.cpp
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2012-02-14 22:23:16 -0800
committerBrian Anderson <banderson@mozilla.com>2012-02-16 11:12:22 -0800
commitb2cfb7ef8262ebe47514f016f59054ebcfe15d61 (patch)
tree871bc0980b26b11d700761cad9a8ac245f5fcedc /src/rt/rust_port_selector.cpp
parente62ddf48988087d19934e1fdc6abb6de5f7a6a02 (diff)
downloadrust-b2cfb7ef8262ebe47514f016f59054ebcfe15d61.tar.gz
rust-b2cfb7ef8262ebe47514f016f59054ebcfe15d61.zip
rt: Add rust_port_select function
Diffstat (limited to 'src/rt/rust_port_selector.cpp')
-rw-r--r--src/rt/rust_port_selector.cpp83
1 files changed, 83 insertions, 0 deletions
diff --git a/src/rt/rust_port_selector.cpp b/src/rt/rust_port_selector.cpp
new file mode 100644
index 00000000000..e9c351738f7
--- /dev/null
+++ b/src/rt/rust_port_selector.cpp
@@ -0,0 +1,83 @@
+#include "rust_port.h"
+#include "rust_port_selector.h"
+
+rust_port_selector::rust_port_selector()
+    : ports(NULL), n_ports(0) {
+}
+
+void
+rust_port_selector::select(rust_task *task, rust_port **dptr,
+			   rust_port **ports,
+			   size_t n_ports, uintptr_t *yield) {
+
+    I(task->thread, this->ports == NULL);
+    I(task->thread, this->n_ports == 0);
+    I(task->thread, dptr != NULL);
+    I(task->thread, ports != NULL);
+    I(task->thread, n_ports != 0);
+    I(task->thread, yield != NULL);
+
+    *yield = false;
+    size_t locks_taken = 0;
+    bool found_msg = false;
+
+    // Take each port's lock as we iterate through them because
+    // if none of them contain a usable message then we need to
+    // block the task before any of them can try to send another
+    // message.
+
+    for (size_t i = 0; i < n_ports; i++) {
+	rust_port *port = ports[i];
+	I(task->thread, port != NULL);
+
+	port->lock.lock();
+	locks_taken++;
+
+	if (port->buffer.size() > 0) {
+	    *dptr = port;
+	    found_msg = true;
+	    break;
+	}
+    }
+
+    if (!found_msg) {
+	this->ports = ports;
+	this->n_ports = n_ports;
+	I(task->thread, task->rendezvous_ptr == NULL);
+	task->rendezvous_ptr = (uintptr_t*)dptr;
+	*yield = true;
+	task->block(this, "waiting for select rendezvous");
+    }
+
+    for (size_t i = 0; i < locks_taken; i++) {
+	rust_port *port = ports[i];
+	port->lock.unlock();
+    }
+}
+
+void
+rust_port_selector::msg_sent_on(rust_port *port) {
+    rust_task *task = port->task;
+
+    I(task->thread, !task->lock.lock_held_by_current_thread());
+    I(task->thread, !port->lock.lock_held_by_current_thread());
+    I(task->thread, !rendezvous_lock.lock_held_by_current_thread());
+
+    // Prevent two ports from trying to wake up the task
+    // simultaneously
+    scoped_lock with(rendezvous_lock);
+
+    if (task->blocked_on(this)) {
+	for (size_t i = 0; i < n_ports; i++) {
+	    if (port == ports[i]) {
+		// This was one of the ports we were waiting on
+		ports = NULL;
+		n_ports = 0;
+		*task->rendezvous_ptr = (uintptr_t) port;
+		task->rendezvous_ptr = NULL;
+		task->wakeup(this);
+		return;
+	    }
+	}
+    }
+}