about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-05-04 17:30:31 -0700
committerBrian Anderson <banderson@mozilla.com>2013-05-14 14:52:06 -0700
commit414f3c7d252fcd54562c0c8a85499d7d07f5e612 (patch)
treea54ab1c7ea84529b6efa9b2e21429b717cdbc376
parent40a9de5ebc8bea4d0a42a89e657a35a5b07d4042 (diff)
downloadrust-414f3c7d252fcd54562c0c8a85499d7d07f5e612.tar.gz
rust-414f3c7d252fcd54562c0c8a85499d7d07f5e612.zip
core::rt: Add a simple channel type for passing buffered messages between Scheduler and Task
Called 'Tube' for lack of anything better.
-rw-r--r--src/libcore/logging.rs3
-rw-r--r--src/libcore/rt/mod.rs6
-rw-r--r--src/libcore/rt/tube.rs182
-rw-r--r--src/libcore/sys.rs29
4 files changed, 212 insertions, 8 deletions
diff --git a/src/libcore/logging.rs b/src/libcore/logging.rs
index 70195afb20a..4308d22548f 100644
--- a/src/libcore/logging.rs
+++ b/src/libcore/logging.rs
@@ -19,6 +19,7 @@ use libc;
 use repr;
 use vec;
 use cast;
+use str;
 
 /// Turns on logging to stdout globally
 pub fn console_on() {
@@ -57,7 +58,7 @@ pub fn log_type<T>(level: u32, object: &T) {
         }
         _ => {
             // XXX: Bad allocation
-            let msg = bytes.to_str();
+            let msg = str::from_bytes(bytes);
             newsched_log_str(msg);
         }
     }
diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs
index ce3fb71ef2c..b2ba6d7d3c4 100644
--- a/src/libcore/rt/mod.rs
+++ b/src/libcore/rt/mod.rs
@@ -18,7 +18,7 @@ use libc::c_char;
 mod sched;
 
 /// Thread-local access to the current Scheduler
-mod local_sched;
+pub mod local_sched;
 
 /// Synchronous I/O
 #[path = "io/mod.rs"]
@@ -68,6 +68,10 @@ pub mod test;
 /// Reference counting
 pub mod rc;
 
+/// A simple single-threaded channel type for passing buffered data between
+/// scheduler and task context
+pub mod tube;
+
 /// Set up a default runtime configuration, given compiler-supplied arguments.
 ///
 /// This is invoked by the `start` _language item_ (unstable::lang) to
diff --git a/src/libcore/rt/tube.rs b/src/libcore/rt/tube.rs
new file mode 100644
index 00000000000..ef376199fcb
--- /dev/null
+++ b/src/libcore/rt/tube.rs
@@ -0,0 +1,182 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! A very simple unsynchronized channel type for sending buffered data from
+//! scheduler context to task context.
+//!
+//! XXX: This would be safer to use if split into two types like Port/Chan
+
+use option::*;
+use clone::Clone;
+use super::rc::RC;
+use rt::sched::Task;
+use rt::{context, TaskContext, SchedulerContext};
+use rt::local_sched;
+
+struct TubeState<T> {
+    blocked_task: Option<~Task>,
+    buf: ~[T]
+}
+
+pub struct Tube<T> {
+    p: RC<TubeState<T>>
+}
+
+impl<T> Tube<T> {
+    pub fn new() -> Tube<T> {
+        Tube {
+            p: RC::new(TubeState {
+                blocked_task: None,
+                buf: ~[]
+            })
+        }
+    }
+
+    pub fn send(&mut self, val: T) {
+        rtdebug!("tube send");
+        assert!(context() == SchedulerContext);
+
+        unsafe {
+            let state = self.p.unsafe_borrow_mut();
+            (*state).buf.push(val);
+
+            if (*state).blocked_task.is_some() {
+                // There's a waiting task. Wake it up
+                rtdebug!("waking blocked tube");
+                let task = (*state).blocked_task.swap_unwrap();
+                let sched = local_sched::take();
+                sched.resume_task_immediately(task);
+            }
+        }
+    }
+
+    pub fn recv(&mut self) -> T {
+        assert!(context() == TaskContext);
+
+        unsafe {
+            let state = self.p.unsafe_borrow_mut();
+            if !(*state).buf.is_empty() {
+                return (*state).buf.shift();
+            } else {
+                // Block and wait for the next message
+                rtdebug!("blocking on tube recv");
+                assert!(self.p.refcount() > 1); // There better be somebody to wake us up
+                assert!((*state).blocked_task.is_none());
+                let sched = local_sched::take();
+                do sched.deschedule_running_task_and_then |task| {
+                    (*state).blocked_task = Some(task);
+                }
+                rtdebug!("waking after tube recv");
+                let buf = &mut (*state).buf;
+                assert!(!buf.is_empty());
+                return buf.shift();
+            }
+        }
+    }
+}
+
+impl<T> Clone for Tube<T> {
+    fn clone(&self) -> Tube<T> {
+        Tube { p: self.p.clone() }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use int;
+    use cell::Cell;
+    use rt::local_sched;
+    use rt::test::*;
+    use rt::rtio::EventLoop;
+    use super::*;
+
+    #[test]
+    fn simple_test() {
+        do run_in_newsched_task {
+            let mut tube: Tube<int> = Tube::new();
+            let tube_clone = tube.clone();
+            let tube_clone_cell = Cell(tube_clone);
+            let sched = local_sched::take();
+            do sched.deschedule_running_task_and_then |task| {
+                let mut tube_clone = tube_clone_cell.take();
+                tube_clone.send(1);
+                let sched = local_sched::take();
+                sched.resume_task_immediately(task);
+            }
+
+            assert!(tube.recv() == 1);
+        }
+    }
+
+    #[test]
+    fn blocking_test() {
+        do run_in_newsched_task {
+            let mut tube: Tube<int> = Tube::new();
+            let tube_clone = tube.clone();
+            let tube_clone = Cell(Cell(Cell(tube_clone)));
+            let sched = local_sched::take();
+            do sched.deschedule_running_task_and_then |task| {
+                let tube_clone = tube_clone.take();
+                do local_sched::borrow |sched| {
+                    let tube_clone = tube_clone.take();
+                    do sched.event_loop.callback {
+                        let mut tube_clone = tube_clone.take();
+                        // The task should be blocked on this now and
+                        // sending will wake it up.
+                        tube_clone.send(1);
+                    }
+                }
+                let sched = local_sched::take();
+                sched.resume_task_immediately(task);
+            }
+
+            assert!(tube.recv() == 1);
+        }
+    }
+
+    #[test]
+    fn many_blocking_test() {
+        static MAX: int = 100;
+
+        do run_in_newsched_task {
+            let mut tube: Tube<int> = Tube::new();
+            let tube_clone = tube.clone();
+            let tube_clone = Cell(tube_clone);
+            let sched = local_sched::take();
+            do sched.deschedule_running_task_and_then |task| {
+                callback_send(tube_clone.take(), 0);
+
+                fn callback_send(tube: Tube<int>, i: int) {
+                    if i == 100 { return; }
+
+                    let tube = Cell(Cell(tube));
+                    do local_sched::borrow |sched| {
+                        let tube = tube.take();
+                        do sched.event_loop.callback {
+                            let mut tube = tube.take();
+                            // The task should be blocked on this now and
+                            // sending will wake it up.
+                            tube.send(i);
+                            callback_send(tube, i + 1);
+                        }
+                    }
+                }
+
+                let sched = local_sched::take();
+                sched.resume_task_immediately(task);
+            }
+
+            for int::range(0, MAX) |i| {
+                let j = tube.recv();
+                assert!(j == i);
+            }
+        }
+    }
+}
diff --git a/src/libcore/sys.rs b/src/libcore/sys.rs
index a27b6fe615f..50a739ec67d 100644
--- a/src/libcore/sys.rs
+++ b/src/libcore/sys.rs
@@ -202,10 +202,12 @@ impl FailWithCause for &'static str {
 
 // FIXME #4427: Temporary until rt::rt_fail_ goes away
 pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
-    use rt::{context, OldTaskContext};
-    use rt::local_services::unsafe_borrow_local_services;
+    use option::Option;
+    use rt::{context, OldTaskContext, TaskContext};
+    use rt::local_services::{unsafe_borrow_local_services, Unwinder};
 
-    match context() {
+    let context = context();
+    match context {
         OldTaskContext => {
             unsafe {
                 gc::cleanup_stack_for_failure();
@@ -214,11 +216,26 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
             }
         }
         _ => {
-            // XXX: Need to print the failure message
-            gc::cleanup_stack_for_failure();
             unsafe {
+                // XXX: Bad re-allocations. fail! needs some refactoring
+                let msg = str::raw::from_c_str(msg);
+                let file = str::raw::from_c_str(file);
+
+                let outmsg = fmt!("%s at line %i of file %s", msg, line as int, file);
+
+                // XXX: Logging doesn't work correctly in non-task context because it
+                // invokes the local heap
+                if context == TaskContext {
+                    error!(outmsg);
+                } else {
+                    rtdebug!("%s", outmsg);
+                }
+
+                gc::cleanup_stack_for_failure();
+
                 let local_services = unsafe_borrow_local_services();
-                match (*local_services).unwinder {
+                let unwinder: &mut Option<Unwinder> = &mut (*local_services).unwinder;
+                match *unwinder {
                     Some(ref mut unwinder) => unwinder.begin_unwind(),
                     None => abort!("failure without unwinder. aborting process")
                 }