about summary refs log tree commit diff
path: root/src/libcore
diff options
context:
space:
mode:
authorEric Holk <eric.holk@gmail.com>2012-07-10 10:58:44 -0700
committerEric Holk <eric.holk@gmail.com>2012-07-10 22:00:47 -0700
commit26e6eb3d14d7ff3bcbfa5ca442a6928776982e98 (patch)
tree81a017f6b8e77536ec47fb0b9045c34979edeec9 /src/libcore
parentd07e537fc3daaf73b7baf652e13ee2d36706258e (diff)
downloadrust-26e6eb3d14d7ff3bcbfa5ca442a6928776982e98.tar.gz
rust-26e6eb3d14d7ff3bcbfa5ca442a6928776982e98.zip
Handle failure conditions correctly in pipes.
Diffstat (limited to 'src/libcore')
-rw-r--r--src/libcore/pipes.rs33
-rw-r--r--src/libcore/task.rs1
2 files changed, 23 insertions, 11 deletions
diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs
index 441f323d7bf..7e05f048dad 100644
--- a/src/libcore/pipes.rs
+++ b/src/libcore/pipes.rs
@@ -47,7 +47,7 @@ extern mod rustrt {
     #[rust_stack]
     fn task_clear_event_reject(task: *rust_task);
 
-    fn task_wait_event(this: *rust_task) -> *libc::c_void;
+    fn task_wait_event(this: *rust_task, killed: &mut bool) -> *libc::c_void;
     fn task_signal_event(target: *rust_task, event: *libc::c_void);
 }
 
@@ -57,6 +57,16 @@ unsafe fn uniquify<T>(x: *T) -> ~T {
     unsafe { unsafe::reinterpret_cast(x) }
 }
 
+fn wait_event(this: *rust_task) -> *libc::c_void {
+    let mut killed = false;
+
+    let res = rustrt::task_wait_event(this, &mut killed);
+    if killed && !task::failing() {
+        fail "killed"
+    }
+    res
+}
+
 fn swap_state_acq(&dst: state, src: state) -> state {
     unsafe {
         reinterpret_cast(rusti::atomic_xchng_acq(
@@ -113,23 +123,23 @@ fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
     let this = rustrt::rust_get_task();
     rustrt::task_clear_event_reject(this);
     p.header.blocked_task = some(this);
+    let mut first = true;
     loop {
+        rustrt::task_clear_event_reject(this);
         let old_state = swap_state_acq(p.header.state,
                                        blocked);
         #debug("%?", old_state);
         alt old_state {
           empty {
             #debug("no data available on %?, going to sleep.", p_);
-            rustrt::task_wait_event(this);
+            wait_event(this);
             #debug("woke up, p.state = %?", p.header.state);
-            if p.header.state == full {
-                let mut payload = none;
-                payload <-> (*p).payload;
-                p.header.state = terminated;
-                ret some(option::unwrap(payload))
+          }
+          blocked {
+            if first {
+                fail "blocking on already blocked packet"
             }
           }
-          blocked { fail "blocking on already blocked packet" }
           full {
             let mut payload = none;
             payload <-> (*p).payload;
@@ -141,11 +151,12 @@ fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
             ret none;
           }
         }
+        first = false;
     }
 }
 
 /// Returns true if messages are available.
-fn peek<T: send>(p: recv_packet<T>) -> bool {
+pure fn peek<T: send>(p: recv_packet<T>) -> bool {
     alt p.header().state {
       empty { false }
       blocked { fail "peeking on blocked packet" }
@@ -236,7 +247,7 @@ fn wait_many(pkts: ~[&a.packet_header]) -> uint {
 
     while !data_avail {
         #debug("sleeping on %? packets", pkts.len());
-        let event = rustrt::task_wait_event(this) as *packet_header;
+        let event = wait_event(this) as *packet_header;
         let pos = vec::position(pkts, |p| ptr::addr_of(*p) == event);
 
         alt pos {
@@ -356,7 +367,7 @@ class recv_packet<T: send> {
         option::unwrap(p)
     }
 
-    fn header() -> &self.packet_header {
+    pure fn header() -> &self.packet_header {
         alt self.p {
           some(packet) {
             unsafe {
diff --git a/src/libcore/task.rs b/src/libcore/task.rs
index 45ed620b30e..3d05611aa98 100644
--- a/src/libcore/task.rs
+++ b/src/libcore/task.rs
@@ -46,6 +46,7 @@ export future_result;
 export future_task;
 export unsupervise;
 export run_listener;
+export run_with;
 
 export spawn;
 export spawn_with;