about summary refs log tree commit diff
path: root/src/rt/rust_message.cpp
blob: f8001a17193b1d4f8377fba554a725c0971f74c2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include "rust_internal.h"
#include "rust_message.h"

rust_message::
rust_message(memory_region *region, const char* label,
             rust_handle<rust_task> *source, rust_handle<rust_task> *target) :
             label(label), region(region), _source(source), _target(target) {
}

rust_message::~rust_message() {
}

void rust_message::process() {
}

void rust_message::kernel_process() {
}

notify_message::
notify_message(memory_region *region, notification_type type,
    const char* label, rust_handle<rust_task> *source,
    rust_handle<rust_task> *target) :
    rust_message(region, label, source, target), type(type) {
}

data_message::
data_message(memory_region *region, uint8_t *buffer, size_t buffer_sz,
             const char* label, rust_handle<rust_task> *source,
             rust_handle<rust_port> *port) :
             rust_message(region, label, source, NULL),
             _buffer_sz(buffer_sz), _port(port) {
    _buffer = (uint8_t *)malloc(buffer_sz);
    memcpy(_buffer, buffer, buffer_sz);
}

data_message::~data_message() {
    free (_buffer);
}

/**
 * Sends a message to the target task via a proxy. The message is allocated
 * in the target task domain along with a proxy which points back to the
 * source task.
 */
void notify_message::
send(notification_type type, const char* label,
     rust_handle<rust_task> *source, rust_handle<rust_task> *target) {
    memory_region *region = &target->message_queue->region;
    notify_message *message =
        new (region, "notify_message")
        notify_message(region, type, label, source, target);
    target->message_queue->enqueue(message);
}

void notify_message::process() {
    rust_task *task = _target->referent();
    switch (type) {
    case KILL:
        // task->ref_count--;
        task->kill();
        break;
    case JOIN: {
        if (task->dead() == false) {
            // FIXME: this should be dead code.
            assert(false);
        } else {
            send(WAKEUP, "wakeup", _target, _source);
        }
        break;
    }
    case WAKEUP:
        task->wakeup(_source);
        break;
    }
}

void notify_message::kernel_process() {
    switch(type) {
    case WAKEUP:
    case KILL:
        // Ignore.
        break;
    case JOIN:
        send(WAKEUP, "wakeup", _target, _source);
        break;
    }
}

void data_message::
send(uint8_t *buffer, size_t buffer_sz, const char* label,
     rust_handle<rust_task> *source, rust_handle<rust_port> *port) {

    memory_region *region = &port->message_queue->region;
    data_message *message =
        new (region, "data_message")
        data_message(region, buffer, buffer_sz, label, source, port);
    LOG(source->referent(), comm, "==> sending \"%s\"" PTR " in queue " PTR,
        label, message, &port->message_queue);
    port->message_queue->enqueue(message);
}

void data_message::process() {
    _port->referent()->remote_channel->send(_buffer);
}

void data_message::kernel_process() {

}

rust_message_queue::rust_message_queue(rust_srv *srv, rust_kernel *kernel)
    : region(srv, true),
      kernel(kernel),
      sched_handle(NULL) {
}

//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//