diff options
| author | Eric Holk <eholk@mozilla.com> | 2011-08-09 16:07:49 -0700 |
|---|---|---|
| committer | Eric Holk <eholk@mozilla.com> | 2011-08-15 09:26:51 -0700 |
| commit | 39b16077bbcac64eb62f484486c703aed405ef2f (patch) | |
| tree | 883cd01d305130374bb0438475716982c1c483c2 /src | |
| parent | 04af99ecb0dee1cb3df0032f7e7ba08ffc6c5bd4 (diff) | |
| download | rust-39b16077bbcac64eb62f484486c703aed405ef2f.tar.gz rust-39b16077bbcac64eb62f484486c703aed405ef2f.zip | |
Port ID-based channels.
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/comm.rs | 38 | ||||
| -rw-r--r-- | src/lib/task.rs | 8 | ||||
| -rw-r--r-- | src/rt/rust_builtin.cpp | 23 | ||||
| -rw-r--r-- | src/rt/rust_port.cpp | 7 | ||||
| -rw-r--r-- | src/rt/rust_port.h | 3 | ||||
| -rw-r--r-- | src/rt/rustrt.def.in | 3 | ||||
| -rw-r--r-- | src/test/run-pass/task-comm-10.rs | 7 | ||||
| -rw-r--r-- | src/test/run-pass/task-comm-16.rs | 3 | ||||
| -rw-r--r-- | src/test/stdtest/comm.rs | 11 |
9 files changed, 96 insertions, 7 deletions
diff --git a/src/lib/comm.rs b/src/lib/comm.rs index 798daf1db12..451c69dafd9 100644 --- a/src/lib/comm.rs +++ b/src/lib/comm.rs @@ -1,12 +1,15 @@ import sys; import ptr; import unsafe; +import task; +import task::task_id; export _chan; export _port; export mk_port; export chan_from_unsafe_ptr; +export send; native "rust" mod rustrt { type void; @@ -17,16 +20,28 @@ native "rust" mod rustrt { fn take_chan(ch : *rust_chan); fn drop_chan(ch : *rust_chan); fn chan_send(ch: *rust_chan, v : *void); + // FIXME: data should be -T, not &T, but this doesn't seem to be + // supported yet. + fn chan_id_send[~T](target_task : task_id, target_port : port_id, + data : &T); fn new_port(unit_sz : uint) -> *rust_port; fn del_port(po : *rust_port); fn drop_port(po : *rust_port); + fn get_port_id(po : *rust_port) -> port_id; } native "rust-intrinsic" mod rusti { - fn recv[T](port : *rustrt::rust_port) -> T; + fn recv[~T](port : *rustrt::rust_port) -> T; } +type port_id = int; + +type chan_t[~T] = { + task : task_id, + port : port_id +}; + resource chan_ptr(ch: *rustrt::rust_chan) { rustrt::drop_chan(ch); } @@ -36,7 +51,7 @@ resource port_ptr(po: *rustrt::rust_port) { rustrt::del_port(po); } -obj _chan[T](raw_chan : @chan_ptr) { +obj _chan[~T](raw_chan : @chan_ptr) { fn send(v : &T) { rustrt::chan_send(**raw_chan, unsafe::reinterpret_cast(ptr::addr_of(v))); @@ -49,20 +64,33 @@ obj _chan[T](raw_chan : @chan_ptr) { } } -fn chan_from_unsafe_ptr[T](ch : *u8) -> _chan[T] { +fn chan_from_unsafe_ptr[~T](ch : *u8) -> _chan[T] { _chan(@chan_ptr(unsafe::reinterpret_cast(ch))) } -obj _port[T](raw_port : @port_ptr) { +obj _port[~T](raw_port : @port_ptr) { fn mk_chan() -> _chan[T] { _chan(@chan_ptr(rustrt::new_chan(**raw_port))) } + // FIXME: rename this to chan once chan is not a keyword. + fn mk_chan2() -> chan_t[T] { + { + task: task::get_task_id(), + port: rustrt::get_port_id(**raw_port) + } + } + fn recv() -> T { ret rusti::recv(**raw_port) } } -fn mk_port[T]() -> _port[T] { +fn mk_port[~T]() -> _port[T] { _port(@port_ptr(rustrt::new_port(sys::size_of[T]()))) } + +// FIXME: make data move-mode once the snapshot is updated. +fn send[~T](ch : chan_t[T], data : &T) { + rustrt::chan_id_send(ch.task, ch.port, data); +} \ No newline at end of file diff --git a/src/lib/task.rs b/src/lib/task.rs index 1936cf20971..a4c164658a7 100644 --- a/src/lib/task.rs +++ b/src/lib/task.rs @@ -5,6 +5,7 @@ native "rust" mod rustrt { fn unsupervise(); fn pin_task(); fn unpin_task(); + fn get_task_id() -> task_id; fn clone_chan(c: *rust_chan) -> *rust_chan; type rust_chan; @@ -12,6 +13,12 @@ native "rust" mod rustrt { fn set_min_stack(stack_size: uint); } +type task_id = int; + +fn get_task_id() -> task_id { + rustrt::get_task_id() +} + /** * Hints the scheduler to yield this task for a specified ammount of time. * @@ -33,6 +40,7 @@ fn pin() { rustrt::pin_task(); } fn unpin() { rustrt::unpin_task(); } +// FIXME: remove this fn clone_chan[T](c: chan[T]) -> chan[T] { let cloned = rustrt::clone_chan(unsafe::reinterpret_cast(c)); ret unsafe::reinterpret_cast(cloned); diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 4b8af7a67b6..a7d6a902231 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -702,6 +702,11 @@ unpin_task(rust_task *task) { task->unpin(); } +extern "C" CDECL rust_task_id +get_task_id(rust_task *task) { + return task->id; +} + extern "C" CDECL rust_chan * clone_chan(rust_task *task, rust_chan *chan) { return chan->clone(task); @@ -738,6 +743,11 @@ del_port(rust_task *task, rust_port *port) { task->deref(); } +extern "C" CDECL rust_port_id +get_port_id(rust_task *task, rust_port *port) { + return port->id; +} + extern "C" CDECL rust_chan* new_chan(rust_task *task, rust_port *port) { rust_scheduler *sched = task->sched; @@ -776,6 +786,19 @@ chan_send(rust_task *task, rust_chan *chan, void *sptr) { } extern "C" CDECL void +chan_id_send(rust_task *task, type_desc *t, rust_task_id target_task_id, + rust_port_id target_port_id, void *sptr) { + // FIXME: make sure this is thread-safe + rust_task *target_task = task->kernel->get_task_by_id(target_task_id); + if(target_task) { + rust_port *port = target_task->get_port_by_id(target_port_id); + if(port) { + port->remote_chan->send(sptr); + } + } +} + +extern "C" CDECL void port_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { { scoped_lock with(port->lock); diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index 448babfbdee..e0d3a347b98 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -1,6 +1,9 @@ #include "rust_internal.h" #include "rust_port.h" +extern "C" CDECL rust_chan* +new_chan(rust_task *task, rust_port *port); + rust_port::rust_port(rust_task *task, size_t unit_sz) : ref_count(1), kernel(task->kernel), task(task), unit_sz(unit_sz), writers(task), chans(task) { @@ -10,6 +13,7 @@ rust_port::rust_port(rust_task *task, size_t unit_sz) PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this); id = task->register_port(this); + remote_chan = new_chan(task, this); } rust_port::~rust_port() { @@ -22,6 +26,9 @@ rust_port::~rust_port() { chan->disassociate(); } + remote_chan->deref(); + remote_chan = NULL; + task->release_port(id); } diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 9dac42244ef..8e7b215c154 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -5,12 +5,11 @@ class rust_port : public kernel_owned<rust_port>, public rust_cond { public: RUST_REFCOUNTED(rust_port); -private: rust_port_id id; -public: rust_kernel *kernel; rust_task *task; + rust_chan *remote_chan; size_t unit_sz; ptr_vec<rust_token> writers; ptr_vec<rust_chan> chans; diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index a2174e39168..09f4097b9c1 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -9,6 +9,7 @@ aio_serve aio_stop aio_writedata align_of +chan_id_send chan_send check_claims clone_chan @@ -25,6 +26,8 @@ debug_tydesc do_gc drop_chan drop_port +get_port_id +get_task_id get_time hack_allow_leaks ivec_copy_from_buf diff --git a/src/test/run-pass/task-comm-10.rs b/src/test/run-pass/task-comm-10.rs index b1f825aa372..ef8746a1867 100644 --- a/src/test/run-pass/task-comm-10.rs +++ b/src/test/run-pass/task-comm-10.rs @@ -1,3 +1,10 @@ +// FIXME: this test is xfailed until sending strings is legal again. + +//xfail-stage0 +//xfail-stage1 +//xfail-stage2 +//xfail-stage3 + use std; import std::task; import std::comm; diff --git a/src/test/run-pass/task-comm-16.rs b/src/test/run-pass/task-comm-16.rs index bd6f7075200..b974e2052e7 100644 --- a/src/test/run-pass/task-comm-16.rs +++ b/src/test/run-pass/task-comm-16.rs @@ -32,6 +32,8 @@ fn test_vec() { } fn test_str() { + // FIXME: re-enable this once strings are unique and sendable +/* let po = comm::mk_port(); let ch = po.mk_chan(); let s0: str = "test"; @@ -42,6 +44,7 @@ fn test_str() { assert (s1.(1) as u8 == 'e' as u8); assert (s1.(2) as u8 == 's' as u8); assert (s1.(3) as u8 == 't' as u8); +*/ } fn test_tag() { diff --git a/src/test/stdtest/comm.rs b/src/test/stdtest/comm.rs index b65939e2854..81c5d868b67 100644 --- a/src/test/stdtest/comm.rs +++ b/src/test/stdtest/comm.rs @@ -17,3 +17,14 @@ fn send_recv() { log_err v; assert(42 == v); } + +#[test] +fn send_recv2() { + let p = comm::mk_port[int](); + let c = p.mk_chan2(); + + comm::send(c, 42); + let v = p.recv(); + log_err v; + assert(42 == v); +} |
