diff options
| author | Eric Holk <eric.holk@gmail.com> | 2012-06-29 18:15:28 -0700 |
|---|---|---|
| committer | Eric Holk <eric.holk@gmail.com> | 2012-07-06 10:42:39 -0700 |
| commit | 67b0760592e1cf9aad2e84f1534ef08c3c5f1a2b (patch) | |
| tree | 302572d9e576d9aa0d3fc08f70e4ee3ce20d7c40 | |
| parent | 5c3889a02f107a3c93e05e8834673f924113c161 (diff) | |
| download | rust-67b0760592e1cf9aad2e84f1534ef08c3c5f1a2b.tar.gz rust-67b0760592e1cf9aad2e84f1534ef08c3c5f1a2b.zip | |
Moved pipes runtime support to libcore, and add a test that will help verify that busy waiting is no longer happening.
Fixing the result of a bad merge.
| -rw-r--r-- | src/libcore/core.rc | 6 | ||||
| -rw-r--r-- | src/libcore/pipes.rs | 207 | ||||
| -rw-r--r-- | src/libcore/vec.rs | 1 | ||||
| -rw-r--r-- | src/test/bench/msgsend-ring-contracts.rs | 191 | ||||
| -rw-r--r-- | src/test/run-pass/pipe-sleep.rs | 59 |
5 files changed, 270 insertions, 194 deletions
diff --git a/src/libcore/core.rc b/src/libcore/core.rc index d4e91800660..97eefd708f3 100644 --- a/src/libcore/core.rc +++ b/src/libcore/core.rc @@ -39,7 +39,7 @@ export float, f32, f64; export box, char, str, ptr, vec, bool; export either, option, result, iter; export libc, os, io, run, rand, sys, unsafe, logging; -export arc, newcomm, comm, task, future; +export arc, newcomm, comm, task, future, pipes; export extfmt; export tuple; export to_str, to_bytes; @@ -187,7 +187,9 @@ mod newcomm; mod comm; mod task; mod future; - +// TODO: remove the conditionals once a new snapshot happens +#[cfg(stage1)] +mod pipes; // Runtime and language-primitive support diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs new file mode 100644 index 00000000000..75a4b90af06 --- /dev/null +++ b/src/libcore/pipes.rs @@ -0,0 +1,207 @@ +// Runtime support for pipes. + +import unsafe::{forget, reinterpret_cast}; + +enum state { + empty, + full, + blocked, + terminated +} + +type packet<T: send> = { + mut state: state, + mut blocked_task: option<task::task>, + mut payload: option<T> +}; + +fn packet<T: send>() -> *packet<T> unsafe { + let p: *packet<T> = unsafe::transmute(~{ + mut state: empty, + mut blocked_task: none::<task::task>, + mut payload: none::<T> + }); + p +} + +#[abi = "rust-intrinsic"] +native mod rusti { + fn atomic_xchng(&dst: int, src: int) -> int; + fn atomic_xchng_acq(&dst: int, src: int) -> int; + fn atomic_xchng_rel(&dst: int, src: int) -> int; +} + +// We should consider moving this to core::unsafe, although I +// suspect graydon would want us to use void pointers instead. +unsafe fn uniquify<T>(x: *T) -> ~T { + unsafe { unsafe::reinterpret_cast(x) } +} + +fn swap_state_acq(&dst: state, src: state) -> state { + unsafe { + reinterpret_cast(rusti::atomic_xchng_acq( + *(ptr::mut_addr_of(dst) as *mut int), + src as int)) + } +} + +fn swap_state_rel(&dst: state, src: state) -> state { + unsafe { + reinterpret_cast(rusti::atomic_xchng_rel( + *(ptr::mut_addr_of(dst) as *mut int), + src as int)) + } +} + +fn send<T: send>(-p: send_packet<T>, -payload: T) { + let p = p.unwrap(); + let p = unsafe { uniquify(p) }; + assert (*p).payload == none; + (*p).payload <- some(payload); + let old_state = swap_state_rel((*p).state, full); + alt old_state { + empty { + // Yay, fastpath. + + // The receiver will eventually clean this up. + unsafe { forget(p); } + } + full { fail "duplicate send" } + blocked { + // TODO: once the target will actually block, tell the + // scheduler to wake it up. + + // The receiver will eventually clean this up. + unsafe { forget(p); } + } + terminated { + // The receiver will never receive this. Rely on drop_glue + // to clean everything up. + } + } +} + +fn recv<T: send>(-p: recv_packet<T>) -> option<T> { + let p = p.unwrap(); + let p = unsafe { uniquify(p) }; + loop { + let old_state = swap_state_acq((*p).state, + blocked); + alt old_state { + empty | blocked { task::yield(); } + full { + let mut payload = none; + payload <-> (*p).payload; + ret some(option::unwrap(payload)) + } + terminated { + assert old_state == terminated; + ret none; + } + } + } +} + +fn sender_terminate<T: send>(p: *packet<T>) { + let p = unsafe { uniquify(p) }; + alt swap_state_rel((*p).state, terminated) { + empty | blocked { + // The receiver will eventually clean up. + unsafe { forget(p) } + } + full { + // This is impossible + fail "you dun goofed" + } + terminated { + // I have to clean up, use drop_glue + } + } +} + +fn receiver_terminate<T: send>(p: *packet<T>) { + let p = unsafe { uniquify(p) }; + alt swap_state_rel((*p).state, terminated) { + empty { + // the sender will clean up + unsafe { forget(p) } + } + blocked { + // this shouldn't happen. + fail "terminating a blocked packet" + } + terminated | full { + // I have to clean up, use drop_glue + } + } +} + +class send_packet<T: send> { + let mut p: option<*packet<T>>; + new(p: *packet<T>) { + //#error("take send %?", p); + self.p = some(p); + } + drop { + //if self.p != none { + // #error("drop send %?", option::get(self.p)); + //} + if self.p != none { + let mut p = none; + p <-> self.p; + sender_terminate(option::unwrap(p)) + } + } + fn unwrap() -> *packet<T> { + let mut p = none; + p <-> self.p; + option::unwrap(p) + } +} + +class recv_packet<T: send> { + let mut p: option<*packet<T>>; + new(p: *packet<T>) { + //#error("take recv %?", p); + self.p = some(p); + } + drop { + //if self.p != none { + // #error("drop recv %?", option::get(self.p)); + //} + if self.p != none { + let mut p = none; + p <-> self.p; + receiver_terminate(option::unwrap(p)) + } + } + fn unwrap() -> *packet<T> { + let mut p = none; + p <-> self.p; + option::unwrap(p) + } +} + +fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) { + let p = packet(); + (send_packet(p), recv_packet(p)) +} + +fn spawn_service<T: send>( + init: native fn() -> (send_packet<T>, recv_packet<T>), + +service: fn~(+recv_packet<T>)) + -> send_packet<T> +{ + let (client, server) = init(); + + // This is some nasty gymnastics required to safely move the pipe + // into a new task. + let server = ~mut some(server); + task::spawn() {|move service| + let mut server_ = none; + server_ <-> *server; + service(option::unwrap(server_)) + } + + client +} diff --git a/src/libcore/vec.rs b/src/libcore/vec.rs index 035be3d5d5d..b3b9d089fea 100644 --- a/src/libcore/vec.rs +++ b/src/libcore/vec.rs @@ -1163,7 +1163,6 @@ pure fn unpack_mut_slice<T,U>(s: &[mut T], impl extensions<T: copy> for ~[T] { #[inline(always)] pure fn +(rhs: &[const T]) -> ~[T] { -he pretty printer is unhappy. append(self, rhs) } } diff --git a/src/test/bench/msgsend-ring-contracts.rs b/src/test/bench/msgsend-ring-contracts.rs index 2802fec90e1..b3e69e5699d 100644 --- a/src/test/bench/msgsend-ring-contracts.rs +++ b/src/test/bench/msgsend-ring-contracts.rs @@ -15,197 +15,6 @@ import std::time; import ring::server::recv; -mod pipes { - // Runtime support for pipes. - - import unsafe::{forget, reinterpret_cast}; - - enum state { - empty, - full, - blocked, - terminated - } - - type packet<T: send> = { - mut state: state, - mut blocked_task: option<task::task>, - mut payload: option<T> - }; - - fn packet<T: send>() -> *packet<T> unsafe { - let p: *packet<T> = unsafe::transmute(~{ - mut state: empty, - mut blocked_task: none::<task::task>, - mut payload: none::<T> - }); - p - } - - #[abi = "rust-intrinsic"] - native mod rusti { - fn atomic_xchng(&dst: int, src: int) -> int; - fn atomic_xchng_acq(&dst: int, src: int) -> int; - fn atomic_xchng_rel(&dst: int, src: int) -> int; - } - - // We should consider moving this to core::unsafe, although I - // suspect graydon would want us to use void pointers instead. - unsafe fn uniquify<T>(x: *T) -> ~T { - unsafe { unsafe::reinterpret_cast(x) } - } - - fn swap_state_acq(&dst: state, src: state) -> state { - unsafe { - reinterpret_cast(rusti::atomic_xchng_acq( - *(ptr::mut_addr_of(dst) as *mut int), - src as int)) - } - } - - fn swap_state_rel(&dst: state, src: state) -> state { - unsafe { - reinterpret_cast(rusti::atomic_xchng_rel( - *(ptr::mut_addr_of(dst) as *mut int), - src as int)) - } - } - - fn send<T: send>(-p: send_packet<T>, -payload: T) { - let p = p.unwrap(); - let p = unsafe { uniquify(p) }; - assert (*p).payload == none; - (*p).payload <- some(payload); - let old_state = swap_state_rel((*p).state, full); - alt old_state { - empty { - // Yay, fastpath. - - // The receiver will eventually clean this up. - unsafe { forget(p); } - } - full { fail "duplicate send" } - blocked { - // FIXME: once the target will actually block, tell the - // scheduler to wake it up. - - // The receiver will eventually clean this up. - unsafe { forget(p); } - } - terminated { - // The receiver will never receive this. Rely on drop_glue - // to clean everything up. - } - } - } - - fn recv<T: send>(-p: recv_packet<T>) -> option<T> { - let p = p.unwrap(); - let p = unsafe { uniquify(p) }; - loop { - let old_state = swap_state_acq((*p).state, - blocked); - alt old_state { - empty | blocked { task::yield(); } - full { - let mut payload = none; - payload <-> (*p).payload; - ret some(option::unwrap(payload)) - } - terminated { - assert old_state == terminated; - ret none; - } - } - } - } - - fn sender_terminate<T: send>(p: *packet<T>) { - let p = unsafe { uniquify(p) }; - alt swap_state_rel((*p).state, terminated) { - empty | blocked { - // The receiver will eventually clean up. - unsafe { forget(p) } - } - full { - // This is impossible - fail "you dun goofed" - } - terminated { - // I have to clean up, use drop_glue - } - } - } - - fn receiver_terminate<T: send>(p: *packet<T>) { - let p = unsafe { uniquify(p) }; - alt swap_state_rel((*p).state, terminated) { - empty { - // the sender will clean up - unsafe { forget(p) } - } - blocked { - // this shouldn't happen. - fail "terminating a blocked packet" - } - terminated | full { - // I have to clean up, use drop_glue - } - } - } - - class send_packet<T: send> { - let mut p: option<*packet<T>>; - new(p: *packet<T>) { - //#error("take send %?", p); - self.p = some(p); - } - drop { - //if self.p != none { - // #error("drop send %?", option::get(self.p)); - //} - if self.p != none { - let mut p = none; - p <-> self.p; - sender_terminate(option::unwrap(p)) - } - } - fn unwrap() -> *packet<T> { - let mut p = none; - p <-> self.p; - option::unwrap(p) - } - } - - class recv_packet<T: send> { - let mut p: option<*packet<T>>; - new(p: *packet<T>) { - //#error("take recv %?", p); - self.p = some(p); - } - drop { - //if self.p != none { - // #error("drop recv %?", option::get(self.p)); - //} - if self.p != none { - let mut p = none; - p <-> self.p; - receiver_terminate(option::unwrap(p)) - } - } - fn unwrap() -> *packet<T> { - let mut p = none; - p <-> self.p; - option::unwrap(p) - } - } - - fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) { - let p = packet(); - (send_packet(p), recv_packet(p)) - } -} - // This module was generated by the pipe compiler. mod ring { fn init() -> (client::num, server::num) { pipes::entangle() } diff --git a/src/test/run-pass/pipe-sleep.rs b/src/test/run-pass/pipe-sleep.rs new file mode 100644 index 00000000000..0855acef51d --- /dev/null +++ b/src/test/run-pass/pipe-sleep.rs @@ -0,0 +1,59 @@ +use std; +import std::timer::sleep; +import std::uv; + +// Compiled by pipec +mod oneshot { + fn init() -> (client::waiting, server::waiting) { pipes::entangle() } + enum waiting { signal(server::signaled), } + enum signaled { } + mod client { + fn signal(-pipe: waiting) -> signaled { + let (c, s) = pipes::entangle(); + let message = oneshot::signal(s); + pipes::send(pipe, message); + c + } + type waiting = pipes::send_packet<oneshot::waiting>; + type signaled = pipes::send_packet<oneshot::signaled>; + } + mod server { + impl recv for waiting { + fn recv() -> extern fn(-waiting) -> oneshot::waiting { + fn recv(-pipe: waiting) -> oneshot::waiting { + option::unwrap(pipes::recv(pipe)) + } + recv + } + } + type waiting = pipes::recv_packet<oneshot::waiting>; + impl recv for signaled { + fn recv() -> extern fn(-signaled) -> oneshot::signaled { + fn recv(-pipe: signaled) -> oneshot::signaled { + option::unwrap(pipes::recv(pipe)) + } + recv + } + } + type signaled = pipes::recv_packet<oneshot::signaled>; + } +} + +fn main() { + import oneshot::client::*; + import oneshot::server::recv; + + #macro[ + [#recv[chan], + chan.recv()(chan)] + ]; + + let c = pipes::spawn_service(oneshot::init) {|p| + #recv(p); + }; + + let iotask = uv::global_loop::get(); + sleep(iotask, 5000); + + signal(c); +} \ No newline at end of file |
