about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/compiletest/compiletest.rs5
-rw-r--r--src/compiletest/procsrv.rs27
-rw-r--r--src/libcore/pipes.rs63
-rw-r--r--src/rt/rust_task.cpp3
-rw-r--r--src/test/bench/shootout-pfib.rs2
5 files changed, 61 insertions, 39 deletions
diff --git a/src/compiletest/compiletest.rs b/src/compiletest/compiletest.rs
index 39c4f0f81d0..33b9655aeb2 100644
--- a/src/compiletest/compiletest.rs
+++ b/src/compiletest/compiletest.rs
@@ -8,11 +8,6 @@ import task;
 import core::result;
 import result::{ok, err};
 
-import comm::port;
-import comm::chan;
-import comm::send;
-import comm::recv;
-
 import common::config;
 import common::mode_run_pass;
 import common::mode_run_fail;
diff --git a/src/compiletest/procsrv.rs b/src/compiletest/procsrv.rs
index bb9080becff..99b18a67e1e 100644
--- a/src/compiletest/procsrv.rs
+++ b/src/compiletest/procsrv.rs
@@ -2,6 +2,8 @@ import run::spawn_process;
 import io::{writer_util, reader_util};
 import libc::{c_int, pid_t};
 
+import pipes::chan;
+
 export run;
 
 #[cfg(target_os = "win32")]
@@ -58,29 +60,30 @@ fn run(lib_path: ~str,
 
 
     writeclose(pipe_in.out, input);
-    let p = comm::port();
-    let ch = comm::chan(p);
+    let p = pipes::port_set();
+    let ch = p.chan();
     do task::spawn_sched(task::single_threaded) {
         let errput = readclose(pipe_err.in);
-        comm::send(ch, (2, errput));
+        ch.send((2, errput));
     }
+    let ch = p.chan();
     do task::spawn_sched(task::single_threaded) {
         let output = readclose(pipe_out.in);
-        comm::send(ch, (1, output));
+        ch.send((1, output));
     }
     let status = run::waitpid(pid);
     let mut errs = ~"";
     let mut outs = ~"";
     let mut count = 2;
     while count > 0 {
-        let stream = comm::recv(p);
-        alt check stream {
-            (1, s) => {
-                outs = s;
-            }
-            (2, s) => {
-                errs = s;
-            }
+        alt p.recv() {
+          (1, s) => {
+            outs = s;
+          }
+          (2, s) => {
+            errs = s;
+          }
+          _ { fail }
         };
         count -= 1;
     };
diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs
index cd4265ada8e..963be25b69f 100644
--- a/src/libcore/pipes.rs
+++ b/src/libcore/pipes.rs
@@ -113,7 +113,7 @@ type buffer<T: send> = {
 
 struct packet_header {
     let mut state: state;
-    let mut blocked_task: option<*rust_task>;
+    let mut blocked_task: *rust_task;
 
     // This is a reinterpret_cast of a ~buffer, that can also be cast
     // to a buffer_header if need be.
@@ -121,19 +121,21 @@ struct packet_header {
 
     new() {
         self.state = empty;
-        self.blocked_task = none;
+        self.blocked_task = ptr::null();
         self.buffer = ptr::null();
     }
 
     // Returns the old state.
     unsafe fn mark_blocked(this: *rust_task) -> state {
-        self.blocked_task = some(this);
+        rustrt::rust_task_ref(this);
+        let old_task = swap_task(self.blocked_task, this);
+        assert old_task.is_null();
         swap_state_acq(self.state, blocked)
     }
 
     unsafe fn unblock() {
-        assert self.state != blocked || self.blocked_task != none;
-        self.blocked_task = none;
+        let old_task = swap_task(self.blocked_task, ptr::null());
+        if !old_task.is_null() { rustrt::rust_task_deref(old_task) }
         alt swap_state_acq(self.state, empty) {
           empty | blocked => (),
           terminated => self.state = terminated,
@@ -241,11 +243,25 @@ fn atomic_sub_rel(&dst: int, src: int) -> int {
 }
 
 #[doc(hidden)]
+fn swap_task(&dst: *rust_task, src: *rust_task) -> *rust_task {
+    // It might be worth making both acquire and release versions of
+    // this.
+    unsafe {
+        reinterpret_cast(rusti::atomic_xchng(
+            *(ptr::mut_addr_of(dst) as *mut int),
+            src as int))
+    }
+}
+
+#[doc(hidden)]
 type rust_task = libc::c_void;
 
 extern mod rustrt {
     #[rust_stack]
     fn rust_get_task() -> *rust_task;
+    #[rust_stack]
+    fn rust_task_ref(task: *rust_task);
+    fn rust_task_deref(task: *rust_task);
 
     #[rust_stack]
     fn task_clear_event_reject(task: *rust_task);
@@ -334,10 +350,11 @@ fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
       full => fail ~"duplicate send",
       blocked => {
         debug!{"waking up task for %?", p_};
-        alt p.header.blocked_task {
-          some(task) => rustrt::task_signal_event(
-              task, ptr::addr_of(p.header) as *libc::c_void),
-          none => debug!{"just kidding!"}
+        let old_task = swap_task(p.header.blocked_task, ptr::null());
+        if !old_task.is_null() {
+            rustrt::task_signal_event(
+                old_task, ptr::addr_of(p.header) as *libc::c_void);
+            rustrt::rust_task_deref(old_task);
         }
 
         // The receiver will eventually clean this up.
@@ -372,7 +389,9 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
     let p = unsafe { &*p_ };
     let this = rustrt::rust_get_task();
     rustrt::task_clear_event_reject(this);
-    p.header.blocked_task = some(this);
+    rustrt::rust_task_ref(this);
+    let old_task = swap_task(p.header.blocked_task, this);
+    assert old_task.is_null();
     let mut first = true;
     let mut count = SPIN_COUNT;
     loop {
@@ -402,7 +421,10 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
           full => {
             let mut payload = none;
             payload <-> p.payload;
-            p.header.blocked_task = none;
+            let old_task = swap_task(p.header.blocked_task, ptr::null());
+            if !old_task.is_null() {
+                rustrt::rust_task_deref(old_task);
+            }
             p.header.state = empty;
             return some(option::unwrap(payload))
           }
@@ -410,6 +432,11 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
             // This assert detects when we've accidentally unsafely
             // casted too big of a number to a state.
             assert old_state == terminated;
+
+            let old_task = swap_task(p.header.blocked_task, ptr::null());
+            if !old_task.is_null() {
+                rustrt::rust_task_deref(old_task);
+            }
             return none;
           }
         }
@@ -437,17 +464,18 @@ fn sender_terminate<T: send>(p: *packet<T>) {
     let p = unsafe { &*p };
     alt swap_state_rel(p.header.state, terminated) {
       empty => {
+        assert p.header.blocked_task.is_null();
         // The receiver will eventually clean up.
         //unsafe { forget(p) }
       }
       blocked => {
         // wake up the target
-        alt p.header.blocked_task {
-          some(target) =>
+        let old_task = swap_task(p.header.blocked_task, ptr::null());
+        if !old_task.is_null() {
             rustrt::task_signal_event(
-                target,
-                ptr::addr_of(p.header) as *libc::c_void),
-          none => { debug!{"receiver is already shutting down"} }
+                old_task,
+                ptr::addr_of(p.header) as *libc::c_void);
+            rustrt::rust_task_deref(old_task);
         }
         // The receiver will eventually clean up.
         //unsafe { forget(p) }
@@ -457,6 +485,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
         fail ~"you dun goofed"
       }
       terminated => {
+        assert p.header.blocked_task.is_null();
         // I have to clean up, use drop_glue
       }
     }
@@ -465,7 +494,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
 #[doc(hidden)]
 fn receiver_terminate<T: send>(p: *packet<T>) {
     let p = unsafe { &*p };
-    assert p.header.blocked_task == none;
+    assert p.header.blocked_task.is_null();
     alt swap_state_rel(p.header.state, terminated) {
       empty => {
         // the sender will clean up
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index 58a0e3eae6c..061e87ebff8 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -678,9 +678,6 @@ MUST_CHECK bool rust_task::wait_event(void **result) {
 
 void
 rust_task::signal_event(void *event) {
-    assert(task_state_blocked == state ||
-           task_state_running == state);
-
     scoped_lock with(lifecycle_lock);
 
     this->event = event;
diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs
index 9db40205742..5b309659ee5 100644
--- a/src/test/bench/shootout-pfib.rs
+++ b/src/test/bench/shootout-pfib.rs
@@ -1,8 +1,6 @@
 // -*- rust -*-
 // xfail-pretty
 
-// xfail-test
-
 /*
   A parallel version of fibonacci numbers.