about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-05-15 17:20:48 -0700
committerBrian Anderson <banderson@mozilla.com>2013-05-17 17:54:18 -0700
commitf5987b03b8d65a2b885519b7b9a0ea33cda33bc5 (patch)
tree34f207d840cf8fb2c3e18d93178f7b8e6455db62
parent03a8e59615f7ced4def8adaad41cfcb0fd0f9d29 (diff)
downloadrust-f5987b03b8d65a2b885519b7b9a0ea33cda33bc5.tar.gz
rust-f5987b03b8d65a2b885519b7b9a0ea33cda33bc5.zip
core::rt: implement `oneshot` and `stream`.
-rw-r--r--src/libcore/rt/comm.rs599
-rw-r--r--src/libcore/rt/mod.rs3
-rw-r--r--src/libcore/rt/sched.rs11
-rw-r--r--src/libcore/rt/test.rs33
4 files changed, 645 insertions, 1 deletions
diff --git a/src/libcore/rt/comm.rs b/src/libcore/rt/comm.rs
new file mode 100644
index 00000000000..9fcb70cfc7d
--- /dev/null
+++ b/src/libcore/rt/comm.rs
@@ -0,0 +1,599 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use option::*;
+use cast;
+use util;
+use ops::Drop;
+use kinds::Owned;
+use rt::sched::Coroutine;
+use rt::local_sched;
+#[cfg(stage0)]
+use unstable::intrinsics::{atomic_xchg};
+#[cfg(not(stage0))]
+use unstable::intrinsics::{atomic_xchg, atomic_load};
+use util::Void;
+use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
+use cell::Cell;
+
+/// A combined refcount / ~Task pointer.
+///
+/// Can be equal to the following values:
+///
+/// * 2 - both endpoints are alive
+/// * 1 - either the sender or the receiver is dead, determined by context
+/// * <ptr> - A pointer to a Task that can be transmuted to ~Task
+type State = int;
+
+static STATE_BOTH: State = 2;
+static STATE_ONE: State = 1;
+
+struct Packet<T> {
+    state: State,
+    payload: Option<T>,
+}
+
+pub struct PortOne<T> {
+    // XXX: Hack extra allocation to make by-val self work
+    inner: ~PortOneHack<T>
+}
+
+pub struct ChanOne<T> {
+    // XXX: Hack extra allocation to make by-val self work
+    inner: ~ChanOneHack<T>
+}
+
+pub struct PortOneHack<T> {
+    void_packet: *mut Void,
+    suppress_finalize: bool
+}
+
+pub struct ChanOneHack<T> {
+    void_packet: *mut Void,
+    suppress_finalize: bool
+}
+
+pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
+    let packet: ~Packet<T> = ~Packet {
+        state: STATE_BOTH,
+        payload: None
+    };
+
+    unsafe {
+        let packet: *mut Void = cast::transmute(packet);
+        let port = PortOne {
+            inner: ~PortOneHack {
+                void_packet: packet,
+                suppress_finalize: false
+            }
+        };
+        let chan = ChanOne {
+            inner: ~ChanOneHack {
+                void_packet: packet,
+                suppress_finalize: false
+            }
+        };
+        return (port, chan);
+    }
+}
+
+impl<T> PortOne<T> {
+    pub fn recv(self) -> T {
+        match self.try_recv() {
+            Some(val) => val,
+            None => {
+                fail!("receiving on closed channel");
+            }
+        }
+    }
+
+    pub fn try_recv(self) -> Option<T> {
+        let mut this = self;
+
+        {
+            let self_ptr: *mut PortOne<T> = &mut this;
+
+            // XXX: Optimize this to not require the two context switches when data is available
+
+            // Switch to the scheduler
+            let sched = local_sched::take();
+            do sched.deschedule_running_task_and_then |task| {
+                unsafe {
+                    let task_as_state: State = cast::transmute(task);
+                    let oldstate = atomic_xchg(&mut (*(*self_ptr).inner.packet()).state, task_as_state);
+                    match oldstate {
+                        STATE_BOTH => {
+                            // Data has not been sent. Now we're blocked.
+                        }
+                        STATE_ONE => {
+                            // Channel is closed. Switch back and check the data.
+                            let task: ~Coroutine = cast::transmute(task_as_state);
+                            let sched = local_sched::take();
+                            sched.resume_task_immediately(task);
+                        }
+                        _ => util::unreachable()
+                    }
+                }
+            }
+        }
+
+        // Task resumes.
+
+        // No further memory barrier is needed here to access the
+        // payload. Some scenarios:
+        //
+        // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine.
+        // 2) We encountered STATE_BOTH above and blocked. The sending task work-stole us
+        //    and ran on its thread. The work stealing had a memory barrier.
+        // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
+        //    is pinned to some other scheduler, so the sending task had to give us to
+        //    a different scheduler for resuming. That send synchronized memory.
+
+        unsafe {
+            let payload = util::replace(&mut (*this.inner.packet()).payload, None);
+
+            // The sender has closed up shop. Drop the packet.
+            let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
+            // Supress the finalizer. We're done here.
+            this.inner.suppress_finalize = true;
+
+            return payload;
+        }
+    }
+}
+
+impl<T> Peekable<T> for PortOne<T> {
+    #[cfg(stage0)]
+    fn peek(&self) -> bool { fail!() }
+
+    #[cfg(not(stage0))]
+    fn peek(&self) -> bool {
+        unsafe {
+            let packet: *mut Packet<T> = self.inner.packet();
+            let oldstate = atomic_load(&mut (*packet).state);
+            match oldstate {
+                STATE_BOTH => false,
+                STATE_ONE => (*packet).payload.is_some(),
+                _ => util::unreachable()
+            }
+        }
+    }
+}
+
+impl<T> ChanOne<T> {
+
+    pub fn send(self, val: T) {
+        self.try_send(val);
+    }
+
+    pub fn try_send(self, val: T) -> bool {
+        let mut this = self;
+        let mut recvr_active = true;
+
+        unsafe {
+            assert!((*this.inner.packet()).payload.is_none());
+            (*this.inner.packet()).payload = Some(val);
+
+            let oldstate = atomic_xchg(&mut (*this.inner.packet()).state, STATE_ONE);
+            match oldstate {
+                STATE_BOTH => {
+                    // Port is not recving yet. Nothing to do
+                }
+                STATE_ONE => {
+                    // Port has closed. Need to clean up.
+                    let _packet: ~Packet<T> = cast::transmute(this.inner.void_packet);
+                    recvr_active = false;
+                }
+                _ => {
+                    // Port is blocked. Wake it up.
+                    let recvr: ~Coroutine = cast::transmute(oldstate);
+                    let sched = local_sched::take();
+                    sched.schedule_task(recvr);
+                }
+            }
+        }
+
+        // Suppress the finalizer. We're done here.
+        this.inner.suppress_finalize = true;
+        return recvr_active;
+    }
+}
+
+#[unsafe_destructor]
+impl<T> Drop for PortOneHack<T> {
+    fn finalize(&self) {
+        if self.suppress_finalize { return }
+
+        unsafe {
+            let this = cast::transmute_mut(self);
+            let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
+            match oldstate {
+                STATE_BOTH => {
+                    /* cleanup is the chan's responsibility */
+                },
+                STATE_ONE => {
+                    let _packet: ~Packet<T> = cast::transmute(this.void_packet);
+                }
+                _ => {
+                    util::unreachable()
+                }
+            }
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T> Drop for ChanOneHack<T> {
+    fn finalize(&self) {
+        if self.suppress_finalize { return }
+
+        unsafe {
+            let this = cast::transmute_mut(self);
+            let oldstate = atomic_xchg(&mut (*this.packet()).state, STATE_ONE);
+            match oldstate {
+                STATE_BOTH => {
+                    /* cleanup is the port's responsibility */
+                },
+                STATE_ONE => {
+                    let _packet: ~Packet<T> = cast::transmute(this.void_packet);
+                },
+                _ => {
+                    // The port is blocked recving for a message we will never send. Wake it.
+                    assert!((*this.packet()).payload.is_none());
+                    let recvr: ~Coroutine = cast::transmute(oldstate);
+                    let sched = local_sched::take();
+                    sched.schedule_task(recvr);
+                }
+            }
+        }
+    }
+}
+
+impl<T> PortOneHack<T> {
+    fn packet(&self) -> *mut Packet<T> {
+        unsafe {
+            let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
+            let p: *mut Packet<T> = &mut **p;
+            return p;
+        }
+    }
+}
+
+impl<T> ChanOneHack<T> {
+    fn packet(&self) -> *mut Packet<T> {
+        unsafe {
+            let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
+            let p: *mut Packet<T> = &mut **p;
+            return p;
+        }
+    }
+}
+
+struct StreamPayload<T>(T, PortOne<StreamPayload<T>>);
+
+pub struct Port<T> {
+    // FIXME #5372. Using Cell because we don't take &mut self
+    next: Cell<PortOne<StreamPayload<T>>>
+}
+
+pub struct Chan<T> {
+    // FIXME #5372. Using Cell because we don't take &mut self
+    next: Cell<ChanOne<StreamPayload<T>>>
+}
+
+pub fn stream<T: Owned>() -> (Port<T>, Chan<T>) {
+    let (pone, cone) = oneshot();
+    let port = Port { next: Cell(pone) };
+    let chan = Chan { next: Cell(cone) };
+    return (port, chan);
+}
+
+impl<T> GenericPort<T> for Port<T> {
+    fn recv(&self) -> T {
+        match self.try_recv() {
+            Some(val) => val,
+            None => {
+                fail!("receiving on closed channel");
+            }
+        }
+    }
+
+    fn try_recv(&self) -> Option<T> {
+        let pone = self.next.take();
+        match pone.try_recv() {
+            Some(StreamPayload(val, next)) => {
+                self.next.put_back(next);
+                Some(val)
+            }
+            None => None
+        }
+    }
+}
+
+impl<T> Peekable<T> for Port<T> {
+    fn peek(&self) -> bool {
+        self.next.with_mut_ref(|p| p.peek())
+    }
+}
+
+impl<T: Owned> GenericChan<T> for Chan<T> {
+    fn send(&self, val: T) {
+        self.try_send(val);
+    }
+}
+
+impl<T: Owned> GenericSmartChan<T> for Chan<T> {
+    fn try_send(&self, val: T) -> bool {
+        let (next_pone, next_cone) = oneshot();
+        let cone = self.next.take();
+        self.next.put_back(next_cone);
+        cone.try_send(StreamPayload(val, next_pone))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use option::*;
+    use rt::test::*;
+    use cell::Cell;
+    use iter::Times;
+
+    #[test]
+    fn oneshot_single_thread_close_port_first() {
+        // Simple test of closing without sending
+        do run_in_newsched_task {
+            let (port, _chan) = oneshot::<int>();
+            { let _p = port; }
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_close_chan_first() {
+        // Simple test of closing without sending
+        do run_in_newsched_task {
+            let (_port, chan) = oneshot::<int>();
+            { let _c = chan; }
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_send_port_close() {
+        // Testing that the sender cleans up the payload if receiver is closed
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<~int>();
+            { let _p = port; }
+            chan.send(~0);
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_recv_chan_close() {
+        // Receiving on a closed chan will fail
+        do run_in_newsched_task {
+            let res = do spawntask_try {
+                let (port, chan) = oneshot::<~int>();
+                { let _c = chan; }
+                port.recv();
+            };
+            assert!(res.is_err());
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_send_then_recv() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<~int>();
+            chan.send(~10);
+            assert!(port.recv() == ~10);
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_try_send_open() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            assert!(chan.try_send(10));
+            assert!(port.recv() == 10);
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_try_send_closed() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            { let _p = port; }
+            assert!(!chan.try_send(10));
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_try_recv_open() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            chan.send(10);
+            assert!(port.try_recv() == Some(10));
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_try_recv_closed() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            { let _c = chan; }
+            assert!(port.try_recv() == None);
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_peek_data() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            assert!(!port.peek());
+            chan.send(10);
+            assert!(port.peek());
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_peek_close() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            { let _c = chan; }
+            assert!(!port.peek());
+            assert!(!port.peek());
+        }
+    }
+
+    #[test]
+    fn oneshot_single_thread_peek_open() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<int>();
+            assert!(!port.peek());
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_task_recv_then_send() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<~int>();
+            let port_cell = Cell(port);
+            do spawntask_immediately {
+                assert!(port_cell.take().recv() == ~10);
+            }
+
+            chan.send(~10);
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_task_recv_then_close() {
+        do run_in_newsched_task {
+            let (port, chan) = oneshot::<~int>();
+            let port_cell = Cell(port);
+            let chan_cell = Cell(chan);
+            do spawntask_later {
+                let _cell = chan_cell.take();
+            }
+            let res = do spawntask_try {
+                assert!(port_cell.take().recv() == ~10);
+            };
+            assert!(res.is_err());
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_thread_close_stress() {
+        for stress_factor().times {
+            do run_in_newsched_task {
+                let (port, chan) = oneshot::<int>();
+                let port_cell = Cell(port);
+                let _thread = do spawntask_thread {
+                    let _p = port_cell.take();
+                };
+                let _chan = chan;
+            }
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_thread_send_close_stress() {
+        for stress_factor().times {
+            do run_in_newsched_task {
+                let (port, chan) = oneshot::<int>();
+                let chan_cell = Cell(chan);
+                let port_cell = Cell(port);
+                let _thread1 = do spawntask_thread {
+                    let _p = port_cell.take();
+                };
+                let _thread2 = do spawntask_thread {
+                    let c = chan_cell.take();
+                    c.send(1);
+                };
+            }
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_thread_recv_close_stress() {
+        for stress_factor().times {
+            do run_in_newsched_task {
+                let (port, chan) = oneshot::<int>();
+                let chan_cell = Cell(chan);
+                let port_cell = Cell(port);
+                let _thread1 = do spawntask_thread {
+                    let port_cell = Cell(port_cell.take());
+                    let res = do spawntask_try {
+                        port_cell.take().recv();
+                    };
+                    assert!(res.is_err());
+                };
+                let _thread2 = do spawntask_thread {
+                    let chan_cell = Cell(chan_cell.take());
+                    do spawntask {
+                        chan_cell.take();
+                    }
+                };
+            }
+        }
+    }
+
+    #[test]
+    fn oneshot_multi_thread_send_recv_stress() {
+        for stress_factor().times {
+            do run_in_newsched_task {
+                let (port, chan) = oneshot::<~int>();
+                let chan_cell = Cell(chan);
+                let port_cell = Cell(port);
+                let _thread1 = do spawntask_thread {
+                    chan_cell.take().send(~10);
+                };
+                let _thread2 = do spawntask_thread {
+                    assert!(port_cell.take().recv() == ~10);
+                };
+            }
+        }
+    }
+
+    #[test]
+    fn stream_send_recv() {
+        for stress_factor().times {
+            do run_in_newsched_task {
+                let (port, chan) = stream::<~int>();
+
+                send(chan, 0);
+                recv(port, 0);
+
+                fn send(chan: Chan<~int>, i: int) {
+                    if i == 10 { return }
+
+                    let chan_cell = Cell(chan);
+                    let _thread = do spawntask_thread {
+                        let chan = chan_cell.take();
+                        chan.send(~i);
+                        send(chan, i + 1);
+                    };
+                }
+
+                fn recv(port: Port<~int>, i: int) {
+                    if i == 10 { return }
+
+                    let port_cell = Cell(port);
+                    let _thread = do spawntask_thread {
+                        let port = port_cell.take();
+                        assert!(port.recv() == ~i);
+                        recv(port, i + 1);
+                    };
+                }
+            }
+        }
+    }
+}
+
diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs
index 7a772ff0f3b..dab627188d0 100644
--- a/src/libcore/rt/mod.rs
+++ b/src/libcore/rt/mod.rs
@@ -122,6 +122,9 @@ pub mod rc;
 /// scheduler and task context
 pub mod tube;
 
+/// Simple reimplementation of core::comm
+pub mod comm;
+
 /// Set up a default runtime configuration, given compiler-supplied arguments.
 ///
 /// This is invoked by the `start` _language item_ (unstable::lang) to
diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs
index 5c1a3410087..c66f20e01b2 100644
--- a/src/libcore/rt/sched.rs
+++ b/src/libcore/rt/sched.rs
@@ -171,6 +171,17 @@ pub impl Scheduler {
         }
     }
 
+    fn schedule_task(~self, task: ~Coroutine) {
+        assert!(self.in_task_context());
+
+        do self.switch_running_tasks_and_then(task) |last_task| {
+            let last_task = Cell(last_task);
+            do local_sched::borrow |sched| {
+                sched.enqueue_task(last_task.take());
+            }
+        }
+    }
+
     // Core scheduling ops
 
     fn resume_task_immediately(~self, task: ~Coroutine) {
diff --git a/src/libcore/rt/test.rs b/src/libcore/rt/test.rs
index 1294b9bcf47..d739d0110ba 100644
--- a/src/libcore/rt/test.rs
+++ b/src/libcore/rt/test.rs
@@ -8,17 +8,20 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use uint;
+use option::*;
 use cell::Cell;
 use result::{Result, Ok, Err};
 use super::io::net::ip::{IpAddr, Ipv4};
 use rt::local_services::LocalServices;
+use rt::thread::Thread;
 
 /// Creates a new scheduler in a new thread and runs a task in it,
 /// then waits for the scheduler to exit. Failure of the task
 /// will abort the process.
 pub fn run_in_newsched_task(f: ~fn()) {
+    use super::sched::*;
     use unstable::run_in_bare_thread;
-    use super::sched::Coroutine;
     use rt::uv::uvio::UvEventLoop;
 
     let f = Cell(f);
@@ -144,6 +147,23 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
     if !failed { Ok(()) } else { Err(()) }
 }
 
+// Spawn a new task in a new scheduler and return a thread handle.
+pub fn spawntask_thread(f: ~fn()) -> Thread {
+    use rt::sched::*;
+    use rt::uv::uvio::UvEventLoop;
+
+    let f = Cell(f);
+    let thread = do Thread::start {
+        let mut sched = ~UvEventLoop::new_scheduler();
+        let task = ~Coroutine::with_local(&mut sched.stack_pool,
+                                          LocalServices::without_unwinding(),
+                                          f.take());
+        sched.enqueue_task(task);
+        sched.run();
+    };
+    return thread;
+}
+
 /// Get a port number, starting at 9600, for use in tests
 pub fn next_test_port() -> u16 {
     unsafe {
@@ -158,3 +178,14 @@ pub fn next_test_port() -> u16 {
 pub fn next_test_ip4() -> IpAddr {
     Ipv4(127, 0, 0, 1, next_test_port())
 }
+
+/// Get a constant that represents the number of times to repeat stress tests. Default 1.
+pub fn stress_factor() -> uint {
+    use os::getenv;
+
+    match getenv("RUST_RT_STRESS") {
+        Some(val) => uint::from_str(val).get(),
+        None => 1
+    }
+}
+