diff options
| author | Brian Anderson <banderson@mozilla.com> | 2012-02-14 22:23:16 -0800 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2012-02-16 11:12:22 -0800 |
| commit | b2cfb7ef8262ebe47514f016f59054ebcfe15d61 (patch) | |
| tree | 871bc0980b26b11d700761cad9a8ac245f5fcedc /src/rt | |
| parent | e62ddf48988087d19934e1fdc6abb6de5f7a6a02 (diff) | |
| download | rust-b2cfb7ef8262ebe47514f016f59054ebcfe15d61.tar.gz rust-b2cfb7ef8262ebe47514f016f59054ebcfe15d61.zip | |
rt: Add rust_port_select function
Diffstat (limited to 'src/rt')
| -rw-r--r-- | src/rt/rust_builtin.cpp | 8 | ||||
| -rw-r--r-- | src/rt/rust_port.cpp | 34 | ||||
| -rw-r--r-- | src/rt/rust_port.h | 2 | ||||
| -rw-r--r-- | src/rt/rust_port_selector.cpp | 83 | ||||
| -rw-r--r-- | src/rt/rust_port_selector.h | 27 | ||||
| -rw-r--r-- | src/rt/rust_task.h | 5 | ||||
| -rw-r--r-- | src/rt/rustrt.def.in | 1 |
7 files changed, 151 insertions, 9 deletions
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 628389f10ce..ecc73204f29 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -594,6 +594,14 @@ port_recv(uintptr_t *dptr, rust_port *port, } extern "C" CDECL void +rust_port_select(rust_port **dptr, rust_port **ports, + size_t n_ports, uintptr_t *yield) { + rust_task *task = rust_task_thread::get_task(); + rust_port_selector *selector = task->get_port_selector(); + selector->select(task, dptr, ports, n_ports, yield); +} + +extern "C" CDECL void rust_set_exit_status(intptr_t code) { rust_task *task = rust_task_thread::get_task(); task->kernel->set_exit_status((int)code); diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index a917c12e151..5f46b9c4ca0 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -30,18 +30,34 @@ void rust_port::detach() { void rust_port::send(void *sptr) { I(task->thread, !lock.lock_held_by_current_thread()); - scoped_lock with(lock); + bool did_rendezvous = false; + { + scoped_lock with(lock); + + buffer.enqueue(sptr); - buffer.enqueue(sptr); + A(kernel, !buffer.is_empty(), + "rust_chan::transmit with nothing to send."); + + if (task->blocked_on(this)) { + KLOG(kernel, comm, "dequeued in rendezvous_ptr"); + buffer.dequeue(task->rendezvous_ptr); + task->rendezvous_ptr = 0; + task->wakeup(this); + did_rendezvous = true; + } + } - A(kernel, !buffer.is_empty(), - "rust_chan::transmit with nothing to send."); + if (!did_rendezvous) { + // If the task wasn't waiting specifically on this port, + // it may be waiting on a group of ports - if (task->blocked_on(this)) { - KLOG(kernel, comm, "dequeued in rendezvous_ptr"); - buffer.dequeue(task->rendezvous_ptr); - task->rendezvous_ptr = 0; - task->wakeup(this); + rust_port_selector *port_selector = task->get_port_selector(); + // This check is not definitive. The port selector will take a lock + // and check again whether the task is still blocked. + if (task->blocked_on(port_selector)) { + port_selector->msg_sent_on(this); + } } } diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 44bd686650d..92ece8a7841 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -1,6 +1,8 @@ #ifndef RUST_PORT_H #define RUST_PORT_H +#include "rust_internal.h" + class rust_port : public kernel_owned<rust_port>, public rust_cond { public: RUST_REFCOUNTED(rust_port) diff --git a/src/rt/rust_port_selector.cpp b/src/rt/rust_port_selector.cpp new file mode 100644 index 00000000000..e9c351738f7 --- /dev/null +++ b/src/rt/rust_port_selector.cpp @@ -0,0 +1,83 @@ +#include "rust_port.h" +#include "rust_port_selector.h" + +rust_port_selector::rust_port_selector() + : ports(NULL), n_ports(0) { +} + +void +rust_port_selector::select(rust_task *task, rust_port **dptr, + rust_port **ports, + size_t n_ports, uintptr_t *yield) { + + I(task->thread, this->ports == NULL); + I(task->thread, this->n_ports == 0); + I(task->thread, dptr != NULL); + I(task->thread, ports != NULL); + I(task->thread, n_ports != 0); + I(task->thread, yield != NULL); + + *yield = false; + size_t locks_taken = 0; + bool found_msg = false; + + // Take each port's lock as we iterate through them because + // if none of them contain a usable message then we need to + // block the task before any of them can try to send another + // message. + + for (size_t i = 0; i < n_ports; i++) { + rust_port *port = ports[i]; + I(task->thread, port != NULL); + + port->lock.lock(); + locks_taken++; + + if (port->buffer.size() > 0) { + *dptr = port; + found_msg = true; + break; + } + } + + if (!found_msg) { + this->ports = ports; + this->n_ports = n_ports; + I(task->thread, task->rendezvous_ptr == NULL); + task->rendezvous_ptr = (uintptr_t*)dptr; + *yield = true; + task->block(this, "waiting for select rendezvous"); + } + + for (size_t i = 0; i < locks_taken; i++) { + rust_port *port = ports[i]; + port->lock.unlock(); + } +} + +void +rust_port_selector::msg_sent_on(rust_port *port) { + rust_task *task = port->task; + + I(task->thread, !task->lock.lock_held_by_current_thread()); + I(task->thread, !port->lock.lock_held_by_current_thread()); + I(task->thread, !rendezvous_lock.lock_held_by_current_thread()); + + // Prevent two ports from trying to wake up the task + // simultaneously + scoped_lock with(rendezvous_lock); + + if (task->blocked_on(this)) { + for (size_t i = 0; i < n_ports; i++) { + if (port == ports[i]) { + // This was one of the ports we were waiting on + ports = NULL; + n_ports = 0; + *task->rendezvous_ptr = (uintptr_t) port; + task->rendezvous_ptr = NULL; + task->wakeup(this); + return; + } + } + } +} diff --git a/src/rt/rust_port_selector.h b/src/rt/rust_port_selector.h new file mode 100644 index 00000000000..8b4d902a249 --- /dev/null +++ b/src/rt/rust_port_selector.h @@ -0,0 +1,27 @@ +#ifndef RUST_PORT_SELECTOR_H +#define RUST_PORT_SELECTOR_H + +#include "rust_internal.h" + +struct rust_task; +class rust_port; + +class rust_port_selector : public rust_cond { + private: + rust_port **ports; + size_t n_ports; + lock_and_signal rendezvous_lock; + + public: + rust_port_selector(); + + void select(rust_task *task, + rust_port **dptr, + rust_port **ports, + size_t n_ports, + uintptr_t *yield); + + void msg_sent_on(rust_port *port); +}; + +#endif /* RUST_PORT_SELECTOR_H */ diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index 7594e677bb0..fe1b94d6ea5 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -16,6 +16,7 @@ #include "rust_obstack.h" #include "boxed_region.h" #include "rust_stack.h" +#include "rust_port_selector.h" // Corresponds to the rust chan (currently _chan) type. struct chan_handle { @@ -116,6 +117,8 @@ private: uintptr_t next_c_sp; uintptr_t next_rust_sp; + rust_port_selector port_selector; + // Called when the atomic refcount reaches zero void delete_this(); @@ -206,6 +209,8 @@ public: void call_on_c_stack(void *args, void *fn_ptr); void call_on_rust_stack(void *args, void *fn_ptr); bool have_c_stack() { return c_stack != NULL; } + + rust_port_selector *get_port_selector() { return &port_selector; } }; // This stuff is on the stack-switching fast path diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index bcdb2079d97..2030f320706 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -17,6 +17,7 @@ nano_time new_port new_task port_recv +rust_port_select rand_free rand_new rand_next |
