about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/boot/me/trans.ml20
-rw-r--r--src/rt/rust_chan.cpp27
-rw-r--r--src/rt/rust_chan.h2
-rw-r--r--src/rt/rust_dom.cpp18
-rw-r--r--src/rt/rust_message.cpp3
-rw-r--r--src/rt/rust_port.cpp24
-rw-r--r--src/rt/rust_port.h1
-rw-r--r--src/rt/rust_task.cpp5
-rw-r--r--src/rt/rust_upcall.cpp68
9 files changed, 113 insertions, 55 deletions
diff --git a/src/boot/me/trans.ml b/src/boot/me/trans.ml
index b708bb268ff..97dce2b2c58 100644
--- a/src/boot/me/trans.ml
+++ b/src/boot/me/trans.ml
@@ -2932,6 +2932,7 @@ let trans_visitor
       (slot:Ast.slot)
       (curr_iso:Ast.ty_iso option)
       : unit =
+      check_and_flush_chan cell slot;
       drop_slot (get_ty_params_of_current_frame()) cell slot curr_iso
 
   and drop_ty_in_current_frame
@@ -4188,6 +4189,25 @@ let trans_visitor
     let last_jumps = Array.map trans_arm at.Ast.alt_tag_arms in
       Array.iter patch last_jumps
 
+  (* If we're about to drop a channel, synthesize an upcall_flush_chan.
+   * TODO: This should rather appear in a chan dtor when chans become
+   * objects. *)
+  and check_and_flush_chan
+    (cell:Il.cell)
+    (slot:Ast.slot)
+      : unit =
+      let ty = strip_mutable_or_constrained_ty (slot_ty slot) in
+      match simplified_ty ty with
+          Ast.TY_chan _ ->
+                annotate "check_and_flush_chan, flush_chan";
+                let rc = box_rc_cell cell in
+                  emit (Il.cmp (Il.Cell rc) one);
+                let jump = mark () in
+                  emit (Il.jmp Il.JNE Il.CodeNone);
+                  trans_void_upcall "upcall_flush_chan" [| Il.Cell cell |];
+                  patch jump;
+        | _ -> ()
+
   and drop_slots_at_curr_stmt _ : unit =
     let stmt = Stack.top curr_stmt in
       match htab_search cx.ctxt_post_stmt_slot_drops stmt with
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp
index f107d2871dd..2a0a61db7f1 100644
--- a/src/rt/rust_chan.cpp
+++ b/src/rt/rust_chan.cpp
@@ -18,9 +18,11 @@ rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) :
 }
 
 rust_chan::~rust_chan() {
-    if (port && !port->is_proxy()) {
-        port->delegate()->chans.swap_delete(this);
-    }
+    task->log(rust_log::MEM | rust_log::COMM,
+              "del rust_chan(task=0x%" PRIxPTR ")", (uintptr_t) this);
+
+    A(task->dom, is_associated() == false,
+      "Channel must be disassociated before being freed.");
 }
 
 /**
@@ -28,7 +30,10 @@ rust_chan::~rust_chan() {
  */
 void rust_chan::associate(maybe_proxy<rust_port> *port) {
     this->port = port;
-    if (!port->is_proxy()) {
+    if (port->is_proxy() == false) {
+        task->log(rust_log::TASK,
+            "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
+            this, port);
         this->port->delegate()->chans.push(this);
     }
 }
@@ -43,14 +48,23 @@ bool rust_chan::is_associated() {
 void rust_chan::disassociate() {
     A(task->dom, is_associated(), "Channel must be associated with a port.");
 
+    if (port->is_proxy() == false) {
+        task->log(rust_log::TASK,
+            "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
+            this, port->delegate());
+        port->delegate()->chans.swap_delete(this);
+    }
+
     // Delete reference to the port.
     port = NULL;
 }
 
 /**
- * Attempt to transmit channel data to the associated port.
+ * Attempt to send data to the associated port.
  */
-void rust_chan::transmit() {
+void rust_chan::send(void *sptr) {
+    buffer.enqueue(sptr);
+
     rust_dom *dom = task->dom;
     if (!is_associated()) {
         W(dom, is_associated(),
@@ -81,7 +95,6 @@ void rust_chan::transmit() {
 
     return;
 }
-
 //
 // Local Variables:
 // mode: C++
diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h
index 055e359ae71..6aa9824786d 100644
--- a/src/rt/rust_chan.h
+++ b/src/rt/rust_chan.h
@@ -17,7 +17,7 @@ public:
     void disassociate();
     bool is_associated();
 
-    void transmit();
+    void send(void *sptr);
 };
 
 //
diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp
index b66fca829fb..af1472520ba 100644
--- a/src/rt/rust_dom.cpp
+++ b/src/rt/rust_dom.cpp
@@ -237,7 +237,6 @@ rust_dom::reap_dead_tasks() {
         rust_task *task = dead_tasks[i];
         if (task->ref_count == 0) {
             I(this, task->tasks_waiting_to_join.is_empty());
-
             dead_tasks.swap_delete(task);
             log(rust_log::TASK,
                 "deleting unreferenced dead task 0x%" PRIxPTR, task);
@@ -392,10 +391,9 @@ rust_dom::start_main_loop()
         // if progress is made in other domains.
 
         if (scheduled_task == NULL) {
-            log(rust_log::TASK,
-                "all tasks are blocked, waiting for progress ...");
-            if (_log.is_tracing(rust_log::TASK))
+            if (_log.is_tracing(rust_log::TASK)) {
                 log_state();
+            }
             log(rust_log::TASK,
                 "all tasks are blocked, scheduler yielding ...");
             sync::yield();
@@ -437,18 +435,6 @@ rust_dom::start_main_loop()
     log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ...");
 
     while (dead_tasks.length() > 0) {
-        log(rust_log::DOM,
-            "waiting for %d dead tasks to become dereferenced ...",
-            dead_tasks.length());
-
-        if (_log.is_tracing(rust_log::DOM)) {
-            for (size_t i = 0; i < dead_tasks.length(); i++) {
-                log(rust_log::DOM,
-                    "task: 0x%" PRIxPTR ", index: %d, ref_count: %d",
-                    dead_tasks[i], i, dead_tasks[i]->ref_count);
-            }
-        }
-
         if (_incoming_message_queue.is_empty()) {
             log(rust_log::DOM,
                 "waiting for %d dead tasks to become dereferenced, "
diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp
index 8b396b4dc0f..1de804c9dec 100644
--- a/src/rt/rust_message.cpp
+++ b/src/rt/rust_message.cpp
@@ -90,8 +90,7 @@ send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source,
 }
 
 void data_message::process() {
-    _port->remote_channel->buffer.enqueue(_buffer);
-    _port->remote_channel->transmit();
+    _port->remote_channel->send(_buffer);
     _target->log(rust_log::COMM, "<=== received data via message ===");
 }
 
diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp
index bd9af6f9643..c97b5d4170a 100644
--- a/src/rt/rust_port.cpp
+++ b/src/rt/rust_port.cpp
@@ -21,14 +21,32 @@ rust_port::~rust_port() {
 
     // Disassociate channels from this port.
     while (chans.is_empty() == false) {
-        chans.pop()->disassociate();
+        rust_chan *chan = chans.peek();
+        chan->disassociate();
     }
 
-    // We're the only ones holding a reference to the remote channel, so
-    // clean it up.
     delete remote_channel;
 }
 
+bool rust_port::receive(void *dptr) {
+    for (uint32_t i = 0; i < chans.length(); i++) {
+        rust_chan *chan = chans[i];
+        if (chan->buffer.is_empty() == false) {
+            chan->buffer.dequeue(dptr);
+            if (chan->buffer.is_empty() && chan->task->blocked()) {
+                task->log(rust_log::COMM,
+                          "chan: 0x%" PRIxPTR
+                          " is flushing, wakeup task: 0x%" PRIxPTR,
+                          chan, chan->task);
+                chan->task->wakeup(this);
+            }
+            task->log(rust_log::COMM, "<=== read data ===");
+            return true;
+        }
+    }
+    return false;
+}
+
 void rust_port::log_state() {
     task->log(rust_log::COMM,
               "rust_port: 0x%" PRIxPTR ", associated channel(s): %d",
diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h
index 3330701ad73..7a58f839c44 100644
--- a/src/rt/rust_port.h
+++ b/src/rt/rust_port.h
@@ -16,6 +16,7 @@ public:
     rust_port(rust_task *task, size_t unit_sz);
     ~rust_port();
     void log_state();
+    bool receive(void *dptr);
 };
 
 //
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index c8831e8e05b..f52db868e02 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -323,6 +323,11 @@ get_callee_save_fp(uintptr_t *top_of_callee_saves)
 
 void
 rust_task::kill() {
+    if (dead()) {
+        // Task is already dead, can't kill what's already dead.
+        return;
+    }
+
     // Note the distinction here: kill() is when you're in an upcall
     // from task A and want to force-fail task B, you do B->kill().
     // If you want to fail yourself you do self->fail(upcall_nargs).
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index 5040a1575a1..039aa2fd972 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -98,26 +98,50 @@ upcall_new_chan(rust_task *task, rust_port *port) {
 }
 
 /**
+ * Called whenever this channel needs to be flushed. This can happen due to a
+ * flush statement, or automatically whenever a channel's ref count is
+ * about to drop to zero.
+ */
+extern "C" CDECL void
+upcall_flush_chan(rust_task *task, rust_chan *chan) {
+    LOG_UPCALL_ENTRY(task);
+    rust_dom *dom = task->dom;
+    task->log(rust_log::UPCALL | rust_log::COMM,
+              "flush chan: 0x%" PRIxPTR, chan);
+
+    if (chan->buffer.is_empty()) {
+        return;
+    }
+
+    A(dom, chan->port->is_proxy() == false,
+      "Channels to remote ports should be flushed automatically.");
+
+    // Block on the port until this channel has been completely drained
+    // by the port.
+    task->block(chan->port);
+    task->yield(2);
+}
+
+/**
  * Called whenever the channel's ref count drops to zero.
+ *
+ * Cannot Yield: If the task were to unwind, the dropped ref would still
+ * appear to be live, causing modify-after-free errors.
  */
 extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
     LOG_UPCALL_ENTRY(task);
-    rust_dom *dom = task->dom;
+
     task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
               "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
-    I(dom, !chan->ref_count);
-
-    if (!chan->buffer.is_empty() && chan->is_associated()) {
-        A(dom, !chan->port->is_proxy(),
-          "Channels to remote ports should be flushed automatically.");
-        // A target port may still be reading from this channel.
-        // Block on this channel until it has been completely drained
-        // by the port.
-        task->block(chan);
-        task->yield(2);
-        return;
-    }
 
+    A(task->dom, chan->ref_count == 0,
+      "Channel's ref count should be zero.");
+
+    if (chan->is_associated()) {
+        A(task->dom, chan->buffer.is_empty(),
+          "Channel's buffer should be empty.");
+        chan->disassociate();
+    }
     delete chan;
 }
 
@@ -183,8 +207,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
               "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
               (uintptr_t) chan, (uintptr_t) sptr,
               chan->port->delegate()->unit_sz);
-    chan->buffer.enqueue(sptr);
-    chan->transmit();
+    chan->send(sptr);
     task->log(rust_log::COMM, "=== sent data ===>");
 }
 
@@ -197,17 +220,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
               (uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
               port->chans.length());
 
-    for (uint32_t i = 0; i < port->chans.length(); i++) {
-        rust_chan *chan = port->chans[i];
-        if (chan->buffer.is_empty() == false) {
-            chan->buffer.dequeue(dptr);
-            if (chan->buffer.is_empty() && chan->task->blocked()) {
-                chan->task->wakeup(chan);
-                delete chan;
-            }
-            task->log(rust_log::COMM, "<=== read data ===");
-            return;
-        }
+    if (port->receive(dptr)) {
+        return;
     }
 
     // No data was buffered on any incoming channel, so block this task
@@ -260,6 +274,8 @@ upcall_exit(rust_task *task) {
     LOG_UPCALL_ENTRY(task);
     task->log(rust_log::UPCALL | rust_log::TASK,
               "task ref_count: %d", task->ref_count);
+    A(task->dom, task->ref_count >= 0,
+      "Task ref_count should not be negative on exit!");
     task->die();
     task->notify_tasks_waiting_to_join();
     task->yield(1);