about summary refs log tree commit diff
path: root/src/rt/rust_message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/rt/rust_message.cpp')
-rw-r--r--src/rt/rust_message.cpp102
1 files changed, 63 insertions, 39 deletions
diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp
index b6b7fbf07e0..dab13c091e2 100644
--- a/src/rt/rust_message.cpp
+++ b/src/rt/rust_message.cpp
@@ -2,36 +2,35 @@
 #include "rust_message.h"
 
 rust_message::
-rust_message(const char* label, rust_task *source, rust_task *target) :
-             label(label),
-             _dom(target->dom),
-             _source(source),
-             _target(target) {
+rust_message(memory_region *region, const char* label,
+             rust_handle<rust_task> *source, rust_handle<rust_task> *target) :
+             label(label), region(region), _source(source), _target(target) {
 }
 
 rust_message::~rust_message() {
+    // Nop.
 }
 
 void rust_message::process() {
-    I(_dom, false);
+    // Nop.
 }
 
-rust_proxy<rust_task> *
-rust_message::get_source_proxy() {
-    return _dom->get_task_proxy(_source);
+void rust_message::kernel_process() {
+    // Nop.
 }
 
 notify_message::
-notify_message(notification_type type, const char* label,
-               rust_task *source,
-               rust_task *target) :
-               rust_message(label, source, target), type(type) {
+notify_message(memory_region *region, notification_type type,
+    const char* label, rust_handle<rust_task> *source,
+    rust_handle<rust_task> *target) :
+    rust_message(region, label, source, target), type(type) {
 }
 
 data_message::
-data_message(uint8_t *buffer, size_t buffer_sz, const char* label,
-             rust_task *source, rust_task *target, rust_port *port) :
-             rust_message(label, source, target),
+data_message(memory_region *region, uint8_t *buffer, size_t buffer_sz,
+             const char* label, rust_handle<rust_task> *source,
+             rust_handle<rust_port> *port) :
+             rust_message(region, label, source, NULL),
              _buffer_sz(buffer_sz), _port(port) {
     _buffer = (uint8_t *)malloc(buffer_sz);
     memcpy(_buffer, buffer, buffer_sz);
@@ -47,54 +46,79 @@ data_message::~data_message() {
  * source task.
  */
 void notify_message::
-send(notification_type type, const char* label, rust_task *source,
-     rust_proxy<rust_task> *target) {
-    rust_task *target_task = target->delegate();
-    rust_dom *target_domain = target_task->dom;
+send(notification_type type, const char* label,
+     rust_handle<rust_task> *source, rust_handle<rust_task> *target) {
+    memory_region *region = &target->message_queue->region;
     notify_message *message =
-        new (target_domain, memory_region::SYNCHRONIZED) notify_message(type,
-            label, source, target_task);
-    target_domain->send_message(message);
+        new (region) notify_message(region, type, label, source, target);
+//    target->referent()->log(rust_log::COMM,
+//                            "==> sending \"%s\" " PTR " in queue " PTR,
+//                            label, message, &target->message_queue);
+    target->message_queue->enqueue(message);
 }
 
 void notify_message::process() {
-    rust_task *task = _target;
+    rust_task *task = _target->referent();
     switch (type) {
     case KILL:
-        task->ref_count--;
+        // task->ref_count--;
         task->kill();
         break;
     case JOIN: {
         if (task->dead() == false) {
-            task->tasks_waiting_to_join.append(get_source_proxy());
+            rust_proxy<rust_task> *proxy = new rust_proxy<rust_task>(_source);
+            task->tasks_waiting_to_join.append(proxy);
         } else {
-            send(WAKEUP, "wakeup", task, get_source_proxy());
+            send(WAKEUP, "wakeup", _target, _source);
         }
         break;
     }
     case WAKEUP:
-        task->wakeup(get_source_proxy()->delegate());
+        task->wakeup(_source);
+        break;
+    }
+}
+
+void notify_message::kernel_process() {
+    switch(type) {
+    case WAKEUP:
+    case KILL:
+        // Ignore.
+        break;
+    case JOIN:
+        send(WAKEUP, "wakeup", _target, _source);
         break;
     }
 }
 
 void data_message::
-send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source,
-     rust_proxy<rust_task> *target, rust_proxy<rust_port> *port) {
+send(uint8_t *buffer, size_t buffer_sz, const char* label,
+     rust_handle<rust_task> *source, rust_handle<rust_port> *port) {
 
-    rust_task *target_task = target->delegate();
-    rust_port *target_port = port->delegate();
-    rust_dom *target_domain = target_task->dom;
+    memory_region *region = &port->message_queue->region;
     data_message *message =
-        new (target_domain, memory_region::SYNCHRONIZED)
-            data_message(buffer, buffer_sz, label, source,
-                target_task, target_port);
-    target_domain->send_message(message);
+        new (region) data_message(region, buffer, buffer_sz, label, source,
+                                  port);
+    source->referent()->log(rust_log::COMM,
+                            "==> sending \"%s\"" PTR " in queue " PTR,
+                            label, message, &port->message_queue);
+    port->message_queue->enqueue(message);
 }
 
 void data_message::process() {
-    _port->remote_channel->send(_buffer);
-    _target->log(rust_log::COMM, "<=== received data via message ===");
+    _port->referent()->remote_channel->send(_buffer);
+    // _target->referent()->log(rust_log::COMM,
+    //                         "<=== received data via message ===");
+}
+
+void data_message::kernel_process() {
+
+}
+
+rust_message_queue::rust_message_queue(rust_srv *srv, rust_kernel *kernel) :
+                                       region (srv, true), kernel(kernel),
+                                       dom_handle(NULL) {
+    // Nop.
 }
 
 //