1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
|
#include "rust_internal.h"
#include "rust_chan.h"
/**
* Create a new rust channel and associate it with the specified port.
*/
rust_chan::rust_chan(rust_kernel *kernel, maybe_proxy<rust_port> *port,
size_t unit_sz)
: ref_count(1),
kernel(kernel),
port(port),
buffer(kernel, unit_sz) {
if (port) {
associate(port);
}
DLOG(kernel->sched, comm, "new rust_chan(task=0x%" PRIxPTR
", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR,
(uintptr_t) task, (uintptr_t) port, (uintptr_t) this);
}
rust_chan::~rust_chan() {
DLOG(kernel->sched, comm, "del rust_chan(task=0x%" PRIxPTR ")",
(uintptr_t) this);
A(kernel->sched, is_associated() == false,
"Channel must be disassociated before being freed.");
}
/**
* Link this channel with the specified port.
*/
void rust_chan::associate(maybe_proxy<rust_port> *port) {
this->port = port;
if (port->is_proxy() == false) {
DLOG(kernel->sched, task,
"associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
this, port);
++this->ref_count;
this->task = port->referent()->task;
this->task->ref();
this->port->referent()->chans.push(this);
}
}
bool rust_chan::is_associated() {
return port != NULL;
}
/**
* Unlink this channel from its associated port.
*/
void rust_chan::disassociate() {
A(kernel->sched, is_associated(),
"Channel must be associated with a port.");
if (port->is_proxy() == false) {
DLOG(kernel->sched, task,
"disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
this, port->referent());
--this->ref_count;
--this->task->ref_count;
this->task = NULL;
port->referent()->chans.swap_delete(this);
}
// Delete reference to the port.
port = NULL;
}
/**
* Attempt to send data to the associated port.
*/
void rust_chan::send(void *sptr) {
buffer.enqueue(sptr);
rust_scheduler *sched = kernel->sched;
if (!is_associated()) {
W(sched, is_associated(),
"rust_chan::transmit with no associated port.");
return;
}
A(sched, !buffer.is_empty(),
"rust_chan::transmit with nothing to send.");
if (port->is_proxy()) {
data_message::send(buffer.peek(), buffer.unit_sz, "send data",
task->get_handle(), port->as_proxy()->handle());
buffer.dequeue(NULL);
} else {
rust_port *target_port = port->referent();
scoped_lock with(target_port->lock);
if (target_port->task->blocked_on(target_port)) {
DLOG(sched, comm, "dequeued in rendezvous_ptr");
buffer.dequeue(target_port->task->rendezvous_ptr);
target_port->task->rendezvous_ptr = 0;
target_port->task->wakeup(target_port);
return;
}
}
return;
}
rust_chan *rust_chan::clone(maybe_proxy<rust_task> *target) {
size_t unit_sz = buffer.unit_sz;
maybe_proxy<rust_port> *port = this->port;
rust_task *target_task = NULL;
if (target->is_proxy() == false) {
port = this->port;
target_task = target->referent();
} else {
rust_handle<rust_port> *handle =
task->sched->kernel->get_port_handle(port->as_referent());
maybe_proxy<rust_port> *proxy = new rust_proxy<rust_port> (handle);
DLOG(kernel->sched, mem, "new proxy: " PTR, proxy);
port = proxy;
target_task = target->as_proxy()->handle()->referent();
}
return new (target_task->kernel, "cloned chan")
rust_chan(kernel, port, unit_sz);
}
/**
* Cannot Yield: If the task were to unwind, the dropped ref would still
* appear to be live, causing modify-after-free errors.
*/
void rust_chan::destroy() {
A(kernel->sched, ref_count == 0,
"Channel's ref count should be zero.");
if (is_associated()) {
if (port->is_proxy()) {
// Here is a good place to delete the port proxy we allocated
// in upcall_clone_chan.
rust_proxy<rust_port> *proxy = port->as_proxy();
disassociate();
delete proxy;
} else {
// We're trying to delete a channel that another task may be
// reading from. We have two options:
//
// 1. We can flush the channel by blocking in upcall_flush_chan()
// and resuming only when the channel is flushed. The problem
// here is that we can get ourselves in a deadlock if the
// parent task tries to join us.
//
// 2. We can leave the channel in a "dormnat" state by not freeing
// it and letting the receiver task delete it for us instead.
if (buffer.is_empty() == false) {
return;
}
disassociate();
}
}
delete this;
}
//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//
|