about summary refs log tree commit diff
path: root/src/rt/rust_upcall.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/rt/rust_upcall.cpp')
-rw-r--r--src/rt/rust_upcall.cpp68
1 files changed, 42 insertions, 26 deletions
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index 5040a1575a1..039aa2fd972 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -98,26 +98,50 @@ upcall_new_chan(rust_task *task, rust_port *port) {
 }
 
 /**
+ * Called whenever this channel needs to be flushed. This can happen due to a
+ * flush statement, or automatically whenever a channel's ref count is
+ * about to drop to zero.
+ */
+extern "C" CDECL void
+upcall_flush_chan(rust_task *task, rust_chan *chan) {
+    LOG_UPCALL_ENTRY(task);
+    rust_dom *dom = task->dom;
+    task->log(rust_log::UPCALL | rust_log::COMM,
+              "flush chan: 0x%" PRIxPTR, chan);
+
+    if (chan->buffer.is_empty()) {
+        return;
+    }
+
+    A(dom, chan->port->is_proxy() == false,
+      "Channels to remote ports should be flushed automatically.");
+
+    // Block on the port until this channel has been completely drained
+    // by the port.
+    task->block(chan->port);
+    task->yield(2);
+}
+
+/**
  * Called whenever the channel's ref count drops to zero.
+ *
+ * Cannot Yield: If the task were to unwind, the dropped ref would still
+ * appear to be live, causing modify-after-free errors.
  */
 extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
     LOG_UPCALL_ENTRY(task);
-    rust_dom *dom = task->dom;
+
     task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
               "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
-    I(dom, !chan->ref_count);
-
-    if (!chan->buffer.is_empty() && chan->is_associated()) {
-        A(dom, !chan->port->is_proxy(),
-          "Channels to remote ports should be flushed automatically.");
-        // A target port may still be reading from this channel.
-        // Block on this channel until it has been completely drained
-        // by the port.
-        task->block(chan);
-        task->yield(2);
-        return;
-    }
 
+    A(task->dom, chan->ref_count == 0,
+      "Channel's ref count should be zero.");
+
+    if (chan->is_associated()) {
+        A(task->dom, chan->buffer.is_empty(),
+          "Channel's buffer should be empty.");
+        chan->disassociate();
+    }
     delete chan;
 }
 
@@ -183,8 +207,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
               "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
               (uintptr_t) chan, (uintptr_t) sptr,
               chan->port->delegate()->unit_sz);
-    chan->buffer.enqueue(sptr);
-    chan->transmit();
+    chan->send(sptr);
     task->log(rust_log::COMM, "=== sent data ===>");
 }
 
@@ -197,17 +220,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
               (uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
               port->chans.length());
 
-    for (uint32_t i = 0; i < port->chans.length(); i++) {
-        rust_chan *chan = port->chans[i];
-        if (chan->buffer.is_empty() == false) {
-            chan->buffer.dequeue(dptr);
-            if (chan->buffer.is_empty() && chan->task->blocked()) {
-                chan->task->wakeup(chan);
-                delete chan;
-            }
-            task->log(rust_log::COMM, "<=== read data ===");
-            return;
-        }
+    if (port->receive(dptr)) {
+        return;
     }
 
     // No data was buffered on any incoming channel, so block this task
@@ -260,6 +274,8 @@ upcall_exit(rust_task *task) {
     LOG_UPCALL_ENTRY(task);
     task->log(rust_log::UPCALL | rust_log::TASK,
               "task ref_count: %d", task->ref_count);
+    A(task->dom, task->ref_count >= 0,
+      "Task ref_count should not be negative on exit!");
     task->die();
     task->notify_tasks_waiting_to_join();
     task->yield(1);