diff options
Diffstat (limited to 'src/libextra')
| -rw-r--r-- | src/libextra/arc.rs | 27 | ||||
| -rw-r--r-- | src/libextra/comm.rs | 119 | ||||
| -rw-r--r-- | src/libextra/future.rs | 8 | ||||
| -rw-r--r-- | src/libextra/sync.rs | 101 | ||||
| -rw-r--r-- | src/libextra/task_pool.rs | 6 | ||||
| -rw-r--r-- | src/libextra/test.rs | 19 | ||||
| -rw-r--r-- | src/libextra/workcache.rs | 11 |
7 files changed, 121 insertions, 170 deletions
diff --git a/src/libextra/arc.rs b/src/libextra/arc.rs index 5700a299945..ea8066b786f 100644 --- a/src/libextra/arc.rs +++ b/src/libextra/arc.rs @@ -597,7 +597,6 @@ mod tests { use arc::*; - use std::comm; use std::task; #[test] @@ -605,7 +604,7 @@ mod tests { let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let arc_v = Arc::new(v); - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); do task::spawn { let arc_v: Arc<~[int]> = p.recv(); @@ -626,7 +625,7 @@ mod tests { fn test_mutex_arc_condvar() { let arc = ~MutexArc::new(false); let arc2 = ~arc.clone(); - let (p,c) = comm::oneshot(); + let (p,c) = Chan::new(); do task::spawn { // wait until parent gets in p.recv(); @@ -638,7 +637,7 @@ mod tests { let mut c = Some(c); arc.access_cond(|state, cond| { - c.take_unwrap().send(()); + c.take_unwrawp().send(()); assert!(!*state); while !*state { cond.wait(); @@ -650,7 +649,7 @@ mod tests { fn test_arc_condvar_poison() { let arc = ~MutexArc::new(1); let arc2 = ~arc.clone(); - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); do spawn { let _ = p.recv(); @@ -687,7 +686,7 @@ mod tests { pub fn test_mutex_arc_unwrap_poison() { let arc = MutexArc::new(1); let arc2 = ~(&arc).clone(); - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); do task::spawn { arc2.access(|one| { c.send(()); @@ -804,7 +803,7 @@ mod tests { fn test_rw_arc() { let arc = RWArc::new(0); let arc2 = arc.clone(); - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); do task::spawn { arc2.write(|num| { @@ -832,7 +831,7 @@ mod tests { }); // Wait for children to pass their asserts - for r in children.iter() { + for r in children.mut_iter() { r.recv(); } @@ -855,7 +854,7 @@ mod tests { // Reader tasks let mut reader_convos = ~[]; 10.times(|| { - let ((rp1, rc1), (rp2, rc2)) = (comm::stream(), comm::stream()); + let ((rp1, rc1), (rp2, rc2)) = (Chan::new(), Chan::new()); reader_convos.push((rc1, rp2)); let arcn = arc.clone(); do task::spawn { @@ -869,7 +868,7 @@ mod tests { // Writer task let arc2 = arc.clone(); - let ((wp1, wc1), (wp2, wc2)) = (comm::stream(), comm::stream()); + let ((wp1, wc1), (wp2, wc2)) = (Chan::new(), Chan::new()); do task::spawn || { wp1.recv(); arc2.write_cond(|state, cond| { @@ -897,14 +896,14 @@ mod tests { assert_eq!(*state, 42); *state = 31337; // send to other readers - for &(ref rc, _) in reader_convos.iter() { + for &(ref mut rc, _) in reader_convos.mut_iter() { rc.send(()) } }); let read_mode = arc.downgrade(write_mode); read_mode.read(|state| { // complete handshake with other readers - for &(_, ref rp) in reader_convos.iter() { + for &(_, ref mut rp) in reader_convos.mut_iter() { rp.recv() } wc1.send(()); // tell writer to try again @@ -926,7 +925,7 @@ mod tests { // "blk(&Condvar { order: opt_lock, ..*cond })" // with just "blk(cond)". let x = RWArc::new(true); - let (wp, wc) = comm::stream(); + let (wp, wc) = Chan::new(); // writer task let xw = x.clone(); @@ -951,7 +950,7 @@ mod tests { }); // make a reader task to trigger the "reader cloud lock" handoff let xr = x.clone(); - let (rp, rc) = comm::stream(); + let (rp, rc) = Chan::new(); do task::spawn { rc.send(()); xr.read(|_state| { }) diff --git a/src/libextra/comm.rs b/src/libextra/comm.rs index 42287736ffa..09dd85fe0de 100644 --- a/src/libextra/comm.rs +++ b/src/libextra/comm.rs @@ -16,11 +16,6 @@ Higher level communication abstractions. #[allow(missing_doc)]; - -use std::comm::{GenericChan, GenericSmartChan, GenericPort}; -use std::comm::{Chan, Port, Peekable}; -use std::comm; - /// An extension of `pipes::stream` that allows both sending and receiving. pub struct DuplexStream<T, U> { priv chan: Chan<T>, @@ -29,108 +24,73 @@ pub struct DuplexStream<T, U> { // Allow these methods to be used without import: impl<T:Send,U:Send> DuplexStream<T, U> { + /// Creates a bidirectional stream. + pub fn new() -> (DuplexStream<T, U>, DuplexStream<U, T>) { + let (p1, c2) = Chan::new(); + let (p2, c1) = Chan::new(); + (DuplexStream { chan: c1, port: p1 }, + DuplexStream { chan: c2, port: p2 }) + } pub fn send(&self, x: T) { self.chan.send(x) } pub fn try_send(&self, x: T) -> bool { self.chan.try_send(x) } - pub fn recv(&self, ) -> U { + pub fn recv(&self) -> U { self.port.recv() } pub fn try_recv(&self) -> Option<U> { self.port.try_recv() } - pub fn peek(&self) -> bool { - self.port.peek() - } -} - -impl<T:Send,U:Send> GenericChan<T> for DuplexStream<T, U> { - fn send(&self, x: T) { - self.chan.send(x) - } -} - -impl<T:Send,U:Send> GenericSmartChan<T> for DuplexStream<T, U> { - fn try_send(&self, x: T) -> bool { - self.chan.try_send(x) - } -} - -impl<T:Send,U:Send> GenericPort<U> for DuplexStream<T, U> { - fn recv(&self) -> U { - self.port.recv() - } - - fn try_recv(&self) -> Option<U> { - self.port.try_recv() - } -} - -impl<T:Send,U:Send> Peekable<U> for DuplexStream<T, U> { - fn peek(&self) -> bool { - self.port.peek() + pub fn recv_opt(&self) -> Option<U> { + self.port.recv_opt() } } -/// Creates a bidirectional stream. -pub fn DuplexStream<T:Send,U:Send>() - -> (DuplexStream<T, U>, DuplexStream<U, T>) -{ - let (p1, c2) = comm::stream(); - let (p2, c1) = comm::stream(); - (DuplexStream { - chan: c1, - port: p1 - }, - DuplexStream { - chan: c2, - port: p2 - }) -} - /// An extension of `pipes::stream` that provides synchronous message sending. pub struct SyncChan<T> { priv duplex_stream: DuplexStream<T, ()> } /// An extension of `pipes::stream` that acknowledges each message received. pub struct SyncPort<T> { priv duplex_stream: DuplexStream<(), T> } -impl<T: Send> GenericChan<T> for SyncChan<T> { - fn send(&self, val: T) { +impl<T: Send> SyncChan<T> { + pub fn send(&self, val: T) { assert!(self.try_send(val), "SyncChan.send: receiving port closed"); } -} -impl<T: Send> GenericSmartChan<T> for SyncChan<T> { - /// Sends a message, or report if the receiver has closed the connection before receiving. - fn try_send(&self, val: T) -> bool { - self.duplex_stream.try_send(val) && self.duplex_stream.try_recv().is_some() + /// Sends a message, or report if the receiver has closed the connection + /// before receiving. + pub fn try_send(&self, val: T) -> bool { + self.duplex_stream.try_send(val) && self.duplex_stream.recv_opt().is_some() } } -impl<T: Send> GenericPort<T> for SyncPort<T> { - fn recv(&self) -> T { - self.try_recv().expect("SyncPort.recv: sending channel closed") +impl<T: Send> SyncPort<T> { + pub fn recv(&self) -> T { + self.recv_opt().expect("SyncPort.recv: sending channel closed") } - fn try_recv(&self) -> Option<T> { - self.duplex_stream.try_recv().map(|val| { + pub fn recv_opt(&self) -> Option<T> { + self.duplex_stream.recv_opt().map(|val| { self.duplex_stream.try_send(()); val }) } -} -impl<T: Send> Peekable<T> for SyncPort<T> { - fn peek(&self) -> bool { - self.duplex_stream.peek() + pub fn try_recv(&self) -> Option<T> { + self.duplex_stream.try_recv().map(|val| { + self.duplex_stream.try_send(()); + val + }) } } -/// Creates a stream whose channel, upon sending a message, blocks until the message is received. +/// Creates a stream whose channel, upon sending a message, blocks until the +/// message is received. pub fn rendezvous<T: Send>() -> (SyncPort<T>, SyncChan<T>) { - let (chan_stream, port_stream) = DuplexStream(); - (SyncPort { duplex_stream: port_stream }, SyncChan { duplex_stream: chan_stream }) + let (chan_stream, port_stream) = DuplexStream::new(); + (SyncPort { duplex_stream: port_stream }, + SyncChan { duplex_stream: chan_stream }) } #[cfg(test)] @@ -141,7 +101,7 @@ mod test { #[test] pub fn DuplexStream1() { - let (left, right) = DuplexStream(); + let (mut left, mut right) = DuplexStream::new(); left.send(~"abc"); right.send(123); @@ -152,9 +112,10 @@ mod test { #[test] pub fn basic_rendezvous_test() { - let (port, chan) = rendezvous(); + let (mut port, chan) = rendezvous(); do spawn { + let mut chan = chan; chan.send("abc"); } @@ -165,8 +126,9 @@ mod test { fn recv_a_lot() { // Rendezvous streams should be able to handle any number of messages being sent do run_in_uv_task { - let (port, chan) = rendezvous(); + let (mut port, chan) = rendezvous(); do spawn { + let mut chan = chan; 1000000.times(|| { chan.send(()) }) } 1000000.times(|| { port.recv() }) @@ -175,8 +137,9 @@ mod test { #[test] fn send_and_fail_and_try_recv() { - let (port, chan) = rendezvous(); + let (mut port, chan) = rendezvous(); do spawn { + let mut chan = chan; chan.duplex_stream.send(()); // Can't access this field outside this module fail!() } @@ -185,8 +148,9 @@ mod test { #[test] fn try_send_and_recv_then_fail_before_ack() { - let (port, chan) = rendezvous(); + let (port, mut chan) = rendezvous(); do spawn { + let mut port = port; port.duplex_stream.recv(); fail!() } @@ -196,8 +160,9 @@ mod test { #[test] #[should_fail] fn send_and_recv_then_fail_before_ack() { - let (port, chan) = rendezvous(); + let (port, mut chan) = rendezvous(); do spawn { + let mut port = port; port.duplex_stream.recv(); fail!() } diff --git a/src/libextra/future.rs b/src/libextra/future.rs index 1a2ac398132..eb61b7781f1 100644 --- a/src/libextra/future.rs +++ b/src/libextra/future.rs @@ -25,7 +25,6 @@ #[allow(missing_doc)]; -use std::comm::{PortOne, oneshot}; use std::util::replace; /// A type encapsulating the result of a computation which may not be complete @@ -104,7 +103,7 @@ impl<A> Future<A> { } impl<A:Send> Future<A> { - pub fn from_port(port: PortOne<A>) -> Future<A> { + pub fn from_port(port: Port<A>) -> Future<A> { /*! * Create a future from a port * @@ -125,7 +124,7 @@ impl<A:Send> Future<A> { * value of the future. */ - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawn { chan.send(blk()); @@ -139,7 +138,6 @@ impl<A:Send> Future<A> { mod test { use future::Future; - use std::comm::oneshot; use std::task; #[test] @@ -150,7 +148,7 @@ mod test { #[test] fn test_from_port() { - let (po, ch) = oneshot(); + let (po, ch) = Chan::new(); ch.send(~"whale"); let mut f = Future::from_port(po); assert_eq!(f.get(), ~"whale"); diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index f00c3d8db9a..1cc403c32f4 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -19,9 +19,6 @@ use std::borrow; -use std::comm; -use std::comm::SendDeferred; -use std::comm::{GenericPort, Peekable}; use std::unstable::sync::{Exclusive, UnsafeArc}; use std::unstable::atomics; use std::unstable::finally::Finally; @@ -34,48 +31,53 @@ use std::util::NonCopyable; // Each waiting task receives on one of these. #[doc(hidden)] -type WaitEnd = comm::PortOne<()>; +type WaitEnd = Port<()>; #[doc(hidden)] -type SignalEnd = comm::ChanOne<()>; +type SignalEnd = Chan<()>; // A doubly-ended queue of waiting tasks. #[doc(hidden)] -struct WaitQueue { head: comm::Port<SignalEnd>, - tail: comm::Chan<SignalEnd> } +struct WaitQueue { head: Port<SignalEnd>, + tail: Chan<SignalEnd> } impl WaitQueue { fn new() -> WaitQueue { - let (block_head, block_tail) = comm::stream(); + let (block_head, block_tail) = Chan::new(); WaitQueue { head: block_head, tail: block_tail } } // Signals one live task from the queue. fn signal(&self) -> bool { - // The peek is mandatory to make sure recv doesn't block. - if self.head.peek() { - // Pop and send a wakeup signal. If the waiter was killed, its port - // will have closed. Keep trying until we get a live task. - if self.head.recv().try_send_deferred(()) { - true - } else { - self.signal() + match self.head.try_recv() { + Some(ch) => { + // Send a wakeup signal. If the waiter was killed, its port will + // have closed. Keep trying until we get a live task. + if ch.try_send_deferred(()) { + true + } else { + self.signal() + } } - } else { - false + None => false } } fn broadcast(&self) -> uint { let mut count = 0; - while self.head.peek() { - if self.head.recv().try_send_deferred(()) { - count += 1; + loop { + match self.head.try_recv() { + None => break, + Some(ch) => { + if ch.try_send_deferred(()) { + count += 1; + } + } } } count } fn wait_end(&self) -> WaitEnd { - let (wait_end, signal_end) = comm::oneshot(); + let (wait_end, signal_end) = Chan::new(); self.tail.send_deferred(signal_end); wait_end } @@ -282,8 +284,7 @@ impl<'a> Condvar<'a> { condvar_id, "cond.signal_on()", || { - let queue = queue.take_unwrap(); - queue.broadcast() + queue.take_unwrap().broadcast() }) } } @@ -676,7 +677,6 @@ mod tests { use sync::*; use std::cast; - use std::comm; use std::result; use std::task; @@ -711,7 +711,7 @@ mod tests { #[test] fn test_sem_as_cvar() { /* Child waits and parent signals */ - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); let s = Semaphore::new(0); let s2 = s.clone(); do task::spawn { @@ -723,7 +723,7 @@ mod tests { let _ = p.recv(); /* Parent waits and child signals */ - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); let s = Semaphore::new(0); let s2 = s.clone(); do task::spawn { @@ -740,8 +740,8 @@ mod tests { // time, and shake hands. let s = Semaphore::new(2); let s2 = s.clone(); - let (p1,c1) = comm::stream(); - let (p2,c2) = comm::stream(); + let (p1,c1) = Chan::new(); + let (p2,c2) = Chan::new(); do task::spawn { s2.access(|| { let _ = p2.recv(); @@ -760,7 +760,7 @@ mod tests { do task::spawn_sched(task::SingleThreaded) { let s = Semaphore::new(1); let s2 = s.clone(); - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); let mut child_data = Some((s2, c)); s.access(|| { let (s2, c) = child_data.take_unwrap(); @@ -782,7 +782,7 @@ mod tests { fn test_mutex_lock() { // Unsafely achieve shared state, and do the textbook // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance. - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); let m = Mutex::new(); let m2 = m.clone(); let mut sharedstate = ~0; @@ -829,7 +829,7 @@ mod tests { cond.wait(); }); // Parent wakes up child - let (port,chan) = comm::stream(); + let (port,chan) = Chan::new(); let m3 = m.clone(); do task::spawn { m3.lock_cond(|cond| { @@ -852,7 +852,7 @@ mod tests { num_waiters.times(|| { let mi = m.clone(); - let (port, chan) = comm::stream(); + let (port, chan) = Chan::new(); ports.push(port); do task::spawn { mi.lock_cond(|cond| { @@ -864,13 +864,13 @@ mod tests { }); // wait until all children get in the mutex - for port in ports.iter() { let _ = port.recv(); } + for port in ports.mut_iter() { let _ = port.recv(); } m.lock_cond(|cond| { let num_woken = cond.broadcast(); assert_eq!(num_woken, num_waiters); }); // wait until all children wake up - for port in ports.iter() { let _ = port.recv(); } + for port in ports.mut_iter() { let _ = port.recv(); } } #[test] fn test_mutex_cond_broadcast() { @@ -915,8 +915,8 @@ mod tests { let m2 = m.clone(); let result: result::Result<(), ~Any> = do task::try { - let (p, c) = comm::stream(); - do task::spawn || { // linked + let (p, c) = Chan::new(); + do task::spawn { // linked let _ = p.recv(); // wait for sibling to get in the mutex task::deschedule(); fail!(); @@ -940,19 +940,18 @@ mod tests { let m = Mutex::new(); let m2 = m.clone(); - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); let result: result::Result<(), ~Any> = do task::try { let mut sibling_convos = ~[]; 2.times(|| { - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); sibling_convos.push(p); let mi = m2.clone(); // spawn sibling task do task::spawn { // linked let mut c = Some(c); mi.lock_cond(|cond| { - let c = c.take_unwrap(); c.send(()); // tell sibling to go ahead (|| { cond.wait(); // block forever @@ -964,7 +963,7 @@ mod tests { }) } }); - for p in sibling_convos.iter() { + for p in sibling_convos.mut_iter() { let _ = p.recv(); // wait for sibling to get in the mutex } m2.lock(|| { }); @@ -973,8 +972,8 @@ mod tests { }; assert!(result.is_err()); // child task must have finished by the time try returns - let r = p.recv(); - for p in r.iter() { p.recv(); } // wait on all its siblings + let mut r = p.recv(); + for p in r.mut_iter() { p.recv(); } // wait on all its siblings m.lock_cond(|cond| { let woken = cond.broadcast(); assert_eq!(woken, 0); @@ -999,7 +998,7 @@ mod tests { let result = do task::try { let m = Mutex::new_with_condvars(2); let m2 = m.clone(); - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); do task::spawn { m2.lock_cond(|cond| { c.send(()); @@ -1060,7 +1059,7 @@ mod tests { mode2: RWLockMode) { // Test mutual exclusion between readers and writers. Just like the // mutex mutual exclusion test, a ways above. - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); let x2 = x.clone(); let mut sharedstate = ~0; { @@ -1111,8 +1110,8 @@ mod tests { make_mode2_go_first: bool) { // Much like sem_multi_resource. let x2 = x.clone(); - let (p1, c1) = comm::stream(); - let (p2, c2) = comm::stream(); + let (p1, c1) = Chan::new(); + let (p2, c2) = Chan::new(); do task::spawn { if !make_mode2_go_first { let _ = p2.recv(); // parent sends to us once it locks, or ... @@ -1177,7 +1176,7 @@ mod tests { cond.wait(); }); // Parent wakes up child - let (port, chan) = comm::stream(); + let (port, chan) = Chan::new(); let x3 = x.clone(); do task::spawn { x3.write_cond(|cond| { @@ -1214,7 +1213,7 @@ mod tests { num_waiters.times(|| { let xi = x.clone(); - let (port, chan) = comm::stream(); + let (port, chan) = Chan::new(); ports.push(port); do task::spawn { lock_cond(&xi, dg1, |cond| { @@ -1226,13 +1225,13 @@ mod tests { }); // wait until all children get in the mutex - for port in ports.iter() { let _ = port.recv(); } + for port in ports.mut_iter() { let _ = port.recv(); } lock_cond(&x, dg2, |cond| { let num_woken = cond.broadcast(); assert_eq!(num_woken, num_waiters); }); // wait until all children wake up - for port in ports.iter() { let _ = port.recv(); } + for port in ports.mut_iter() { let _ = port.recv(); } } #[test] fn test_rwlock_cond_broadcast() { diff --git a/src/libextra/task_pool.rs b/src/libextra/task_pool.rs index bda6935643f..f0c9833adf8 100644 --- a/src/libextra/task_pool.rs +++ b/src/libextra/task_pool.rs @@ -14,8 +14,6 @@ /// parallelism. -use std::comm::{Chan, GenericChan, GenericPort}; -use std::comm; use std::task::SchedMode; use std::task; use std::vec; @@ -35,7 +33,7 @@ pub struct TaskPool<T> { #[unsafe_destructor] impl<T> Drop for TaskPool<T> { fn drop(&mut self) { - for channel in self.channels.iter() { + for channel in self.channels.mut_iter() { channel.send(Quit); } } @@ -54,7 +52,7 @@ impl<T> TaskPool<T> { assert!(n_tasks >= 1); let channels = vec::from_fn(n_tasks, |i| { - let (port, chan) = comm::stream::<Msg<T>>(); + let (port, chan) = Chan::<Msg<T>>::new(); let init_fn = init_fn_factory(); let task_body: proc() = proc() { diff --git a/src/libextra/test.rs b/src/libextra/test.rs index c2b4ff05d5d..974d4dc1dc5 100644 --- a/src/libextra/test.rs +++ b/src/libextra/test.rs @@ -29,7 +29,6 @@ use time::precise_time_ns; use treemap::TreeMap; use std::clone::Clone; -use std::comm::{stream, SharedChan, GenericPort, GenericChan}; use std::io; use std::io::File; use std::io::Writer; @@ -746,8 +745,7 @@ fn run_tests(opts: &TestOpts, remaining.reverse(); let mut pending = 0; - let (p, ch) = stream(); - let ch = SharedChan::new(ch); + let (p, ch) = SharedChan::new(); while pending > 0 || !remaining.is_empty() { while pending < concurrency && !remaining.is_empty() { @@ -872,7 +870,7 @@ pub fn run_test(force_ignore: bool, fn run_test_inner(desc: TestDesc, monitor_ch: SharedChan<MonitorMsg>, testfn: proc()) { - do task::spawn { + do spawn { let mut task = task::task(); task.name(match desc.name { DynTestName(ref name) => SendStrOwned(name.clone()), @@ -1206,7 +1204,6 @@ mod tests { StaticTestName, DynTestName, DynTestFn}; use test::{TestOpts, run_test}; - use std::comm::{stream, SharedChan}; use tempfile::TempDir; #[test] @@ -1220,8 +1217,7 @@ mod tests { }, testfn: DynTestFn(proc() f()), }; - let (p, ch) = stream(); - let ch = SharedChan::new(ch); + let (p, ch) = SharedChan::new(); run_test(false, desc, ch); let (_, res) = p.recv(); assert!(res != TrOk); @@ -1238,8 +1234,7 @@ mod tests { }, testfn: DynTestFn(proc() f()), }; - let (p, ch) = stream(); - let ch = SharedChan::new(ch); + let (p, ch) = SharedChan::new(); run_test(false, desc, ch); let (_, res) = p.recv(); assert_eq!(res, TrIgnored); @@ -1256,8 +1251,7 @@ mod tests { }, testfn: DynTestFn(proc() f()), }; - let (p, ch) = stream(); - let ch = SharedChan::new(ch); + let (p, ch) = SharedChan::new(); run_test(false, desc, ch); let (_, res) = p.recv(); assert_eq!(res, TrOk); @@ -1274,8 +1268,7 @@ mod tests { }, testfn: DynTestFn(proc() f()), }; - let (p, ch) = stream(); - let ch = SharedChan::new(ch); + let (p, ch) = SharedChan::new(); run_test(false, desc, ch); let (_, res) = p.recv(); assert_eq!(res, TrFailed); diff --git a/src/libextra/workcache.rs b/src/libextra/workcache.rs index 91f1f1a0d0b..8713dbde920 100644 --- a/src/libextra/workcache.rs +++ b/src/libextra/workcache.rs @@ -15,8 +15,7 @@ use json::ToJson; use serialize::{Encoder, Encodable, Decoder, Decodable}; use arc::{Arc,RWArc}; use treemap::TreeMap; -use std::comm::{PortOne, oneshot}; -use std::{str, task}; +use std::str; use std::io; use std::io::{File, Decorator}; use std::io::mem::MemWriter; @@ -252,7 +251,7 @@ pub struct Exec { enum Work<'a, T> { WorkValue(T), - WorkFromTask(&'a Prep<'a>, PortOne<(Exec, T)>), + WorkFromTask(&'a Prep<'a>, Port<(Exec, T)>), } fn json_encode<'a, T:Encodable<json::Encoder<'a>>>(t: &T) -> ~str { @@ -427,11 +426,11 @@ impl<'a> Prep<'a> { _ => { debug!("Cache miss!"); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); let blk = bo.take_unwrap(); // XXX: What happens if the task fails? - do task::spawn { + do spawn { let mut exe = Exec { discovered_inputs: WorkMap::new(), discovered_outputs: WorkMap::new(), @@ -453,7 +452,7 @@ impl<'a, T:Send + pub fn from_value(elt: T) -> Work<'a, T> { WorkValue(elt) } - pub fn from_task(prep: &'a Prep<'a>, port: PortOne<(Exec, T)>) + pub fn from_task(prep: &'a Prep<'a>, port: Port<(Exec, T)>) -> Work<'a, T> { WorkFromTask(prep, port) } |
