diff options
Diffstat (limited to 'src/rt/rust_upcall.cpp')
| -rw-r--r-- | src/rt/rust_upcall.cpp | 68 |
1 files changed, 42 insertions, 26 deletions
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 5040a1575a1..039aa2fd972 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -98,26 +98,50 @@ upcall_new_chan(rust_task *task, rust_port *port) { } /** + * Called whenever this channel needs to be flushed. This can happen due to a + * flush statement, or automatically whenever a channel's ref count is + * about to drop to zero. + */ +extern "C" CDECL void +upcall_flush_chan(rust_task *task, rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + rust_dom *dom = task->dom; + task->log(rust_log::UPCALL | rust_log::COMM, + "flush chan: 0x%" PRIxPTR, chan); + + if (chan->buffer.is_empty()) { + return; + } + + A(dom, chan->port->is_proxy() == false, + "Channels to remote ports should be flushed automatically."); + + // Block on the port until this channel has been completely drained + // by the port. + task->block(chan->port); + task->yield(2); +} + +/** * Called whenever the channel's ref count drops to zero. + * + * Cannot Yield: If the task were to unwind, the dropped ref would still + * appear to be live, causing modify-after-free errors. */ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); - I(dom, !chan->ref_count); - - if (!chan->buffer.is_empty() && chan->is_associated()) { - A(dom, !chan->port->is_proxy(), - "Channels to remote ports should be flushed automatically."); - // A target port may still be reading from this channel. - // Block on this channel until it has been completely drained - // by the port. - task->block(chan); - task->yield(2); - return; - } + A(task->dom, chan->ref_count == 0, + "Channel's ref count should be zero."); + + if (chan->is_associated()) { + A(task->dom, chan->buffer.is_empty(), + "Channel's buffer should be empty."); + chan->disassociate(); + } delete chan; } @@ -183,8 +207,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) { "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d", (uintptr_t) chan, (uintptr_t) sptr, chan->port->delegate()->unit_sz); - chan->buffer.enqueue(sptr); - chan->transmit(); + chan->send(sptr); task->log(rust_log::COMM, "=== sent data ===>"); } @@ -197,17 +220,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, port->chans.length()); - for (uint32_t i = 0; i < port->chans.length(); i++) { - rust_chan *chan = port->chans[i]; - if (chan->buffer.is_empty() == false) { - chan->buffer.dequeue(dptr); - if (chan->buffer.is_empty() && chan->task->blocked()) { - chan->task->wakeup(chan); - delete chan; - } - task->log(rust_log::COMM, "<=== read data ==="); - return; - } + if (port->receive(dptr)) { + return; } // No data was buffered on any incoming channel, so block this task @@ -260,6 +274,8 @@ upcall_exit(rust_task *task) { LOG_UPCALL_ENTRY(task); task->log(rust_log::UPCALL | rust_log::TASK, "task ref_count: %d", task->ref_count); + A(task->dom, task->ref_count >= 0, + "Task ref_count should not be negative on exit!"); task->die(); task->notify_tasks_waiting_to_join(); task->yield(1); |
