diff options
| author | Eric Holk <eric.holk@gmail.com> | 2012-07-10 10:58:44 -0700 |
|---|---|---|
| committer | Eric Holk <eric.holk@gmail.com> | 2012-07-10 22:00:47 -0700 |
| commit | 26e6eb3d14d7ff3bcbfa5ca442a6928776982e98 (patch) | |
| tree | 81a017f6b8e77536ec47fb0b9045c34979edeec9 /src/libcore | |
| parent | d07e537fc3daaf73b7baf652e13ee2d36706258e (diff) | |
| download | rust-26e6eb3d14d7ff3bcbfa5ca442a6928776982e98.tar.gz rust-26e6eb3d14d7ff3bcbfa5ca442a6928776982e98.zip | |
Handle failure conditions correctly in pipes.
Diffstat (limited to 'src/libcore')
| -rw-r--r-- | src/libcore/pipes.rs | 33 | ||||
| -rw-r--r-- | src/libcore/task.rs | 1 |
2 files changed, 23 insertions, 11 deletions
diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 441f323d7bf..7e05f048dad 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -47,7 +47,7 @@ extern mod rustrt { #[rust_stack] fn task_clear_event_reject(task: *rust_task); - fn task_wait_event(this: *rust_task) -> *libc::c_void; + fn task_wait_event(this: *rust_task, killed: &mut bool) -> *libc::c_void; fn task_signal_event(target: *rust_task, event: *libc::c_void); } @@ -57,6 +57,16 @@ unsafe fn uniquify<T>(x: *T) -> ~T { unsafe { unsafe::reinterpret_cast(x) } } +fn wait_event(this: *rust_task) -> *libc::c_void { + let mut killed = false; + + let res = rustrt::task_wait_event(this, &mut killed); + if killed && !task::failing() { + fail "killed" + } + res +} + fn swap_state_acq(&dst: state, src: state) -> state { unsafe { reinterpret_cast(rusti::atomic_xchng_acq( @@ -113,23 +123,23 @@ fn recv<T: send>(-p: recv_packet<T>) -> option<T> { let this = rustrt::rust_get_task(); rustrt::task_clear_event_reject(this); p.header.blocked_task = some(this); + let mut first = true; loop { + rustrt::task_clear_event_reject(this); let old_state = swap_state_acq(p.header.state, blocked); #debug("%?", old_state); alt old_state { empty { #debug("no data available on %?, going to sleep.", p_); - rustrt::task_wait_event(this); + wait_event(this); #debug("woke up, p.state = %?", p.header.state); - if p.header.state == full { - let mut payload = none; - payload <-> (*p).payload; - p.header.state = terminated; - ret some(option::unwrap(payload)) + } + blocked { + if first { + fail "blocking on already blocked packet" } } - blocked { fail "blocking on already blocked packet" } full { let mut payload = none; payload <-> (*p).payload; @@ -141,11 +151,12 @@ fn recv<T: send>(-p: recv_packet<T>) -> option<T> { ret none; } } + first = false; } } /// Returns true if messages are available. -fn peek<T: send>(p: recv_packet<T>) -> bool { +pure fn peek<T: send>(p: recv_packet<T>) -> bool { alt p.header().state { empty { false } blocked { fail "peeking on blocked packet" } @@ -236,7 +247,7 @@ fn wait_many(pkts: ~[&a.packet_header]) -> uint { while !data_avail { #debug("sleeping on %? packets", pkts.len()); - let event = rustrt::task_wait_event(this) as *packet_header; + let event = wait_event(this) as *packet_header; let pos = vec::position(pkts, |p| ptr::addr_of(*p) == event); alt pos { @@ -356,7 +367,7 @@ class recv_packet<T: send> { option::unwrap(p) } - fn header() -> &self.packet_header { + pure fn header() -> &self.packet_header { alt self.p { some(packet) { unsafe { diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 45ed620b30e..3d05611aa98 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -46,6 +46,7 @@ export future_result; export future_task; export unsupervise; export run_listener; +export run_with; export spawn; export spawn_with; |
