about summary refs log tree commit diff
path: root/src/rt
diff options
context:
space:
mode:
authorMichael Bebenita <mbebenita@mozilla.com>2010-08-09 08:15:34 -0700
committerMichael Bebenita <mbebenita@mozilla.com>2010-08-09 08:15:34 -0700
commit97d6342bf08e55f8d2b4f8df5c4b5a099df0191c (patch)
treebfed15fefbc032deba1c34908f25c1562d88aa6b /src/rt
parent5917ca35190b526b65b4d26ad0b98024ce9e0b09 (diff)
downloadrust-97d6342bf08e55f8d2b4f8df5c4b5a099df0191c.tar.gz
rust-97d6342bf08e55f8d2b4f8df5c4b5a099df0191c.zip
Synthesize a flush_chan upcall right before a channel's ref_count drops to zero. This should only happen in the Rust code and not in the drop glue, or on the unwind path. This change allows the task owning the channel to block on a flush and delete its own channel. This change also cleans up some code around rust_port and rust_chan.
Diffstat (limited to 'src/rt')
-rw-r--r--src/rt/rust_chan.cpp27
-rw-r--r--src/rt/rust_chan.h2
-rw-r--r--src/rt/rust_dom.cpp18
-rw-r--r--src/rt/rust_message.cpp3
-rw-r--r--src/rt/rust_port.cpp24
-rw-r--r--src/rt/rust_port.h1
-rw-r--r--src/rt/rust_task.cpp5
-rw-r--r--src/rt/rust_upcall.cpp68
8 files changed, 93 insertions, 55 deletions
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp
index f107d2871dd..2a0a61db7f1 100644
--- a/src/rt/rust_chan.cpp
+++ b/src/rt/rust_chan.cpp
@@ -18,9 +18,11 @@ rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) :
 }
 
 rust_chan::~rust_chan() {
-    if (port && !port->is_proxy()) {
-        port->delegate()->chans.swap_delete(this);
-    }
+    task->log(rust_log::MEM | rust_log::COMM,
+              "del rust_chan(task=0x%" PRIxPTR ")", (uintptr_t) this);
+
+    A(task->dom, is_associated() == false,
+      "Channel must be disassociated before being freed.");
 }
 
 /**
@@ -28,7 +30,10 @@ rust_chan::~rust_chan() {
  */
 void rust_chan::associate(maybe_proxy<rust_port> *port) {
     this->port = port;
-    if (!port->is_proxy()) {
+    if (port->is_proxy() == false) {
+        task->log(rust_log::TASK,
+            "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
+            this, port);
         this->port->delegate()->chans.push(this);
     }
 }
@@ -43,14 +48,23 @@ bool rust_chan::is_associated() {
 void rust_chan::disassociate() {
     A(task->dom, is_associated(), "Channel must be associated with a port.");
 
+    if (port->is_proxy() == false) {
+        task->log(rust_log::TASK,
+            "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
+            this, port->delegate());
+        port->delegate()->chans.swap_delete(this);
+    }
+
     // Delete reference to the port.
     port = NULL;
 }
 
 /**
- * Attempt to transmit channel data to the associated port.
+ * Attempt to send data to the associated port.
  */
-void rust_chan::transmit() {
+void rust_chan::send(void *sptr) {
+    buffer.enqueue(sptr);
+
     rust_dom *dom = task->dom;
     if (!is_associated()) {
         W(dom, is_associated(),
@@ -81,7 +95,6 @@ void rust_chan::transmit() {
 
     return;
 }
-
 //
 // Local Variables:
 // mode: C++
diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h
index 055e359ae71..6aa9824786d 100644
--- a/src/rt/rust_chan.h
+++ b/src/rt/rust_chan.h
@@ -17,7 +17,7 @@ public:
     void disassociate();
     bool is_associated();
 
-    void transmit();
+    void send(void *sptr);
 };
 
 //
diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp
index b66fca829fb..af1472520ba 100644
--- a/src/rt/rust_dom.cpp
+++ b/src/rt/rust_dom.cpp
@@ -237,7 +237,6 @@ rust_dom::reap_dead_tasks() {
         rust_task *task = dead_tasks[i];
         if (task->ref_count == 0) {
             I(this, task->tasks_waiting_to_join.is_empty());
-
             dead_tasks.swap_delete(task);
             log(rust_log::TASK,
                 "deleting unreferenced dead task 0x%" PRIxPTR, task);
@@ -392,10 +391,9 @@ rust_dom::start_main_loop()
         // if progress is made in other domains.
 
         if (scheduled_task == NULL) {
-            log(rust_log::TASK,
-                "all tasks are blocked, waiting for progress ...");
-            if (_log.is_tracing(rust_log::TASK))
+            if (_log.is_tracing(rust_log::TASK)) {
                 log_state();
+            }
             log(rust_log::TASK,
                 "all tasks are blocked, scheduler yielding ...");
             sync::yield();
@@ -437,18 +435,6 @@ rust_dom::start_main_loop()
     log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ...");
 
     while (dead_tasks.length() > 0) {
-        log(rust_log::DOM,
-            "waiting for %d dead tasks to become dereferenced ...",
-            dead_tasks.length());
-
-        if (_log.is_tracing(rust_log::DOM)) {
-            for (size_t i = 0; i < dead_tasks.length(); i++) {
-                log(rust_log::DOM,
-                    "task: 0x%" PRIxPTR ", index: %d, ref_count: %d",
-                    dead_tasks[i], i, dead_tasks[i]->ref_count);
-            }
-        }
-
         if (_incoming_message_queue.is_empty()) {
             log(rust_log::DOM,
                 "waiting for %d dead tasks to become dereferenced, "
diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp
index 8b396b4dc0f..1de804c9dec 100644
--- a/src/rt/rust_message.cpp
+++ b/src/rt/rust_message.cpp
@@ -90,8 +90,7 @@ send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source,
 }
 
 void data_message::process() {
-    _port->remote_channel->buffer.enqueue(_buffer);
-    _port->remote_channel->transmit();
+    _port->remote_channel->send(_buffer);
     _target->log(rust_log::COMM, "<=== received data via message ===");
 }
 
diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp
index bd9af6f9643..c97b5d4170a 100644
--- a/src/rt/rust_port.cpp
+++ b/src/rt/rust_port.cpp
@@ -21,14 +21,32 @@ rust_port::~rust_port() {
 
     // Disassociate channels from this port.
     while (chans.is_empty() == false) {
-        chans.pop()->disassociate();
+        rust_chan *chan = chans.peek();
+        chan->disassociate();
     }
 
-    // We're the only ones holding a reference to the remote channel, so
-    // clean it up.
     delete remote_channel;
 }
 
+bool rust_port::receive(void *dptr) {
+    for (uint32_t i = 0; i < chans.length(); i++) {
+        rust_chan *chan = chans[i];
+        if (chan->buffer.is_empty() == false) {
+            chan->buffer.dequeue(dptr);
+            if (chan->buffer.is_empty() && chan->task->blocked()) {
+                task->log(rust_log::COMM,
+                          "chan: 0x%" PRIxPTR
+                          " is flushing, wakeup task: 0x%" PRIxPTR,
+                          chan, chan->task);
+                chan->task->wakeup(this);
+            }
+            task->log(rust_log::COMM, "<=== read data ===");
+            return true;
+        }
+    }
+    return false;
+}
+
 void rust_port::log_state() {
     task->log(rust_log::COMM,
               "rust_port: 0x%" PRIxPTR ", associated channel(s): %d",
diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h
index 3330701ad73..7a58f839c44 100644
--- a/src/rt/rust_port.h
+++ b/src/rt/rust_port.h
@@ -16,6 +16,7 @@ public:
     rust_port(rust_task *task, size_t unit_sz);
     ~rust_port();
     void log_state();
+    bool receive(void *dptr);
 };
 
 //
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index c8831e8e05b..f52db868e02 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -323,6 +323,11 @@ get_callee_save_fp(uintptr_t *top_of_callee_saves)
 
 void
 rust_task::kill() {
+    if (dead()) {
+        // Task is already dead, can't kill what's already dead.
+        return;
+    }
+
     // Note the distinction here: kill() is when you're in an upcall
     // from task A and want to force-fail task B, you do B->kill().
     // If you want to fail yourself you do self->fail(upcall_nargs).
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index 5040a1575a1..039aa2fd972 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -98,26 +98,50 @@ upcall_new_chan(rust_task *task, rust_port *port) {
 }
 
 /**
+ * Called whenever this channel needs to be flushed. This can happen due to a
+ * flush statement, or automatically whenever a channel's ref count is
+ * about to drop to zero.
+ */
+extern "C" CDECL void
+upcall_flush_chan(rust_task *task, rust_chan *chan) {
+    LOG_UPCALL_ENTRY(task);
+    rust_dom *dom = task->dom;
+    task->log(rust_log::UPCALL | rust_log::COMM,
+              "flush chan: 0x%" PRIxPTR, chan);
+
+    if (chan->buffer.is_empty()) {
+        return;
+    }
+
+    A(dom, chan->port->is_proxy() == false,
+      "Channels to remote ports should be flushed automatically.");
+
+    // Block on the port until this channel has been completely drained
+    // by the port.
+    task->block(chan->port);
+    task->yield(2);
+}
+
+/**
  * Called whenever the channel's ref count drops to zero.
+ *
+ * Cannot Yield: If the task were to unwind, the dropped ref would still
+ * appear to be live, causing modify-after-free errors.
  */
 extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
     LOG_UPCALL_ENTRY(task);
-    rust_dom *dom = task->dom;
+
     task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
               "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
-    I(dom, !chan->ref_count);
-
-    if (!chan->buffer.is_empty() && chan->is_associated()) {
-        A(dom, !chan->port->is_proxy(),
-          "Channels to remote ports should be flushed automatically.");
-        // A target port may still be reading from this channel.
-        // Block on this channel until it has been completely drained
-        // by the port.
-        task->block(chan);
-        task->yield(2);
-        return;
-    }
 
+    A(task->dom, chan->ref_count == 0,
+      "Channel's ref count should be zero.");
+
+    if (chan->is_associated()) {
+        A(task->dom, chan->buffer.is_empty(),
+          "Channel's buffer should be empty.");
+        chan->disassociate();
+    }
     delete chan;
 }
 
@@ -183,8 +207,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
               "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
               (uintptr_t) chan, (uintptr_t) sptr,
               chan->port->delegate()->unit_sz);
-    chan->buffer.enqueue(sptr);
-    chan->transmit();
+    chan->send(sptr);
     task->log(rust_log::COMM, "=== sent data ===>");
 }
 
@@ -197,17 +220,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
               (uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
               port->chans.length());
 
-    for (uint32_t i = 0; i < port->chans.length(); i++) {
-        rust_chan *chan = port->chans[i];
-        if (chan->buffer.is_empty() == false) {
-            chan->buffer.dequeue(dptr);
-            if (chan->buffer.is_empty() && chan->task->blocked()) {
-                chan->task->wakeup(chan);
-                delete chan;
-            }
-            task->log(rust_log::COMM, "<=== read data ===");
-            return;
-        }
+    if (port->receive(dptr)) {
+        return;
     }
 
     // No data was buffered on any incoming channel, so block this task
@@ -260,6 +274,8 @@ upcall_exit(rust_task *task) {
     LOG_UPCALL_ENTRY(task);
     task->log(rust_log::UPCALL | rust_log::TASK,
               "task ref_count: %d", task->ref_count);
+    A(task->dom, task->ref_count >= 0,
+      "Task ref_count should not be negative on exit!");
     task->die();
     task->notify_tasks_waiting_to_join();
     task->yield(1);