about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-01-15 19:53:35 -0800
committerBrian Anderson <banderson@mozilla.com>2013-01-23 17:35:31 -0800
commit8852279a9ecac970e30b6d92d7efdcbd5485769c (patch)
tree53a3e26fe5cf72ea12ecac35664db7e70f85eeed
parent1bf8e579436941a82e4a4806b74dfd27ed4d1d74 (diff)
downloadrust-8852279a9ecac970e30b6d92d7efdcbd5485769c.tar.gz
rust-8852279a9ecac970e30b6d92d7efdcbd5485769c.zip
core: Add new weak task API
-rw-r--r--src/libcore/pipes.rs10
-rw-r--r--src/libcore/private.rs2
-rw-r--r--src/libcore/private/weak_task.rs187
-rw-r--r--src/rt/rust_builtin.cpp12
-rw-r--r--src/rt/rust_kernel.cpp24
-rw-r--r--src/rt/rust_kernel.h2
-rw-r--r--src/rt/rustrt.def.in4
7 files changed, 233 insertions, 8 deletions
diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs
index 2ff4effbd6e..2865c942138 100644
--- a/src/libcore/pipes.rs
+++ b/src/libcore/pipes.rs
@@ -1234,6 +1234,16 @@ pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
     (port, chan)
 }
 
+impl<T: Owned> PortOne<T> {
+    fn recv(self) -> T { recv_one(self) }
+    fn try_recv(self) -> Option<T> { try_recv_one(self) }
+}
+
+impl<T: Owned> ChanOne<T> {
+    fn send(self, data: T) { send_one(self, data) }
+    fn try_send(self, data: T) -> bool { try_send_one(self, data) }
+}
+
 /**
  * Receive a message from a oneshot pipe, failing if the connection was
  * closed.
diff --git a/src/libcore/private.rs b/src/libcore/private.rs
index 3eadce1c30c..aa976ee745d 100644
--- a/src/libcore/private.rs
+++ b/src/libcore/private.rs
@@ -34,6 +34,8 @@ pub mod at_exit;
 pub mod global;
 #[path = "private/finally.rs"]
 pub mod finally;
+#[path = "private/weak_task.rs"]
+pub mod weak_task;
 
 extern mod rustrt {
     #[legacy_exports];
diff --git a/src/libcore/private/weak_task.rs b/src/libcore/private/weak_task.rs
new file mode 100644
index 00000000000..868361b0e60
--- /dev/null
+++ b/src/libcore/private/weak_task.rs
@@ -0,0 +1,187 @@
+/*!
+Weak tasks
+
+Weak tasks are a runtime feature for building global services that
+do not keep the runtime alive. Normally the runtime exits when all
+tasks exits, but if a task is weak then the runtime may exit while
+it is running, sending a notification to the task that the runtime
+is trying to shut down.
+*/
+
+use option::{Some, None, swap_unwrap};
+use private::at_exit::at_exit;
+use private::global::global_data_clone_create;
+use private::finally::Finally;
+use pipes::{Port, Chan, SharedChan, stream};
+use task::{Task, task, spawn};
+use task::rt::{task_id, get_task_id};
+use send_map::linear::LinearMap;
+use ops::Drop;
+
+type ShutdownMsg = ();
+
+// XXX: This could be a PortOne but I've experienced bugginess
+// with oneshot pipes and try_send
+pub unsafe fn weaken_task(f: &fn(Port<ShutdownMsg>)) {
+    let service = global_data_clone_create(global_data_key,
+                                           create_global_service);
+    let (shutdown_port, shutdown_chan) = stream::<ShutdownMsg>();
+    let shutdown_port = ~mut Some(shutdown_port);
+    let task = get_task_id();
+    // Expect the weak task service to be alive
+    assert service.try_send(RegisterWeakTask(task, shutdown_chan));
+    unsafe { rust_inc_weak_task_count(); }
+    do fn&() {
+        let shutdown_port = swap_unwrap(&mut *shutdown_port);
+        f(shutdown_port)
+    }.finally || {
+        unsafe { rust_dec_weak_task_count(); }
+        // Service my have already exited
+        service.send(UnregisterWeakTask(task));
+    }
+}
+
+type WeakTaskService = SharedChan<ServiceMsg>;
+type TaskHandle = task_id;
+
+fn global_data_key(_v: WeakTaskService) { }
+
+enum ServiceMsg {
+    RegisterWeakTask(TaskHandle, Chan<ShutdownMsg>),
+    UnregisterWeakTask(TaskHandle),
+    Shutdown
+}
+
+fn create_global_service() -> ~WeakTaskService {
+
+    debug!("creating global weak task service");
+    let (port, chan) = stream::<ServiceMsg>();
+    let port = ~mut Some(port);
+    let chan = SharedChan(chan);
+    let chan_clone = chan.clone();
+
+    do task().unlinked().spawn {
+        debug!("running global weak task service");
+        let port = swap_unwrap(&mut *port);
+        let port = ~mut Some(port);
+        do fn&() {
+            let port = swap_unwrap(&mut *port);
+            // The weak task service is itself a weak task
+            debug!("weakening the weak service task");
+            unsafe { rust_inc_weak_task_count(); }
+            run_weak_task_service(port);
+        }.finally {
+            debug!("unweakening the weak service task");
+            unsafe { rust_dec_weak_task_count(); }
+        }
+    }
+
+    do at_exit {
+        debug!("shutting down weak task service");
+        chan.send(Shutdown);
+    }
+
+    return ~chan_clone;
+}
+
+fn run_weak_task_service(port: Port<ServiceMsg>) {
+
+    let mut shutdown_map = LinearMap();
+
+    loop {
+        match port.recv() {
+            RegisterWeakTask(task, shutdown_chan) => {
+                let previously_unregistered =
+                    shutdown_map.insert(task, shutdown_chan);
+                assert previously_unregistered;
+            }
+            UnregisterWeakTask(task) => {
+                match shutdown_map.pop(&task) {
+                    Some(shutdown_chan) => {
+                        // Oneshot pipes must send, even though
+                        // nobody will receive this
+                        shutdown_chan.send(());
+                    }
+                    None => fail
+                }
+            }
+            Shutdown => break
+        }
+    }
+
+    do shutdown_map.consume |_, shutdown_chan| {
+        // Weak task may have already exited
+        shutdown_chan.send(());
+    }
+}
+
+extern {
+    unsafe fn rust_inc_weak_task_count();
+    unsafe fn rust_dec_weak_task_count();
+}
+
+#[test]
+fn test_simple() unsafe {
+    let (port, chan) = stream();
+    do spawn unsafe {
+        do weaken_task |_signal| {
+        }
+        chan.send(());
+    }
+    port.recv();
+}
+
+#[test]
+fn test_weak_weak() unsafe {
+    let (port, chan) = stream();
+    do spawn unsafe {
+        do weaken_task |_signal| {
+        }
+        do weaken_task |_signal| {
+        }
+        chan.send(());
+    }
+    port.recv();
+}
+
+#[test]
+fn test_wait_for_signal() unsafe {
+    do spawn unsafe {
+        do weaken_task |signal| {
+            signal.recv();
+        }
+    }
+}
+
+#[test]
+fn test_wait_for_signal_many() unsafe {
+    use uint;
+    for uint::range(0, 100) |_| {
+        do spawn unsafe {
+            do weaken_task |signal| {
+                signal.recv();
+            }
+        }
+    }
+}
+
+#[test]
+fn test_select_stream_and_oneshot() unsafe {
+    use pipes::select2i;
+    use either::{Left, Right};
+
+    let (port, chan) = stream();
+    let (waitport, waitchan) = stream();
+    do spawn unsafe {
+        do weaken_task |signal| {
+            match select2i(&port, &signal) {
+                Left(*) => (),
+                Right(*) => fail
+            }
+        }
+        waitchan.send(());
+    }
+    chan.send(());
+    waitport.recv();
+}
+
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index 221afb89b23..a5e1260d4a5 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -1038,6 +1038,18 @@ rust_get_global_data_ptr() {
     return &task->kernel->global_data;
 }
 
+extern "C" void
+rust_inc_weak_task_count() {
+    rust_task *task = rust_get_current_task();
+    task->kernel->inc_weak_task_count();
+}
+
+extern "C" void
+rust_dec_weak_task_count() {
+    rust_task *task = rust_get_current_task();
+    task->kernel->dec_weak_task_count();
+}
+
 //
 // Local Variables:
 // mode: C++
diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp
index 9c6ba9dcda3..d270ac07633 100644
--- a/src/rt/rust_kernel.cpp
+++ b/src/rt/rust_kernel.cpp
@@ -377,17 +377,12 @@ rust_kernel::weaken_task(rust_port_id chan) {
         KLOG_("Weakening task with channel %" PRIdPTR, chan);
         weak_task_chans.push_back(chan);
     }
-    uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
-    KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
-    if (new_non_weak_tasks == 0) {
-        begin_shutdown();
-    }
+    inc_weak_task_count();
 }
 
 void
 rust_kernel::unweaken_task(rust_port_id chan) {
-    uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks);
-    KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
+    dec_weak_task_count();
     {
         scoped_lock with(weak_task_lock);
         KLOG_("Unweakening task with channel %" PRIdPTR, chan);
@@ -400,6 +395,21 @@ rust_kernel::unweaken_task(rust_port_id chan) {
 }
 
 void
+rust_kernel::inc_weak_task_count() {
+    uintptr_t new_non_weak_tasks = sync::decrement(non_weak_tasks);
+    KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
+    if (new_non_weak_tasks == 0) {
+        begin_shutdown();
+    }
+}
+
+void
+rust_kernel::dec_weak_task_count() {
+    uintptr_t new_non_weak_tasks = sync::increment(non_weak_tasks);
+    KLOG_("New non-weak tasks %" PRIdPTR, new_non_weak_tasks);
+}
+
+void
 rust_kernel::end_weak_tasks() {
     std::vector<rust_port_id> chancopies;
     {
diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h
index 99b230f7872..f90ecf01a7b 100644
--- a/src/rt/rust_kernel.h
+++ b/src/rt/rust_kernel.h
@@ -187,6 +187,8 @@ public:
     void unregister_task();
     void weaken_task(rust_port_id chan);
     void unweaken_task(rust_port_id chan);
+    void inc_weak_task_count();
+    void dec_weak_task_count();
 
     bool send_to_port(rust_port_id chan, void *sptr);
 
diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in
index 8c26832f349..5be823d8fde 100644
--- a/src/rt/rustrt.def.in
+++ b/src/rt/rustrt.def.in
@@ -211,4 +211,6 @@ linenoiseHistoryLoad
 rust_raw_thread_start
 rust_raw_thread_join_delete
 rust_register_exit_function
-rust_get_global_data_ptr
\ No newline at end of file
+rust_get_global_data_ptr
+rust_inc_weak_task_count
+rust_dec_weak_task_count
\ No newline at end of file