1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
#include "rust_internal.h"
#include "rust_chan.h"
/**
 * Create a new rust channel and, when a port is supplied, associate the
 * channel with it.
 *
 * The channel starts with a reference count of one and a buffer constructed
 * from `kernel` and `unit_sz`.
 *
 * @param kernel  Kernel stored on the channel; also handed to the buffer and
 *                used for logging.
 * @param port    Port to associate with. May be NULL, in which case the
 *                channel starts out unassociated.
 * @param unit_sz Unit size passed to the channel's buffer.
 */
rust_chan::rust_chan(rust_kernel *kernel, rust_port *port,
                     size_t unit_sz)
    : ref_count(1),
      kernel(kernel),
      port(port),
      buffer(kernel, unit_sz) {
    // `task` is otherwise only assigned by associate(). Give it a defined
    // value first so that a channel created without a port never logs (or
    // later reads) an indeterminate pointer. This is assigned in the body
    // rather than the initializer list so it does not depend on the member
    // declaration order, which is not visible from this file.
    task = NULL;

    if (port) {
        associate(port);
    }

    KLOG(kernel, comm, "new rust_chan(task=0x%" PRIxPTR
         ", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR,
         (uintptr_t) task, (uintptr_t) port, (uintptr_t) this);
}
/**
 * Destroy the channel.
 *
 * Logs the deletion and asserts that the channel is no longer associated
 * with a port.
 */
rust_chan::~rust_chan() {
    // The value logged is this channel's own address, so it is labelled
    // "chan" (as in the constructor's log line) rather than "task".
    KLOG(kernel, comm, "del rust_chan(chan=0x%" PRIxPTR ")",
         (uintptr_t) this);

    A(kernel, is_associated() == false,
      "Channel must be disassociated before being freed.");
}
/**
 * Link this channel with the specified port.
 *
 * Records the port on the channel and then, while holding the port's lock,
 * increments the channel's reference count, adopts the port's task (taking
 * a reference on it), and pushes the channel onto the port's `chans` list.
 *
 * @param port Port to link with. It is dereferenced unconditionally, so it
 *             must not be NULL.
 */
void rust_chan::associate(rust_port *port) {
    this->port = port;

    scoped_lock with(port->lock);

    // PRIxPTR conversions expect uintptr_t arguments, so convert the
    // pointers explicitly, as the other log statements in this file do.
    KLOG(kernel, task,
         "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
         (uintptr_t) this, (uintptr_t) port);

    ++this->ref_count;
    this->task = port->task;
    this->task->ref();
    this->port->chans.push(this);
}
/**
 * Report whether this channel currently has a port.
 *
 * @return true when the `port` member is non-NULL, false otherwise.
 */
bool rust_chan::is_associated() {
    if (port == NULL) {
        return false;
    }
    return true;
}
/**
 * Unlink this channel from its associated port.
 *
 * Decrements the channel's reference count, drops the reference held on the
 * task and clears `task`, removes the channel from the port's `chans` list,
 * and finally clears `port`.
 *
 * The caller must hold the port's lock, and the channel must currently be
 * associated with a port; both conditions are asserted.
 */
void rust_chan::disassociate() {
    // Check for a port first: the lock assertion below dereferences `port`,
    // so a missing port has to be reported by its own assertion before the
    // NULL pointer would be dereferenced.
    A(kernel, is_associated(),
      "Channel must be associated with a port.");
    A(kernel,
      port->lock.lock_held_by_current_thread(),
      "Port referent lock must be held to call rust_chan::disassociate");

    // PRIxPTR conversions expect uintptr_t arguments, so convert the
    // pointers explicitly, as the other log statements in this file do.
    KLOG(kernel, task,
         "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
         (uintptr_t) this, (uintptr_t) port);

    --this->ref_count;

    task->deref();
    this->task = NULL;

    port->chans.swap_delete(this);

    // Delete reference to the port.
    port = NULL;
}
/**
* Attempt to send data to the associated port.
*/
void rust_chan::send(void *sptr) {
scoped_lock with(port->lock);
buffer.enqueue(sptr);
if (!is_associated()) {
W(kernel, is_associated(),
"rust_chan::transmit with no associated port.");
return;
}
A(kernel, !buffer.is_empty(),
"rust_chan::transmit with nothing to send.");
if (port->task->blocked_on(port)) {
KLOG(kernel, comm, "dequeued in rendezvous_ptr");
buffer.dequeue(port->task->rendezvous_ptr);
port->task->rendezvous_ptr = 0;
port->task->wakeup(port);
}
}
/**
 * Create a new channel that refers to the same port as this one.
 *
 * The new channel is allocated through `target->kernel` with the tag
 * "cloned chan" and constructed from this channel's kernel, port, and
 * buffer unit size.
 *
 * @param target Task whose kernel is passed to the allocating `new`.
 * @return The newly constructed channel.
 */
rust_chan *rust_chan::clone(rust_task *target) {
    rust_chan *duplicate = new (target->kernel, "cloned chan")
        rust_chan(kernel, port, buffer.unit_sz);
    return duplicate;
}
/**
 * Free this channel; asserts that its reference count is zero.
 *
 * If the channel is still associated with a port and its buffer is not
 * empty, the function returns without deleting the channel. Otherwise the
 * channel is disassociated under the port's lock (when it is associated)
 * and then deleted.
 *
 * Cannot Yield: If the task were to unwind, the dropped ref would still
 * appear to be live, causing modify-after-free errors.
 */
void rust_chan::destroy() {
A(kernel, ref_count == 0,
"Channel's ref count should be zero.");
if (is_associated()) {
// We're trying to delete a channel that another task may be
// reading from. We have two options:
//
// 1. We can flush the channel by blocking in upcall_flush_chan()
// and resuming only when the channel is flushed. The problem
// here is that we can get ourselves in a deadlock if the
// parent task tries to join us.
//
// 2. We can leave the channel in a "dormant" state by not freeing
// it and letting the receiver task delete it for us instead.
//
// The early return below takes option 2 whenever data is still buffered.
if (buffer.is_empty() == false) {
return;
}
// disassociate() asserts that the port's lock is held, so take it first.
scoped_lock with(port->lock);
disassociate();
}
// NOTE(review): presumably `with` has released the port lock by the time
// the block above is left, i.e. before the delete below -- confirm against
// scoped_lock's definition.
delete this;
}
//
// Local Variables:
// mode: C++
// fill-column: 78;
// indent-tabs-mode: nil
// c-basic-offset: 4
// buffer-file-coding-system: utf-8-unix
// compile-command: "make -k -C $RBUILD 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
// End:
//
|