about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorEric Holk <eholk@mozilla.com>2011-08-09 16:07:49 -0700
committerEric Holk <eholk@mozilla.com>2011-08-15 09:26:51 -0700
commit39b16077bbcac64eb62f484486c703aed405ef2f (patch)
tree883cd01d305130374bb0438475716982c1c483c2 /src
parent04af99ecb0dee1cb3df0032f7e7ba08ffc6c5bd4 (diff)
downloadrust-39b16077bbcac64eb62f484486c703aed405ef2f.tar.gz
rust-39b16077bbcac64eb62f484486c703aed405ef2f.zip
Port ID-based channels.
Diffstat (limited to 'src')
-rw-r--r--src/lib/comm.rs38
-rw-r--r--src/lib/task.rs8
-rw-r--r--src/rt/rust_builtin.cpp23
-rw-r--r--src/rt/rust_port.cpp7
-rw-r--r--src/rt/rust_port.h3
-rw-r--r--src/rt/rustrt.def.in3
-rw-r--r--src/test/run-pass/task-comm-10.rs7
-rw-r--r--src/test/run-pass/task-comm-16.rs3
-rw-r--r--src/test/stdtest/comm.rs11
9 files changed, 96 insertions, 7 deletions
diff --git a/src/lib/comm.rs b/src/lib/comm.rs
index 798daf1db12..451c69dafd9 100644
--- a/src/lib/comm.rs
+++ b/src/lib/comm.rs
@@ -1,12 +1,15 @@
 import sys;
 import ptr;
 import unsafe;
+import task;
+import task::task_id;
 
 export _chan;
 export _port;
 
 export mk_port;
 export chan_from_unsafe_ptr;
+export send;
 
 native "rust" mod rustrt {
     type void;
@@ -17,16 +20,28 @@ native "rust" mod rustrt {
     fn take_chan(ch : *rust_chan);
     fn drop_chan(ch : *rust_chan);
     fn chan_send(ch: *rust_chan, v : *void);
+    // FIXME: data should be -T, not &T, but this doesn't seem to be
+    // supported yet.
+    fn chan_id_send[~T](target_task : task_id, target_port : port_id,
+                        data : &T);
 
     fn new_port(unit_sz : uint) -> *rust_port;
     fn del_port(po : *rust_port);
     fn drop_port(po : *rust_port);
+    fn get_port_id(po : *rust_port) -> port_id;
 }
 
 native "rust-intrinsic" mod rusti {
-    fn recv[T](port : *rustrt::rust_port) -> T;
+    fn recv[~T](port : *rustrt::rust_port) -> T;
 }
 
+type port_id = int;
+
+type chan_t[~T] = {
+    task : task_id,
+    port : port_id
+};
+
 resource chan_ptr(ch: *rustrt::rust_chan) {
     rustrt::drop_chan(ch);
 }
@@ -36,7 +51,7 @@ resource port_ptr(po: *rustrt::rust_port) {
     rustrt::del_port(po);
 }
 
-obj _chan[T](raw_chan : @chan_ptr) {
+obj _chan[~T](raw_chan : @chan_ptr) {
     fn send(v : &T) {
         rustrt::chan_send(**raw_chan,
                           unsafe::reinterpret_cast(ptr::addr_of(v)));
@@ -49,20 +64,33 @@ obj _chan[T](raw_chan : @chan_ptr) {
     }
 }
 
-fn chan_from_unsafe_ptr[T](ch : *u8) -> _chan[T] {
+fn chan_from_unsafe_ptr[~T](ch : *u8) -> _chan[T] {
     _chan(@chan_ptr(unsafe::reinterpret_cast(ch)))
 }
 
-obj _port[T](raw_port : @port_ptr) {
+obj _port[~T](raw_port : @port_ptr) {
     fn mk_chan() -> _chan[T] {
         _chan(@chan_ptr(rustrt::new_chan(**raw_port)))
     }
 
+    // FIXME: rename this to chan once chan is not a keyword.
+    fn mk_chan2() -> chan_t[T] {
+        {
+            task: task::get_task_id(),
+            port: rustrt::get_port_id(**raw_port)
+        }
+    }
+
     fn recv() -> T {
         ret rusti::recv(**raw_port)
     }
 }
 
-fn mk_port[T]() -> _port[T] {
+fn mk_port[~T]() -> _port[T] {
     _port(@port_ptr(rustrt::new_port(sys::size_of[T]())))
 }
+
+// FIXME: make data move-mode once the snapshot is updated.
+fn send[~T](ch : chan_t[T], data : &T) {
+    rustrt::chan_id_send(ch.task, ch.port, data);
+}
\ No newline at end of file
diff --git a/src/lib/task.rs b/src/lib/task.rs
index 1936cf20971..a4c164658a7 100644
--- a/src/lib/task.rs
+++ b/src/lib/task.rs
@@ -5,6 +5,7 @@ native "rust" mod rustrt {
     fn unsupervise();
     fn pin_task();
     fn unpin_task();
+    fn get_task_id() -> task_id;
     fn clone_chan(c: *rust_chan) -> *rust_chan;
 
     type rust_chan;
@@ -12,6 +13,12 @@ native "rust" mod rustrt {
     fn set_min_stack(stack_size: uint);
 }
 
+type task_id = int;
+
+fn get_task_id() -> task_id {
+    rustrt::get_task_id()
+}
+
 /**
  * Hints the scheduler to yield this task for a specified ammount of time.
  *
@@ -33,6 +40,7 @@ fn pin() { rustrt::pin_task(); }
 
 fn unpin() { rustrt::unpin_task(); }
 
+// FIXME: remove this
 fn clone_chan[T](c: chan[T]) -> chan[T] {
     let cloned = rustrt::clone_chan(unsafe::reinterpret_cast(c));
     ret unsafe::reinterpret_cast(cloned);
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index 4b8af7a67b6..a7d6a902231 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -702,6 +702,11 @@ unpin_task(rust_task *task) {
     task->unpin();
 }
 
+extern "C" CDECL rust_task_id
+get_task_id(rust_task *task) {
+    return task->id;
+}
+
 extern "C" CDECL rust_chan *
 clone_chan(rust_task *task, rust_chan *chan) {
     return chan->clone(task);
@@ -738,6 +743,11 @@ del_port(rust_task *task, rust_port *port) {
     task->deref();
 }
 
+extern "C" CDECL rust_port_id
+get_port_id(rust_task *task, rust_port *port) {
+    return port->id;
+}
+
 extern "C" CDECL rust_chan*
 new_chan(rust_task *task, rust_port *port) {
     rust_scheduler *sched = task->sched;
@@ -776,6 +786,19 @@ chan_send(rust_task *task, rust_chan *chan, void *sptr) {
 }
 
 extern "C" CDECL void
+chan_id_send(rust_task *task, type_desc *t, rust_task_id target_task_id,
+             rust_port_id target_port_id, void *sptr) {
+    // FIXME: make sure this is thread-safe
+    rust_task *target_task = task->kernel->get_task_by_id(target_task_id);
+    if(target_task) {
+        rust_port *port = target_task->get_port_by_id(target_port_id);
+        if(port) {
+            port->remote_chan->send(sptr);
+        }
+    }
+}
+
+extern "C" CDECL void
 port_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
     {
         scoped_lock with(port->lock);
diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp
index 448babfbdee..e0d3a347b98 100644
--- a/src/rt/rust_port.cpp
+++ b/src/rt/rust_port.cpp
@@ -1,6 +1,9 @@
 #include "rust_internal.h"
 #include "rust_port.h"
 
+extern "C" CDECL rust_chan*
+new_chan(rust_task *task, rust_port *port);
+
 rust_port::rust_port(rust_task *task, size_t unit_sz)
     : ref_count(1), kernel(task->kernel), task(task),
       unit_sz(unit_sz), writers(task), chans(task) {
@@ -10,6 +13,7 @@ rust_port::rust_port(rust_task *task, size_t unit_sz)
         PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
 
     id = task->register_port(this);
+    remote_chan = new_chan(task, this);
 }
 
 rust_port::~rust_port() {
@@ -22,6 +26,9 @@ rust_port::~rust_port() {
         chan->disassociate();
     }
 
+    remote_chan->deref();
+    remote_chan = NULL;
+
     task->release_port(id);
 }
 
diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h
index 9dac42244ef..8e7b215c154 100644
--- a/src/rt/rust_port.h
+++ b/src/rt/rust_port.h
@@ -5,12 +5,11 @@ class rust_port : public kernel_owned<rust_port>, public rust_cond {
 public:
     RUST_REFCOUNTED(rust_port);
 
-private:
     rust_port_id id;
 
-public:
     rust_kernel *kernel;
     rust_task *task;
+    rust_chan *remote_chan;
     size_t unit_sz;
     ptr_vec<rust_token> writers;
     ptr_vec<rust_chan> chans;
diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in
index a2174e39168..09f4097b9c1 100644
--- a/src/rt/rustrt.def.in
+++ b/src/rt/rustrt.def.in
@@ -9,6 +9,7 @@ aio_serve
 aio_stop
 aio_writedata
 align_of
+chan_id_send
 chan_send
 check_claims
 clone_chan
@@ -25,6 +26,8 @@ debug_tydesc
 do_gc
 drop_chan
 drop_port
+get_port_id
+get_task_id
 get_time
 hack_allow_leaks
 ivec_copy_from_buf
diff --git a/src/test/run-pass/task-comm-10.rs b/src/test/run-pass/task-comm-10.rs
index b1f825aa372..ef8746a1867 100644
--- a/src/test/run-pass/task-comm-10.rs
+++ b/src/test/run-pass/task-comm-10.rs
@@ -1,3 +1,10 @@
+// FIXME: this test is xfailed until sending strings is legal again.
+
+//xfail-stage0
+//xfail-stage1
+//xfail-stage2
+//xfail-stage3
+
 use std;
 import std::task;
 import std::comm;
diff --git a/src/test/run-pass/task-comm-16.rs b/src/test/run-pass/task-comm-16.rs
index bd6f7075200..b974e2052e7 100644
--- a/src/test/run-pass/task-comm-16.rs
+++ b/src/test/run-pass/task-comm-16.rs
@@ -32,6 +32,8 @@ fn test_vec() {
 }
 
 fn test_str() {
+    // FIXME: re-enable this once strings are unique and sendable
+/*
     let po = comm::mk_port();
     let ch = po.mk_chan();
     let s0: str = "test";
@@ -42,6 +44,7 @@ fn test_str() {
     assert (s1.(1) as u8 == 'e' as u8);
     assert (s1.(2) as u8 == 's' as u8);
     assert (s1.(3) as u8 == 't' as u8);
+*/
 }
 
 fn test_tag() {
diff --git a/src/test/stdtest/comm.rs b/src/test/stdtest/comm.rs
index b65939e2854..81c5d868b67 100644
--- a/src/test/stdtest/comm.rs
+++ b/src/test/stdtest/comm.rs
@@ -17,3 +17,14 @@ fn send_recv() {
     log_err v;
     assert(42 == v);
 }
+
+#[test]
+fn send_recv2() {
+    let p = comm::mk_port[int]();
+    let c = p.mk_chan2();
+
+    comm::send(c, 42);
+    let v = p.recv();
+    log_err v;
+    assert(42 == v);
+}