// Copyright 2012 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. #include "rust_port.h" #include "rust_port_selector.h" #include "rust_task.h" rust_port_selector::rust_port_selector() : ports(NULL), n_ports(0) { } void rust_port_selector::select(rust_task *task, rust_port **dptr, rust_port **ports, size_t n_ports, uintptr_t *yield) { assert(this->ports == NULL); assert(this->n_ports == 0); assert(dptr != NULL); assert(ports != NULL); assert(n_ports != 0); assert(yield != NULL); *yield = false; size_t locks_taken = 0; bool found_msg = false; // Take each port's lock as we iterate through them because // if none of them contain a usable message then we need to // block the task before any of them can try to send another // message. // Start looking for ports from a different index each time. size_t j = isaac_rand(&task->sched_loop->rctx); for (size_t i = 0; i < n_ports; i++) { size_t k = (i + j) % n_ports; rust_port *port = ports[k]; assert(port != NULL); port->lock.lock(); locks_taken++; if (port->buffer.size() > 0) { *dptr = port; found_msg = true; break; } } if (!found_msg) { this->ports = ports; this->n_ports = n_ports; assert(task->rendezvous_ptr == NULL); task->rendezvous_ptr = (uintptr_t*)dptr; task->block(this, "waiting for select rendezvous"); // Blocking the task might fail if the task has already been // killed, but in the event of both failure and success the // task needs to yield. On success, it yields and waits to be // unblocked. On failure it yields and is then fails the task. *yield = true; } for (size_t i = 0; i < locks_taken; i++) { size_t k = (i + j) % n_ports; rust_port *port = ports[k]; port->lock.unlock(); } } void rust_port_selector::msg_sent_on(rust_port *port) { rust_task *task = port->task; port->lock.must_not_have_lock(); // Prevent two ports from trying to wake up the task // simultaneously scoped_lock with(task->lifecycle_lock); if (task->blocked_on(this)) { for (size_t i = 0; i < n_ports; i++) { if (port == ports[i]) { // This was one of the ports we were waiting on ports = NULL; n_ports = 0; *task->rendezvous_ptr = (uintptr_t) port; task->rendezvous_ptr = NULL; task->wakeup_inner(this); return; } } } }