From 529e268ab900f1b6e731af64ce2aeecda3555f4e Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 5 Dec 2013 18:19:06 -0800 Subject: Fallout of rewriting std::comm --- src/libstd/task/mod.rs | 72 ++++++++++++++---------------------------------- src/libstd/task/spawn.rs | 21 ++++++-------- 2 files changed, 30 insertions(+), 63 deletions(-) (limited to 'src/libstd/task') diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 24a24f24818..0e56f42f5b9 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -55,11 +55,10 @@ use prelude::*; -use comm::{stream, Chan, GenericChan, GenericPort, Port, Peekable}; +use comm::{Chan, Port}; use result::{Result, Ok, Err}; use rt::in_green_task_context; use rt::local::Local; -use rt::task::{UnwindResult, Success, Failure}; use send_str::{SendStr, IntoSendStr}; use util; @@ -81,33 +80,6 @@ pub mod spawn; /// children tasks complete, recommend using a result future. pub type TaskResult = Result<(), ~Any>; -pub struct TaskResultPort { - priv port: Port -} - -fn to_task_result(res: UnwindResult) -> TaskResult { - match res { - Success => Ok(()), Failure(a) => Err(a), - } -} - -impl GenericPort for TaskResultPort { - #[inline] - fn recv(&self) -> TaskResult { - to_task_result(self.port.recv()) - } - - #[inline] - fn try_recv(&self) -> Option { - self.port.try_recv().map(to_task_result) - } -} - -impl Peekable for TaskResultPort { - #[inline] - fn peek(&self) -> bool { self.port.peek() } -} - /// Scheduler modes #[deriving(Eq)] pub enum SchedMode { @@ -150,7 +122,7 @@ pub struct SchedOpts { */ pub struct TaskOpts { priv watched: bool, - priv notify_chan: Option>, + priv notify_chan: Option>, name: Option, sched: SchedOpts, stack_size: Option @@ -232,7 +204,7 @@ impl TaskBuilder { /// /// # Failure /// Fails if a future_result was already set for this task. - pub fn future_result(&mut self) -> TaskResultPort { + pub fn future_result(&mut self) -> Port { // FIXME (#3725): Once linked failure and notification are // handled in the library, I can imagine implementing this by just // registering an arbitrary number of task::on_exit handlers and @@ -243,12 +215,12 @@ impl TaskBuilder { } // Construct the future and give it to the caller. - let (notify_pipe_po, notify_pipe_ch) = stream::(); + let (notify_pipe_po, notify_pipe_ch) = Chan::new(); // Reconfigure self to use a notify channel. self.opts.notify_chan = Some(notify_pipe_ch); - TaskResultPort { port: notify_pipe_po } + notify_pipe_po } /// Name the task-to-be. Currently the name is used for identification @@ -341,7 +313,7 @@ impl TaskBuilder { * Fails if a future_result was already set for this task. */ pub fn try(mut self, f: proc() -> T) -> Result { - let (po, ch) = stream::(); + let (po, ch) = Chan::new(); let result = self.future_result(); @@ -466,7 +438,7 @@ pub fn failing() -> bool { // !!! instead of exiting cleanly. This might wedge the buildbots. !!! #[cfg(test)] -fn block_forever() { let (po, _ch) = stream::<()>(); po.recv(); } +fn block_forever() { let (po, _ch) = Chan::<()>::new(); po.recv(); } #[test] fn test_unnamed_task() { @@ -528,9 +500,8 @@ fn test_send_named_task() { #[test] fn test_run_basic() { - let (po, ch) = stream::<()>(); - let builder = task(); - do builder.spawn { + let (po, ch) = Chan::new(); + do task().spawn { ch.send(()); } po.recv(); @@ -543,7 +514,7 @@ struct Wrapper { #[test] fn test_add_wrapper() { - let (po, ch) = stream::<()>(); + let (po, ch) = Chan::new(); let mut b0 = task(); do b0.add_wrapper |body| { let ch = ch; @@ -608,8 +579,7 @@ fn get_sched_id() -> int { #[test] fn test_spawn_sched() { - let (po, ch) = stream::<()>(); - let ch = SharedChan::new(ch); + let (po, ch) = SharedChan::new(); fn f(i: int, ch: SharedChan<()>) { let parent_sched_id = get_sched_id(); @@ -632,14 +602,14 @@ fn test_spawn_sched() { #[test] fn test_spawn_sched_childs_on_default_sched() { - let (po, ch) = stream(); + let (po, ch) = Chan::new(); // Assuming tests run on the default scheduler let default_id = get_sched_id(); do spawn_sched(SingleThreaded) { - let parent_sched_id = get_sched_id(); let ch = ch; + let parent_sched_id = get_sched_id(); do spawn { let child_sched_id = get_sched_id(); assert!(parent_sched_id != child_sched_id); @@ -660,8 +630,8 @@ fn test_spawn_sched_blocking() { // Testing that a task in one scheduler can block in foreign code // without affecting other schedulers 20u.times(|| { - let (start_po, start_ch) = stream(); - let (fin_po, fin_ch) = stream(); + let (start_po, start_ch) = Chan::new(); + let (fin_po, fin_ch) = Chan::new(); let mut lock = Mutex::new(); let lock2 = lock.clone(); @@ -686,14 +656,14 @@ fn test_spawn_sched_blocking() { let mut val = 20; while val > 0 { val = po.recv(); - ch.send(val - 1); + ch.try_send(val - 1); } } - let (setup_po, setup_ch) = stream(); - let (parent_po, parent_ch) = stream(); + let (setup_po, setup_ch) = Chan::new(); + let (parent_po, parent_ch) = Chan::new(); do spawn { - let (child_po, child_ch) = stream(); + let (child_po, child_ch) = Chan::new(); setup_ch.send(child_ch); pingpong(&child_po, &parent_ch); }; @@ -712,12 +682,12 @@ fn test_spawn_sched_blocking() { #[cfg(test)] fn avoid_copying_the_body(spawnfn: |v: proc()|) { - let (p, ch) = stream::(); + let (p, ch) = Chan::::new(); let x = ~1; let x_in_parent = ptr::to_unsafe_ptr(&*x) as uint; - do spawnfn || { + do spawnfn { let x_in_child = ptr::to_unsafe_ptr(&*x) as uint; ch.send(x_in_child); } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 4ab7b74d300..eb3e19f4a5a 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -77,18 +77,15 @@ use prelude::*; -use comm::{GenericChan, oneshot}; +use comm::Chan; use rt::local::Local; use rt::sched::{Scheduler, Shutdown, TaskFromFriend}; use rt::task::{Task, Sched}; -use rt::task::UnwindResult; use rt::thread::Thread; use rt::{in_green_task_context, new_event_loop}; -use task::SingleThreaded; -use task::TaskOpts; +use task::{SingleThreaded, TaskOpts, TaskResult}; #[cfg(test)] use task::default_task_opts; -#[cfg(test)] use comm; #[cfg(test)] use task; pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { @@ -132,7 +129,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { // Create a task that will later be used to join with the new scheduler // thread when it is ready to terminate - let (thread_port, thread_chan) = oneshot(); + let (thread_port, thread_chan) = Chan::new(); let join_task = do Task::build_child(None) { debug!("running join task"); let thread: Thread<()> = thread_port.recv(); @@ -173,7 +170,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { if opts.notify_chan.is_some() { let notify_chan = opts.notify_chan.take_unwrap(); - let on_exit: proc(UnwindResult) = proc(task_result) { + let on_exit: proc(TaskResult) = proc(task_result) { notify_chan.send(task_result) }; task.death.on_exit = Some(on_exit); @@ -187,7 +184,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { #[test] fn test_spawn_raw_simple() { - let (po, ch) = stream(); + let (po, ch) = Chan::new(); do spawn_raw(default_task_opts()) { ch.send(()); } @@ -208,7 +205,7 @@ fn test_spawn_raw_unsupervise() { #[test] fn test_spawn_raw_notify_success() { - let (notify_po, notify_ch) = comm::stream(); + let (notify_po, notify_ch) = Chan::new(); let opts = task::TaskOpts { notify_chan: Some(notify_ch), @@ -216,13 +213,13 @@ fn test_spawn_raw_notify_success() { }; do spawn_raw(opts) { } - assert!(notify_po.recv().is_success()); + assert!(notify_po.recv().is_ok()); } #[test] fn test_spawn_raw_notify_failure() { // New bindings for these - let (notify_po, notify_ch) = comm::stream(); + let (notify_po, notify_ch) = Chan::new(); let opts = task::TaskOpts { watched: false, @@ -232,5 +229,5 @@ fn test_spawn_raw_notify_failure() { do spawn_raw(opts) { fail!(); } - assert!(notify_po.recv().is_failure()); + assert!(notify_po.recv().is_err()); } -- cgit 1.4.1-3-g733a5