about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Holk <eric.holk@gmail.com>2012-06-29 18:15:28 -0700
committerEric Holk <eric.holk@gmail.com>2012-07-06 10:42:39 -0700
commit67b0760592e1cf9aad2e84f1534ef08c3c5f1a2b (patch)
tree302572d9e576d9aa0d3fc08f70e4ee3ce20d7c40
parent5c3889a02f107a3c93e05e8834673f924113c161 (diff)
downloadrust-67b0760592e1cf9aad2e84f1534ef08c3c5f1a2b.tar.gz
rust-67b0760592e1cf9aad2e84f1534ef08c3c5f1a2b.zip
Moved pipes runtime support to libcore, and add a test that will help verify that busy waiting is no longer happening.
Fixing the result of a bad merge.
-rw-r--r--src/libcore/core.rc6
-rw-r--r--src/libcore/pipes.rs207
-rw-r--r--src/libcore/vec.rs1
-rw-r--r--src/test/bench/msgsend-ring-contracts.rs191
-rw-r--r--src/test/run-pass/pipe-sleep.rs59
5 files changed, 270 insertions, 194 deletions
diff --git a/src/libcore/core.rc b/src/libcore/core.rc
index d4e91800660..97eefd708f3 100644
--- a/src/libcore/core.rc
+++ b/src/libcore/core.rc
@@ -39,7 +39,7 @@ export float, f32, f64;
 export box, char, str, ptr, vec, bool;
 export either, option, result, iter;
 export libc, os, io, run, rand, sys, unsafe, logging;
-export arc, newcomm, comm, task, future;
+export arc, newcomm, comm, task, future, pipes;
 export extfmt;
 export tuple;
 export to_str, to_bytes;
@@ -187,7 +187,9 @@ mod newcomm;
 mod comm;
 mod task;
 mod future;
-
+// TODO: remove the conditionals once a new snapshot happens
+#[cfg(stage1)]
+mod pipes;
 
 // Runtime and language-primitive support
 
diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs
new file mode 100644
index 00000000000..75a4b90af06
--- /dev/null
+++ b/src/libcore/pipes.rs
@@ -0,0 +1,207 @@
+// Runtime support for pipes.
+
+import unsafe::{forget, reinterpret_cast};
+
+enum state {
+    empty,
+    full,
+    blocked,
+    terminated
+}
+
+type packet<T: send> = {
+    mut state: state,
+    mut blocked_task: option<task::task>,
+    mut payload: option<T>
+};
+
+fn packet<T: send>() -> *packet<T> unsafe {
+    let p: *packet<T> = unsafe::transmute(~{
+        mut state: empty,
+        mut blocked_task: none::<task::task>,
+        mut payload: none::<T>
+    });
+    p
+}
+
+#[abi = "rust-intrinsic"]
+native mod rusti {
+    fn atomic_xchng(&dst: int, src: int) -> int;
+    fn atomic_xchng_acq(&dst: int, src: int) -> int;
+    fn atomic_xchng_rel(&dst: int, src: int) -> int;
+}
+
+// We should consider moving this to core::unsafe, although I
+// suspect graydon would want us to use void pointers instead.
+unsafe fn uniquify<T>(x: *T) -> ~T {
+    unsafe { unsafe::reinterpret_cast(x) }
+}
+
+fn swap_state_acq(&dst: state, src: state) -> state {
+    unsafe {
+        reinterpret_cast(rusti::atomic_xchng_acq(
+            *(ptr::mut_addr_of(dst) as *mut int),
+            src as int))
+    }
+}
+
+fn swap_state_rel(&dst: state, src: state) -> state {
+    unsafe {
+        reinterpret_cast(rusti::atomic_xchng_rel(
+            *(ptr::mut_addr_of(dst) as *mut int),
+            src as int))
+    }
+}
+
+fn send<T: send>(-p: send_packet<T>, -payload: T) {
+    let p = p.unwrap();
+    let p = unsafe { uniquify(p) };
+    assert (*p).payload == none;
+    (*p).payload <- some(payload);
+    let old_state = swap_state_rel((*p).state, full);
+    alt old_state {
+      empty {
+        // Yay, fastpath.
+
+        // The receiver will eventually clean this up.
+        unsafe { forget(p); }
+      }
+      full { fail "duplicate send" }
+      blocked {
+        // TODO: once the target will actually block, tell the
+        // scheduler to wake it up.
+
+        // The receiver will eventually clean this up.
+        unsafe { forget(p); }
+      }
+      terminated {
+        // The receiver will never receive this. Rely on drop_glue
+        // to clean everything up.
+      }
+    }
+}
+
+fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
+    let p = p.unwrap();
+    let p = unsafe { uniquify(p) };
+    loop {
+        let old_state = swap_state_acq((*p).state,
+                                       blocked);
+        alt old_state {
+          empty | blocked { task::yield(); }
+          full {
+            let mut payload = none;
+            payload <-> (*p).payload;
+            ret some(option::unwrap(payload))
+          }
+          terminated {
+            assert old_state == terminated;
+            ret none;
+          }
+        }
+    }
+}
+
+fn sender_terminate<T: send>(p: *packet<T>) {
+    let p = unsafe { uniquify(p) };
+    alt swap_state_rel((*p).state, terminated) {
+      empty | blocked {
+        // The receiver will eventually clean up.
+        unsafe { forget(p) }
+      }
+      full {
+        // This is impossible
+        fail "you dun goofed"
+      }
+      terminated {
+        // I have to clean up, use drop_glue
+      }
+    }
+}
+
+fn receiver_terminate<T: send>(p: *packet<T>) {
+    let p = unsafe { uniquify(p) };
+    alt swap_state_rel((*p).state, terminated) {
+      empty {
+        // the sender will clean up
+        unsafe { forget(p) }
+      }
+      blocked {
+        // this shouldn't happen.
+        fail "terminating a blocked packet"
+      }
+      terminated | full {
+        // I have to clean up, use drop_glue
+      }
+    }
+}
+
+class send_packet<T: send> {
+    let mut p: option<*packet<T>>;
+    new(p: *packet<T>) {
+        //#error("take send %?", p);
+        self.p = some(p);
+    }
+    drop {
+        //if self.p != none {
+        //    #error("drop send %?", option::get(self.p));
+        //}
+        if self.p != none {
+            let mut p = none;
+            p <-> self.p;
+            sender_terminate(option::unwrap(p))
+        }
+    }
+    fn unwrap() -> *packet<T> {
+        let mut p = none;
+        p <-> self.p;
+        option::unwrap(p)
+    }
+}
+
+class recv_packet<T: send> {
+    let mut p: option<*packet<T>>;
+    new(p: *packet<T>) {
+        //#error("take recv %?", p);
+        self.p = some(p);
+    }
+    drop {
+        //if self.p != none {
+        //    #error("drop recv %?", option::get(self.p));
+        //}
+        if self.p != none {
+            let mut p = none;
+            p <-> self.p;
+            receiver_terminate(option::unwrap(p))
+        }
+    }
+    fn unwrap() -> *packet<T> {
+        let mut p = none;
+        p <-> self.p;
+        option::unwrap(p)
+    }
+}
+
+fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) {
+    let p = packet();
+    (send_packet(p), recv_packet(p))
+}
+
+fn spawn_service<T: send>(
+    init: native fn() -> (send_packet<T>, recv_packet<T>),
+    +service: fn~(+recv_packet<T>))
+    -> send_packet<T>
+{
+    let (client, server) = init();
+
+    // This is some nasty gymnastics required to safely move the pipe
+    // into a new task.
+    let server = ~mut some(server);
+    task::spawn() {|move service|
+        let mut server_ = none;
+        server_ <-> *server;
+        service(option::unwrap(server_))
+    }
+
+    client
+}
diff --git a/src/libcore/vec.rs b/src/libcore/vec.rs
index 035be3d5d5d..b3b9d089fea 100644
--- a/src/libcore/vec.rs
+++ b/src/libcore/vec.rs
@@ -1163,7 +1163,6 @@ pure fn unpack_mut_slice<T,U>(s: &[mut T],
 impl extensions<T: copy> for ~[T] {
     #[inline(always)]
     pure fn +(rhs: &[const T]) -> ~[T] {
-he pretty printer is unhappy.
         append(self, rhs)
     }
 }
diff --git a/src/test/bench/msgsend-ring-contracts.rs b/src/test/bench/msgsend-ring-contracts.rs
index 2802fec90e1..b3e69e5699d 100644
--- a/src/test/bench/msgsend-ring-contracts.rs
+++ b/src/test/bench/msgsend-ring-contracts.rs
@@ -15,197 +15,6 @@ import std::time;
 
 import ring::server::recv;
 
-mod pipes {
-    // Runtime support for pipes.
-
-    import unsafe::{forget, reinterpret_cast};
-
-    enum state {
-        empty,
-        full,
-        blocked,
-        terminated
-    }
-
-    type packet<T: send> = {
-        mut state: state,
-        mut blocked_task: option<task::task>,
-        mut payload: option<T>
-    };
-
-    fn packet<T: send>() -> *packet<T> unsafe {
-        let p: *packet<T> = unsafe::transmute(~{
-            mut state: empty,
-            mut blocked_task: none::<task::task>,
-            mut payload: none::<T>
-        });
-        p
-    }
-
-    #[abi = "rust-intrinsic"]
-    native mod rusti {
-        fn atomic_xchng(&dst: int, src: int) -> int;
-        fn atomic_xchng_acq(&dst: int, src: int) -> int;
-        fn atomic_xchng_rel(&dst: int, src: int) -> int;
-    }
-
-    // We should consider moving this to core::unsafe, although I
-    // suspect graydon would want us to use void pointers instead.
-    unsafe fn uniquify<T>(x: *T) -> ~T {
-        unsafe { unsafe::reinterpret_cast(x) }
-    }
-
-    fn swap_state_acq(&dst: state, src: state) -> state {
-        unsafe {
-            reinterpret_cast(rusti::atomic_xchng_acq(
-                *(ptr::mut_addr_of(dst) as *mut int),
-                src as int))
-        }
-    }
-
-    fn swap_state_rel(&dst: state, src: state) -> state {
-        unsafe {
-            reinterpret_cast(rusti::atomic_xchng_rel(
-                *(ptr::mut_addr_of(dst) as *mut int),
-                src as int))
-        }
-    }
-
-    fn send<T: send>(-p: send_packet<T>, -payload: T) {
-        let p = p.unwrap();
-        let p = unsafe { uniquify(p) };
-        assert (*p).payload == none;
-        (*p).payload <- some(payload);
-        let old_state = swap_state_rel((*p).state, full);
-        alt old_state {
-          empty {
-            // Yay, fastpath.
-
-            // The receiver will eventually clean this up.
-            unsafe { forget(p); }
-          }
-          full { fail "duplicate send" }
-          blocked {
-            // FIXME: once the target will actually block, tell the
-            // scheduler to wake it up.
-
-            // The receiver will eventually clean this up.
-            unsafe { forget(p); }
-          }
-          terminated {
-            // The receiver will never receive this. Rely on drop_glue
-            // to clean everything up.
-          }
-        }
-    }
-
-    fn recv<T: send>(-p: recv_packet<T>) -> option<T> {
-        let p = p.unwrap();
-        let p = unsafe { uniquify(p) };
-        loop {
-            let old_state = swap_state_acq((*p).state,
-                                           blocked);
-            alt old_state {
-              empty | blocked { task::yield(); }
-              full {
-                let mut payload = none;
-                payload <-> (*p).payload;
-                ret some(option::unwrap(payload))
-              }
-              terminated {
-                assert old_state == terminated;
-                ret none;
-              }
-            }
-        }
-    }
-
-    fn sender_terminate<T: send>(p: *packet<T>) {
-        let p = unsafe { uniquify(p) };
-        alt swap_state_rel((*p).state, terminated) {
-          empty | blocked {
-            // The receiver will eventually clean up.
-            unsafe { forget(p) }
-          }
-          full {
-            // This is impossible
-            fail "you dun goofed"
-          }
-          terminated {
-            // I have to clean up, use drop_glue
-          }
-        }
-    }
-
-    fn receiver_terminate<T: send>(p: *packet<T>) {
-        let p = unsafe { uniquify(p) };
-        alt swap_state_rel((*p).state, terminated) {
-          empty {
-            // the sender will clean up
-            unsafe { forget(p) }
-          }
-          blocked {
-            // this shouldn't happen.
-            fail "terminating a blocked packet"
-          }
-          terminated | full {
-            // I have to clean up, use drop_glue
-          }
-        }
-    }
-
-    class send_packet<T: send> {
-        let mut p: option<*packet<T>>;
-        new(p: *packet<T>) {
-            //#error("take send %?", p);
-            self.p = some(p);
-        }
-        drop {
-            //if self.p != none {
-            //    #error("drop send %?", option::get(self.p));
-            //}
-            if self.p != none {
-                let mut p = none;
-                p <-> self.p;
-                sender_terminate(option::unwrap(p))
-            }
-        }
-        fn unwrap() -> *packet<T> {
-            let mut p = none;
-            p <-> self.p;
-            option::unwrap(p)
-        }
-    }
-
-    class recv_packet<T: send> {
-        let mut p: option<*packet<T>>;
-        new(p: *packet<T>) {
-            //#error("take recv %?", p);
-            self.p = some(p);
-        }
-        drop {
-            //if self.p != none {
-            //    #error("drop recv %?", option::get(self.p));
-            //}
-            if self.p != none {
-                let mut p = none;
-                p <-> self.p;
-                receiver_terminate(option::unwrap(p))
-            }
-        }
-        fn unwrap() -> *packet<T> {
-            let mut p = none;
-            p <-> self.p;
-            option::unwrap(p)
-        }
-    }
-
-    fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) {
-        let p = packet();
-        (send_packet(p), recv_packet(p))
-    }
-}
-
 // This module was generated by the pipe compiler.
 mod ring {
     fn init() -> (client::num, server::num) { pipes::entangle() }
diff --git a/src/test/run-pass/pipe-sleep.rs b/src/test/run-pass/pipe-sleep.rs
new file mode 100644
index 00000000000..0855acef51d
--- /dev/null
+++ b/src/test/run-pass/pipe-sleep.rs
@@ -0,0 +1,59 @@
+use std;
+import std::timer::sleep;
+import std::uv;
+
+// Compiled by pipec
+mod oneshot {
+    fn init() -> (client::waiting, server::waiting) { pipes::entangle() }
+    enum waiting { signal(server::signaled), }
+    enum signaled { }
+    mod client {
+        fn signal(-pipe: waiting) -> signaled {
+            let (c, s) = pipes::entangle();
+            let message = oneshot::signal(s);
+            pipes::send(pipe, message);
+            c
+        }
+        type waiting = pipes::send_packet<oneshot::waiting>;
+        type signaled = pipes::send_packet<oneshot::signaled>;
+    }
+    mod server {
+        impl recv for waiting {
+            fn recv() -> extern fn(-waiting) -> oneshot::waiting {
+                fn recv(-pipe: waiting) -> oneshot::waiting {
+                    option::unwrap(pipes::recv(pipe))
+                }
+                recv
+            }
+        }
+        type waiting = pipes::recv_packet<oneshot::waiting>;
+        impl recv for signaled {
+            fn recv() -> extern fn(-signaled) -> oneshot::signaled {
+                fn recv(-pipe: signaled) -> oneshot::signaled {
+                    option::unwrap(pipes::recv(pipe))
+                }
+                recv
+            }
+        }
+        type signaled = pipes::recv_packet<oneshot::signaled>;
+    }
+}
+
+fn main() {
+    import oneshot::client::*;
+    import oneshot::server::recv;
+
+    #macro[
+        [#recv[chan],
+         chan.recv()(chan)]
+    ];
+
+    let c = pipes::spawn_service(oneshot::init) {|p|
+        #recv(p);
+    };
+
+    let iotask = uv::global_loop::get();
+    sleep(iotask, 5000);
+    
+    signal(c);
+}
\ No newline at end of file