diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-02-02 03:10:12 -0800 | 
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-02-21 17:36:54 -0800 | 
| commit | dab6a852303f68c2ed6c17abaca1d0728d9cf618 (patch) | |
| tree | 6f22544f3bd824afb418a609182a9ac5f5908647 | |
| parent | ab784b7de522c358e682e00c3a01d44d32c56943 (diff) | |
| download | rust-dab6a852303f68c2ed6c17abaca1d0728d9cf618.tar.gz rust-dab6a852303f68c2ed6c17abaca1d0728d9cf618.zip | |
core: Extract comm from pipes. #4742
89 files changed, 683 insertions, 611 deletions
| diff --git a/doc/tutorial-tasks.md b/doc/tutorial-tasks.md index a3d0ecaa4ba..c0f9a376270 100644 --- a/doc/tutorial-tasks.md +++ b/doc/tutorial-tasks.md @@ -157,7 +157,7 @@ concurrently: ~~~~ use task::spawn; -use pipes::{stream, Port, Chan}; +use comm::{stream, Port, Chan}; let (port, chan): (Port<int>, Chan<int>) = stream(); @@ -178,7 +178,7 @@ stream for sending and receiving integers (the left-hand side of the `let`, a tuple into its component parts). ~~~~ -# use pipes::{stream, Chan, Port}; +# use comm::{stream, Chan, Port}; let (port, chan): (Port<int>, Chan<int>) = stream(); ~~~~ @@ -189,7 +189,7 @@ spawns the child task. ~~~~ # use task::{spawn}; # use task::spawn; -# use pipes::{stream, Port, Chan}; +# use comm::{stream, Port, Chan}; # fn some_expensive_computation() -> int { 42 } # let (port, chan) = stream(); do spawn || { @@ -209,7 +209,7 @@ computation, then waits for the child's result to arrive on the port: ~~~~ -# use pipes::{stream, Port, Chan}; +# use comm::{stream, Port, Chan}; # fn some_other_expensive_computation() {} # let (port, chan) = stream::<int>(); # chan.send(0); @@ -225,7 +225,7 @@ following program is ill-typed: ~~~ {.xfail-test} # use task::{spawn}; -# use pipes::{stream, Port, Chan}; +# use comm::{stream, Port, Chan}; # fn some_expensive_computation() -> int { 42 } let (port, chan) = stream(); @@ -245,7 +245,7 @@ Instead we can use a `SharedChan`, a type that allows a single ~~~ # use task::spawn; -use pipes::{stream, SharedChan}; +use comm::{stream, SharedChan}; let (port, chan) = stream(); let chan = SharedChan(chan); @@ -278,7 +278,7 @@ might look like the example below. ~~~ # use task::spawn; -# use pipes::{stream, Port, Chan}; +# use comm::{stream, Port, Chan}; // Create a vector of ports, one for each child task let ports = do vec::from_fn(3) |init_val| { @@ -393,7 +393,7 @@ internally, with additional logic to wait for the child task to finish before returning. Hence: ~~~ -# use pipes::{stream, Chan, Port}; +# use comm::{stream, Chan, Port}; # use task::{spawn, try}; # fn sleep_forever() { loop { task::yield() } } # do task::try { @@ -468,7 +468,7 @@ Here is the function that implements the child task: ~~~~ # use std::comm::DuplexStream; -# use pipes::{Port, Chan}; +# use comm::{Port, Chan}; fn stringifier(channel: &DuplexStream<~str, uint>) { let mut value: uint; loop { @@ -491,7 +491,7 @@ Here is the code for the parent task: ~~~~ # use std::comm::DuplexStream; -# use pipes::{Port, Chan}; +# use comm::{Port, Chan}; # use task::spawn; # fn stringifier(channel: &DuplexStream<~str, uint>) { # let mut value: uint; diff --git a/src/compiletest/procsrv.rs b/src/compiletest/procsrv.rs index 432258b26a6..6c8bd7ea442 100644 --- a/src/compiletest/procsrv.rs +++ b/src/compiletest/procsrv.rs @@ -76,7 +76,7 @@ pub fn run(lib_path: ~str, writeclose(pipe_in.out, input); - let p = pipes::PortSet(); + let p = comm::PortSet(); let ch = p.chan(); do task::spawn_sched(task::SingleThreaded) || { let errput = readclose(pipe_err.in); diff --git a/src/libcore/comm.rs b/src/libcore/comm.rs new file mode 100644 index 00000000000..7939644e51c --- /dev/null +++ b/src/libcore/comm.rs @@ -0,0 +1,410 @@ +// Copyright 2012 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +// Transitional -- needs snapshot +#[allow(structural_records)]; + +use either::{Either, Left, Right}; +use kinds::Owned; +use option; +use option::{Option, Some, None, unwrap}; +use private; +use vec; + +use pipes::{recv, try_recv, wait_many, peek, PacketHeader}; + +// NOTE Making this public exposes some plumbing from pipes. Needs +// some refactoring +pub use pipes::Selectable; + +/// A trait for things that can send multiple messages. +pub trait GenericChan<T> { + /// Sends a message. + fn send(x: T); +} + +/// Things that can send multiple messages and can detect when the receiver +/// is closed +pub trait GenericSmartChan<T> { + /// Sends a message, or report if the receiver has closed the connection. + fn try_send(x: T) -> bool; +} + +/// A trait for things that can receive multiple messages. +pub trait GenericPort<T> { + /// Receives a message, or fails if the connection closes. + fn recv() -> T; + + /** Receives a message, or returns `none` if + the connection is closed or closes. + */ + fn try_recv() -> Option<T>; +} + +/// Ports that can `peek` +pub trait Peekable<T> { + /// Returns true if a message is available + pure fn peek() -> bool; +} + +/// Returns the index of an endpoint that is ready to receive. +pub fn selecti<T: Selectable>(endpoints: &[T]) -> uint { + wait_many(endpoints) +} + +/// Returns 0 or 1 depending on which endpoint is ready to receive +pub fn select2i<A: Selectable, B: Selectable>(a: &A, b: &B) -> + Either<(), ()> { + match wait_many([a.header(), b.header()]) { + 0 => Left(()), + 1 => Right(()), + _ => fail!(~"wait returned unexpected index") + } +} + +// Streams - Make pipes a little easier in general. + +proto! streamp ( + Open:send<T: Owned> { + data(T) -> Open<T> + } +) + +#[doc(hidden)] +struct Chan_<T> { + mut endp: Option<streamp::client::Open<T>> +} + +/// An endpoint that can send many messages. +pub enum Chan<T> { + Chan_(Chan_<T>) +} + +struct Port_<T> { + mut endp: Option<streamp::server::Open<T>>, +} + +/// An endpoint that can receive many messages. +pub enum Port<T> { + Port_(Port_<T>) +} + +/** Creates a `(chan, port)` pair. + +These allow sending or receiving an unlimited number of messages. + +*/ +pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) { + let (c, s) = streamp::init(); + + (Port_(Port_ { endp: Some(s) }), Chan_(Chan_{ endp: Some(c) })) +} + +impl<T: Owned> GenericChan<T> for Chan<T> { + fn send(x: T) { + let mut endp = None; + endp <-> self.endp; + self.endp = Some( + streamp::client::data(unwrap(endp), x)) + } +} + +impl<T: Owned> GenericSmartChan<T> for Chan<T> { + + fn try_send(x: T) -> bool { + let mut endp = None; + endp <-> self.endp; + match streamp::client::try_data(unwrap(endp), x) { + Some(next) => { + self.endp = Some(next); + true + } + None => false + } + } +} + +impl<T: Owned> GenericPort<T> for Port<T> { + fn recv() -> T { + let mut endp = None; + endp <-> self.endp; + let streamp::data(x, endp) = recv(unwrap(endp)); + self.endp = Some(endp); + x + } + + fn try_recv() -> Option<T> { + let mut endp = None; + endp <-> self.endp; + match try_recv(unwrap(endp)) { + Some(streamp::data(x, endp)) => { + self.endp = Some(endp); + Some(x) + } + None => None + } + } +} + +impl<T: Owned> Peekable<T> for Port<T> { + pure fn peek() -> bool { + unsafe { + let mut endp = None; + endp <-> self.endp; + let peek = match &endp { + &Some(ref endp) => peek(endp), + &None => fail!(~"peeking empty stream") + }; + self.endp <-> endp; + peek + } + } +} + +impl<T: Owned> Selectable for Port<T> { + pure fn header() -> *PacketHeader { + unsafe { + match self.endp { + Some(ref endp) => endp.header(), + None => fail!(~"peeking empty stream") + } + } + } +} + +/// Treat many ports as one. +pub struct PortSet<T> { + mut ports: ~[Port<T>], +} + +pub fn PortSet<T: Owned>() -> PortSet<T>{ + PortSet { + ports: ~[] + } +} + +impl<T: Owned> PortSet<T> { + + fn add(port: Port<T>) { + self.ports.push(port) + } + + fn chan() -> Chan<T> { + let (po, ch) = stream(); + self.add(po); + ch + } +} + +impl<T: Owned> GenericPort<T> for PortSet<T> { + + fn try_recv() -> Option<T> { + let mut result = None; + // we have to swap the ports array so we aren't borrowing + // aliasable mutable memory. + let mut ports = ~[]; + ports <-> self.ports; + while result.is_none() && ports.len() > 0 { + let i = wait_many(ports); + match ports[i].try_recv() { + Some(m) => { + result = Some(m); + } + None => { + // Remove this port. + let _ = ports.swap_remove(i); + } + } + } + ports <-> self.ports; + result + } + + fn recv() -> T { + self.try_recv().expect("port_set: endpoints closed") + } + +} + +impl<T: Owned> Peekable<T> for PortSet<T> { + pure fn peek() -> bool { + // It'd be nice to use self.port.each, but that version isn't + // pure. + for vec::each(self.ports) |p| { + if p.peek() { return true } + } + false + } +} + +/// A channel that can be shared between many senders. +pub type SharedChan<T> = private::Exclusive<Chan<T>>; + +impl<T: Owned> GenericChan<T> for SharedChan<T> { + fn send(x: T) { + let mut xx = Some(x); + do self.with_imm |chan| { + let mut x = None; + x <-> xx; + chan.send(option::unwrap(x)) + } + } +} + +impl<T: Owned> GenericSmartChan<T> for SharedChan<T> { + fn try_send(x: T) -> bool { + let mut xx = Some(x); + do self.with_imm |chan| { + let mut x = None; + x <-> xx; + chan.try_send(option::unwrap(x)) + } + } +} + +/// Converts a `chan` into a `shared_chan`. +pub fn SharedChan<T:Owned>(c: Chan<T>) -> SharedChan<T> { + private::exclusive(c) +} + +/// Receive a message from one of two endpoints. +pub trait Select2<T: Owned, U: Owned> { + /// Receive a message or return `None` if a connection closes. + fn try_select() -> Either<Option<T>, Option<U>>; + /// Receive a message or fail if a connection closes. + fn select() -> Either<T, U>; +} + +impl<T: Owned, U: Owned, + Left: Selectable + GenericPort<T>, + Right: Selectable + GenericPort<U>> + Select2<T, U> for (Left, Right) { + + fn select() -> Either<T, U> { + match self { + (ref lp, ref rp) => match select2i(lp, rp) { + Left(()) => Left (lp.recv()), + Right(()) => Right(rp.recv()) + } + } + } + + fn try_select() -> Either<Option<T>, Option<U>> { + match self { + (ref lp, ref rp) => match select2i(lp, rp) { + Left(()) => Left (lp.try_recv()), + Right(()) => Right(rp.try_recv()) + } + } + } +} + +proto! oneshot ( + Oneshot:send<T:Owned> { + send(T) -> ! + } +) + +/// The send end of a oneshot pipe. +pub type ChanOne<T> = oneshot::client::Oneshot<T>; +/// The receive end of a oneshot pipe. +pub type PortOne<T> = oneshot::server::Oneshot<T>; + +/// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair. +pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) { + let (chan, port) = oneshot::init(); + (port, chan) +} + +impl<T: Owned> PortOne<T> { + fn recv(self) -> T { recv_one(self) } + fn try_recv(self) -> Option<T> { try_recv_one(self) } +} + +impl<T: Owned> ChanOne<T> { + fn send(self, data: T) { send_one(self, data) } + fn try_send(self, data: T) -> bool { try_send_one(self, data) } +} + +/** + * Receive a message from a oneshot pipe, failing if the connection was + * closed. + */ +pub fn recv_one<T: Owned>(port: PortOne<T>) -> T { + let oneshot::send(message) = recv(port); + message +} + +/// Receive a message from a oneshot pipe unless the connection was closed. +pub fn try_recv_one<T: Owned> (port: PortOne<T>) -> Option<T> { + let message = try_recv(port); + + if message.is_none() { None } + else { + let oneshot::send(message) = option::unwrap(message); + Some(message) + } +} + +/// Send a message on a oneshot pipe, failing if the connection was closed. +pub fn send_one<T: Owned>(chan: ChanOne<T>, data: T) { + oneshot::client::send(chan, data); +} + +/** + * Send a message on a oneshot pipe, or return false if the connection was + * closed. + */ +pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T) + -> bool { + oneshot::client::try_send(chan, data).is_some() +} + +#[cfg(test)] +pub mod test { + use either::{Either, Left, Right}; + use super::{Chan, Port, oneshot, recv_one, stream}; + + #[test] + pub fn test_select2() { + let (p1, c1) = stream(); + let (p2, c2) = stream(); + + c1.send(~"abc"); + + match (p1, p2).select() { + Right(_) => fail!(), + _ => () + } + + c2.send(123); + } + + #[test] + pub fn test_oneshot() { + let (c, p) = oneshot::init(); + + oneshot::client::send(c, ()); + + recv_one(p) + } + + #[test] + fn test_peek_terminated() { + let (port, chan): (Port<int>, Chan<int>) = stream(); + + { + // Destroy the channel + let _chan = chan; + } + + assert !port.peek(); + } +} diff --git a/src/libcore/core.rc b/src/libcore/core.rc index eab66bc0e37..01669557389 100644 --- a/src/libcore/core.rc +++ b/src/libcore/core.rc @@ -148,6 +148,7 @@ pub mod hashmap; #[path = "task/mod.rs"] pub mod task; +pub mod comm; pub mod pipes; @@ -255,6 +256,7 @@ pub mod core { pub use option; pub use kinds; pub use sys; + pub use pipes; } diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 9d4cadff08a..94c0a567f4c 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -142,7 +142,7 @@ pub struct Buffer<T> { data: T, } -struct PacketHeader { +pub struct PacketHeader { mut state: State, mut blocked_task: *rust_task, @@ -151,7 +151,7 @@ struct PacketHeader { mut buffer: *libc::c_void, } -fn PacketHeader() -> PacketHeader { +pub fn PacketHeader() -> PacketHeader { PacketHeader { state: Empty, blocked_task: ptr::null(), @@ -159,7 +159,7 @@ fn PacketHeader() -> PacketHeader { } } -impl PacketHeader { +pub impl PacketHeader { // Returns the old state. unsafe fn mark_blocked(this: *rust_task) -> State { rustrt::rust_task_ref(this); @@ -551,12 +551,6 @@ pub pure fn peek<T:Owned,Tb:Owned>(p: &RecvPacketBuffered<T, Tb>) -> bool { } } -impl<T:Owned,Tb:Owned> Peekable<T> for RecvPacketBuffered<T, Tb> { - pure fn peek() -> bool { - peek(&self) - } -} - #[doc(hidden)] fn sender_terminate<T:Owned>(p: *Packet<T>) { let p = unsafe { &*p }; @@ -622,7 +616,7 @@ that vector. The index points to an endpoint that has either been closed by the sender or has a message waiting to be received. */ -fn wait_many<T:Selectable>(pkts: &[T]) -> uint { +pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint { let this = unsafe { rustrt::rust_get_task() }; unsafe { @@ -720,7 +714,7 @@ pub fn select2<A:Owned,Ab:Owned,B:Owned,Bb:Owned>( } #[doc(hidden)] -trait Selectable { +pub trait Selectable { pure fn header() -> *PacketHeader; } @@ -957,335 +951,6 @@ pub fn spawn_service_recv<T:Owned,Tb:Owned>( client } -// Streams - Make pipes a little easier in general. - -proto! streamp ( - Open:send<T:Owned> { - data(T) -> Open<T> - } -) - -/// A trait for things that can send multiple messages. -pub trait GenericChan<T> { - /// Sends a message. - fn send(x: T); -} - -/// Things that can send multiple messages and can detect when the receiver -/// is closed -pub trait GenericSmartChan<T> { - /// Sends a message, or report if the receiver has closed the connection. - fn try_send(x: T) -> bool; -} - -/// A trait for things that can receive multiple messages. -pub trait GenericPort<T> { - /// Receives a message, or fails if the connection closes. - fn recv() -> T; - - /** Receives a message, or returns `none` if - the connection is closed or closes. - */ - fn try_recv() -> Option<T>; -} - -/// Ports that can `peek` -pub trait Peekable<T> { - /// Returns true if a message is available - pure fn peek() -> bool; -} - -#[doc(hidden)] -struct Chan_<T> { - mut endp: Option<streamp::client::Open<T>> -} - -/// An endpoint that can send many messages. -pub enum Chan<T> { - Chan_(Chan_<T>) -} - -#[doc(hidden)] -struct Port_<T> { - mut endp: Option<streamp::server::Open<T>>, -} - -/// An endpoint that can receive many messages. -pub enum Port<T> { - Port_(Port_<T>) -} - -/** Creates a `(chan, port)` pair. - -These allow sending or receiving an unlimited number of messages. - -*/ -pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) { - let (c, s) = streamp::init(); - - (Port_(Port_ { endp: Some(s) }), Chan_(Chan_{ endp: Some(c) })) -} - -impl<T:Owned> GenericChan<T> for Chan<T> { - fn send(x: T) { - let mut endp = None; - endp <-> self.endp; - self.endp = Some( - streamp::client::data(unwrap(endp), x)) - } -} - -impl<T:Owned> GenericSmartChan<T> for Chan<T> { - - fn try_send(x: T) -> bool { - let mut endp = None; - endp <-> self.endp; - match streamp::client::try_data(unwrap(endp), x) { - Some(next) => { - self.endp = Some(next); - true - } - None => false - } - } -} - -impl<T:Owned> GenericPort<T> for Port<T> { - fn recv() -> T { - let mut endp = None; - endp <-> self.endp; - let streamp::data(x, endp) = pipes::recv(unwrap(endp)); - self.endp = Some(endp); - x - } - - fn try_recv() -> Option<T> { - let mut endp = None; - endp <-> self.endp; - match pipes::try_recv(unwrap(endp)) { - Some(streamp::data(x, endp)) => { - self.endp = Some(endp); - Some(x) - } - None => None - } - } -} - -impl<T:Owned> Peekable<T> for Port<T> { - pure fn peek() -> bool { - unsafe { - let mut endp = None; - endp <-> self.endp; - let peek = match &endp { - &Some(ref endp) => pipes::peek(endp), - &None => fail!(~"peeking empty stream") - }; - self.endp <-> endp; - peek - } - } -} - -impl<T:Owned> Selectable for Port<T> { - pure fn header() -> *PacketHeader { - unsafe { - match self.endp { - Some(ref endp) => endp.header(), - None => fail!(~"peeking empty stream") - } - } - } -} - -/// Treat many ports as one. -pub struct PortSet<T> { - mut ports: ~[pipes::Port<T>], -} - -pub fn PortSet<T:Owned>() -> PortSet<T>{ - PortSet { - ports: ~[] - } -} - -impl<T:Owned> PortSet<T> { - - fn add(port: pipes::Port<T>) { - self.ports.push(port) - } - - fn chan() -> Chan<T> { - let (po, ch) = stream(); - self.add(po); - ch - } -} - -impl<T:Owned> GenericPort<T> for PortSet<T> { - - fn try_recv() -> Option<T> { - let mut result = None; - // we have to swap the ports array so we aren't borrowing - // aliasable mutable memory. - let mut ports = ~[]; - ports <-> self.ports; - while result.is_none() && ports.len() > 0 { - let i = wait_many(ports); - match ports[i].try_recv() { - Some(m) => { - result = Some(m); - } - None => { - // Remove this port. - let _ = ports.swap_remove(i); - } - } - } - ports <-> self.ports; - result - } - - fn recv() -> T { - self.try_recv().expect("port_set: endpoints closed") - } - -} - -impl<T:Owned> Peekable<T> for PortSet<T> { - pure fn peek() -> bool { - // It'd be nice to use self.port.each, but that version isn't - // pure. - for vec::each(self.ports) |p| { - if p.peek() { return true } - } - false - } -} - -/// A channel that can be shared between many senders. -pub type SharedChan<T> = private::Exclusive<Chan<T>>; - -impl<T:Owned> GenericChan<T> for SharedChan<T> { - fn send(x: T) { - let mut xx = Some(x); - do self.with_imm |chan| { - let mut x = None; - x <-> xx; - chan.send(option::unwrap(x)) - } - } -} - -impl<T:Owned> GenericSmartChan<T> for SharedChan<T> { - fn try_send(x: T) -> bool { - let mut xx = Some(x); - do self.with_imm |chan| { - let mut x = None; - x <-> xx; - chan.try_send(option::unwrap(x)) - } - } -} - -/// Converts a `chan` into a `shared_chan`. -pub fn SharedChan<T:Owned>(c: Chan<T>) -> SharedChan<T> { - private::exclusive(c) -} - -/// Receive a message from one of two endpoints. -pub trait Select2<T:Owned,U:Owned> { - /// Receive a message or return `None` if a connection closes. - fn try_select() -> Either<Option<T>, Option<U>>; - /// Receive a message or fail if a connection closes. - fn select() -> Either<T, U>; -} - -impl<T: Owned, - U: Owned, - Left: Selectable + GenericPort<T>, - Right: Selectable + GenericPort<U>> - Select2<T,U> for (Left, Right) { - fn select() -> Either<T, U> { - match self { - (ref lp, ref rp) => match select2i(lp, rp) { - Left(()) => Left (lp.recv()), - Right(()) => Right(rp.recv()) - } - } - } - - fn try_select() -> Either<Option<T>, Option<U>> { - match self { - (ref lp, ref rp) => match select2i(lp, rp) { - Left(()) => Left (lp.try_recv()), - Right(()) => Right(rp.try_recv()) - } - } - } -} - -proto! oneshot ( - Oneshot:send<T:Owned> { - send(T) -> ! - } -) - -/// The send end of a oneshot pipe. -pub type ChanOne<T> = oneshot::client::Oneshot<T>; -/// The receive end of a oneshot pipe. -pub type PortOne<T> = oneshot::server::Oneshot<T>; - -/// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair. -pub fn oneshot<T:Owned>() -> (PortOne<T>, ChanOne<T>) { - let (chan, port) = oneshot::init(); - (port, chan) -} - -impl<T:Owned> PortOne<T> { - fn recv(self) -> T { recv_one(self) } - fn try_recv(self) -> Option<T> { try_recv_one(self) } -} - -impl<T:Owned> ChanOne<T> { - fn send(self, data: T) { send_one(self, data) } - fn try_send(self, data: T) -> bool { try_send_one(self, data) } -} - -/** - * Receive a message from a oneshot pipe, failing if the connection was - * closed. - */ -pub fn recv_one<T:Owned>(port: PortOne<T>) -> T { - let oneshot::send(message) = recv(port); - message -} - -/// Receive a message from a oneshot pipe unless the connection was closed. -pub fn try_recv_one<T:Owned> (port: PortOne<T>) -> Option<T> { - let message = try_recv(port); - - if message.is_none() { None } - else { - let oneshot::send(message) = option::unwrap(message); - Some(message) - } -} - -/// Send a message on a oneshot pipe, failing if the connection was closed. -pub fn send_one<T:Owned>(chan: ChanOne<T>, data: T) { - oneshot::client::send(chan, data); -} - -/** - * Send a message on a oneshot pipe, or return false if the connection was - * closed. - */ -pub fn try_send_one<T:Owned>(chan: ChanOne<T>, data: T) - -> bool { - oneshot::client::try_send(chan, data).is_some() -} - pub mod rt { use option::{None, Option, Some}; @@ -1298,13 +963,13 @@ pub mod rt { #[cfg(test)] pub mod test { use either::{Either, Left, Right}; - use pipes::{Chan, Port, oneshot, recv_one, stream}; - use pipes; + use comm::{Chan, Port, oneshot, recv_one, stream, Select2, + GenericPort, GenericChan, Peekable}; #[test] pub fn test_select2() { - let (p1, c1) = pipes::stream(); - let (p2, c2) = pipes::stream(); + let (p1, c1) = stream(); + let (p2, c2) = stream(); c1.send(~"abc"); diff --git a/src/libcore/prelude.rs b/src/libcore/prelude.rs index 1b2bfef5ecd..d0a16f7875b 100644 --- a/src/libcore/prelude.rs +++ b/src/libcore/prelude.rs @@ -68,7 +68,7 @@ pub use ops; pub use option; pub use os; pub use path; -pub use pipes; +pub use comm; pub use private; pub use ptr; pub use rand; diff --git a/src/libcore/private.rs b/src/libcore/private.rs index a2656a9f73b..2738e5564fc 100644 --- a/src/libcore/private.rs +++ b/src/libcore/private.rs @@ -14,7 +14,7 @@ use cast; use iter; use libc; use option; -use pipes::{GenericChan, GenericPort}; +use comm::{GenericChan, GenericPort}; use prelude::*; use ptr; use result; @@ -59,7 +59,7 @@ The executing thread has no access to a task pointer and will be using a normal large stack. */ pub unsafe fn run_in_bare_thread(f: ~fn()) { - let (port, chan) = pipes::stream(); + let (port, chan) = comm::stream(); // FIXME #4525: Unfortunate that this creates an extra scheduler but it's // necessary since rust_raw_thread_join_delete is blocking do task::spawn_sched(task::SingleThreaded) { @@ -110,7 +110,7 @@ fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool { // An unwrapper uses this protocol to communicate with the "other" task that // drops the last refcount on an arc. Unfortunately this can't be a proper // pipe protocol because the unwrapper has to access both stages at once. -type UnwrapProto = ~mut Option<(pipes::ChanOne<()>, pipes::PortOne<bool>)>; +type UnwrapProto = ~mut Option<(comm::ChanOne<()>, comm::PortOne<bool>)>; struct ArcData<T> { mut count: libc::intptr_t, @@ -143,9 +143,9 @@ struct ArcDestruct<T> { cast::reinterpret_cast(&data.unwrapper); let (message, response) = option::swap_unwrap(p); // Send 'ready' and wait for a response. - pipes::send_one(message, ()); + comm::send_one(message, ()); // Unkillable wait. Message guaranteed to come. - if pipes::recv_one(response) { + if comm::recv_one(response) { // Other task got the data. cast::forget(data); } else { @@ -172,7 +172,7 @@ pub unsafe fn unwrap_shared_mutable_state<T:Owned>(rc: SharedMutableState<T>) -> T { struct DeathThroes<T> { mut ptr: Option<~ArcData<T>>, - mut response: Option<pipes::ChanOne<bool>>, + mut response: Option<comm::ChanOne<bool>>, drop { unsafe { let response = option::swap_unwrap(&mut self.response); @@ -180,13 +180,13 @@ pub unsafe fn unwrap_shared_mutable_state<T:Owned>(rc: SharedMutableState<T>) // tried to wake us whether they should hand-off the data to // us. if task::failing() { - pipes::send_one(response, false); + comm::send_one(response, false); // Either this swap_unwrap or the one below (at "Got // here") ought to run. cast::forget(option::swap_unwrap(&mut self.ptr)); } else { assert self.ptr.is_none(); - pipes::send_one(response, true); + comm::send_one(response, true); } } } @@ -194,8 +194,8 @@ pub unsafe fn unwrap_shared_mutable_state<T:Owned>(rc: SharedMutableState<T>) do task::unkillable { let ptr: ~ArcData<T> = cast::reinterpret_cast(&rc.data); - let (p1,c1) = pipes::oneshot(); // () - let (p2,c2) = pipes::oneshot(); // bool + let (p1,c1) = comm::oneshot(); // () + let (p2,c2) = comm::oneshot(); // bool let server: UnwrapProto = ~mut Some((c1,p2)); let serverp: int = cast::transmute(server); // Try to put our server end in the unwrapper slot. @@ -218,7 +218,7 @@ pub unsafe fn unwrap_shared_mutable_state<T:Owned>(rc: SharedMutableState<T>) response: Some(c2) }; let mut p1 = Some(p1); // argh do task::rekillable { - pipes::recv_one(option::swap_unwrap(&mut p1)); + comm::recv_one(option::swap_unwrap(&mut p1)); } // Got here. Back in the 'unkillable' without getting killed. // Recover ownership of ptr, then take the data out. @@ -410,7 +410,7 @@ pub mod tests { use core::option::{None, Some}; use option; - use pipes; + use comm; use private::{exclusive, unwrap_exclusive}; use result; use task; @@ -427,7 +427,7 @@ pub mod tests { for uint::range(0, num_tasks) |_i| { let total = total.clone(); - let (port, chan) = pipes::stream(); + let (port, chan) = comm::stream(); futures.push(port); do task::spawn || { diff --git a/src/libcore/private/weak_task.rs b/src/libcore/private/weak_task.rs index f285f811f15..f3df8ce72f1 100644 --- a/src/libcore/private/weak_task.rs +++ b/src/libcore/private/weak_task.rs @@ -22,8 +22,8 @@ use option::{Some, None, swap_unwrap}; use private::at_exit::at_exit; use private::global::global_data_clone_create; use private::finally::Finally; -use pipes::{Port, Chan, SharedChan, GenericChan, GenericPort, - GenericSmartChan, stream}; +use comm::{Port, Chan, SharedChan, GenericChan, + GenericPort, GenericSmartChan, stream}; use task::{Task, task, spawn}; use task::rt::{task_id, get_task_id}; use hashmap::linear::LinearMap; @@ -186,7 +186,7 @@ fn test_wait_for_signal_many() { #[test] fn test_select_stream_and_oneshot() { - use pipes::select2i; + use comm::select2i; use either::{Left, Right}; let (port, chan) = stream(); diff --git a/src/libcore/run.rs b/src/libcore/run.rs index 5103025d120..4e2337b8331 100644 --- a/src/libcore/run.rs +++ b/src/libcore/run.rs @@ -14,7 +14,7 @@ use io; use io::ReaderUtil; use libc; use libc::{pid_t, c_void, c_int}; -use pipes::{stream, SharedChan, GenericChan, GenericPort}; +use comm::{stream, SharedChan, GenericChan, GenericPort}; use option::{Some, None}; use os; use prelude::*; diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs index 54dfa7459a1..336e686193b 100644 --- a/src/libcore/task/mod.rs +++ b/src/libcore/task/mod.rs @@ -40,7 +40,7 @@ use iter; use libc; use option; use result::Result; -use pipes::{stream, Chan, GenericChan, GenericPort, Port, SharedChan}; +use comm::{stream, Chan, GenericChan, GenericPort, Port, SharedChan}; use pipes; use prelude::*; use ptr; @@ -1109,7 +1109,7 @@ fn test_unkillable() { #[ignore(cfg(windows))] #[should_fail] fn test_unkillable_nested() { - let (po, ch) = pipes::stream(); + let (po, ch) = comm::stream(); // We want to do this after failing do spawn_unlinked || { @@ -1175,7 +1175,7 @@ fn test_child_doesnt_ref_parent() { #[test] fn test_sched_thread_per_core() { - let (port, chan) = pipes::stream(); + let (port, chan) = comm::stream(); do spawn_sched(ThreadPerCore) || { unsafe { @@ -1191,7 +1191,7 @@ fn test_sched_thread_per_core() { #[test] fn test_spawn_thread_on_demand() { - let (port, chan) = pipes::stream(); + let (port, chan) = comm::stream(); do spawn_sched(ManualThreads(2)) || { unsafe { @@ -1200,7 +1200,7 @@ fn test_spawn_thread_on_demand() { let running_threads = rt::rust_sched_current_nonlazy_threads(); assert(running_threads as int == 1); - let (port2, chan2) = pipes::stream(); + let (port2, chan2) = comm::stream(); do spawn_sched(CurrentScheduler) || { chan2.send(()); diff --git a/src/libcore/task/spawn.rs b/src/libcore/task/spawn.rs index d72cacc2c4b..e77af820079 100644 --- a/src/libcore/task/spawn.rs +++ b/src/libcore/task/spawn.rs @@ -75,7 +75,7 @@ use cast; use container::Map; use option; -use pipes::{Chan, GenericChan, GenericPort, Port, stream}; +use comm::{Chan, GenericChan, GenericPort, Port, stream}; use pipes; use prelude::*; use private; @@ -702,7 +702,7 @@ fn test_spawn_raw_unsupervise() { #[test] #[ignore(cfg(windows))] fn test_spawn_raw_notify_success() { - let (notify_po, notify_ch) = pipes::stream(); + let (notify_po, notify_ch) = comm::stream(); let opts = task::TaskOpts { notify_chan: Some(notify_ch), @@ -717,7 +717,7 @@ fn test_spawn_raw_notify_success() { #[ignore(cfg(windows))] fn test_spawn_raw_notify_failure() { // New bindings for these - let (notify_po, notify_ch) = pipes::stream(); + let (notify_po, notify_ch) = comm::stream(); let opts = task::TaskOpts { linked: false, diff --git a/src/librustc/rustc.rc b/src/librustc/rustc.rc index 93bc0dc0a2c..01758a1845d 100644 --- a/src/librustc/rustc.rc +++ b/src/librustc/rustc.rc @@ -314,7 +314,7 @@ fails without recording a fatal error then we've encountered a compiler bug and need to present an error. */ pub fn monitor(+f: fn~(diagnostic::Emitter)) { - use core::pipes::*; + use core::comm::*; use std::cell::Cell; let (p, ch) = stream(); let ch = SharedChan(ch); diff --git a/src/librustdoc/astsrv.rs b/src/librustdoc/astsrv.rs index f34a7ffbbdb..fff2e189eb8 100644 --- a/src/librustdoc/astsrv.rs +++ b/src/librustdoc/astsrv.rs @@ -23,7 +23,7 @@ use parse; use util; use std::cell::Cell; -use core::pipes::{stream, Chan, SharedChan, Port}; +use core::comm::{stream, Chan, SharedChan, Port}; use core::vec; use core::ops::Drop; use rustc::back::link; diff --git a/src/librustdoc/markdown_writer.rs b/src/librustdoc/markdown_writer.rs index a6cc5170796..45a8aa9fd29 100644 --- a/src/librustdoc/markdown_writer.rs +++ b/src/librustdoc/markdown_writer.rs @@ -20,12 +20,12 @@ use core::io::ReaderUtil; use core::io; use core::libc; use core::os; -use core::pipes; +use core::comm; use core::result; use core::run; use core::str; use core::task; -use core::pipes::*; +use core::comm::*; use std::future; use syntax; @@ -128,12 +128,12 @@ fn pandoc_writer( os::close(pipe_err.out); os::close(pipe_in.out); - let (stdout_po, stdout_ch) = pipes::stream(); + let (stdout_po, stdout_ch) = comm::stream(); do task::spawn_sched(task::SingleThreaded) || { stdout_ch.send(readclose(pipe_out.in)); } - let (stderr_po, stderr_ch) = pipes::stream(); + let (stderr_po, stderr_ch) = comm::stream(); do task::spawn_sched(task::SingleThreaded) || { stderr_ch.send(readclose(pipe_err.in)); } @@ -296,7 +296,7 @@ pub fn future_writer_factory( let (markdown_po, markdown_ch) = stream(); let markdown_ch = SharedChan(markdown_ch); let writer_factory = fn~(page: doc::Page) -> Writer { - let (writer_po, writer_ch) = pipes::stream(); + let (writer_po, writer_ch) = comm::stream(); let markdown_ch = markdown_ch.clone(); do task::spawn || { let (writer, future) = future_writer(); @@ -311,7 +311,7 @@ pub fn future_writer_factory( } fn future_writer() -> (Writer, future::Future<~str>) { - let (port, chan) = pipes::stream(); + let (port, chan) = comm::stream(); let writer = fn~(instr: WriteInstr) { chan.send(copy instr); }; diff --git a/src/librustdoc/page_pass.rs b/src/librustdoc/page_pass.rs index 4971806c7ed..2a2c3888647 100644 --- a/src/librustdoc/page_pass.rs +++ b/src/librustdoc/page_pass.rs @@ -30,7 +30,7 @@ use util; use core::option; use core::vec; -use core::pipes::*; +use core::comm::*; use syntax::ast; pub fn mk_pass(output_style: config::OutputStyle) -> Pass { diff --git a/src/libstd/arc.rs b/src/libstd/arc.rs index 50f40559807..61b5ffd845f 100644 --- a/src/libstd/arc.rs +++ b/src/libstd/arc.rs @@ -507,10 +507,10 @@ mod tests { let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let arc_v = arc::ARC(v); - let (p, c) = pipes::stream(); + let (p, c) = comm::stream(); do task::spawn() || { - let p = pipes::PortSet(); + let p = comm::PortSet(); c.send(p.chan()); let arc_v = p.recv(); @@ -531,18 +531,18 @@ mod tests { pub fn test_mutex_arc_condvar() { let arc = ~MutexARC(false); let arc2 = ~arc.clone(); - let (p,c) = pipes::oneshot(); + let (p,c) = comm::oneshot(); let (c,p) = (~mut Some(c), ~mut Some(p)); do task::spawn || { // wait until parent gets in - pipes::recv_one(option::swap_unwrap(p)); + comm::recv_one(option::swap_unwrap(p)); do arc2.access_cond |state, cond| { *state = true; cond.signal(); } } do arc.access_cond |state, cond| { - pipes::send_one(option::swap_unwrap(c), ()); + comm::send_one(option::swap_unwrap(c), ()); assert !*state; while !*state { cond.wait(); @@ -553,7 +553,7 @@ mod tests { pub fn test_arc_condvar_poison() { let arc = ~MutexARC(1); let arc2 = ~arc.clone(); - let (p, c) = pipes::stream(); + let (p, c) = comm::stream(); do task::spawn_unlinked || { let _ = p.recv(); @@ -587,7 +587,7 @@ mod tests { pub fn test_mutex_arc_unwrap_poison() { let arc = MutexARC(1); let arc2 = ~(&arc).clone(); - let (p, c) = pipes::stream(); + let (p, c) = comm::stream(); do task::spawn || { do arc2.access |one| { c.send(()); @@ -685,7 +685,7 @@ mod tests { pub fn test_rw_arc() { let arc = ~RWARC(0); let arc2 = ~arc.clone(); - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); do task::spawn || { do arc2.write |num| { @@ -731,7 +731,7 @@ mod tests { // Reader tasks let mut reader_convos = ~[]; for 10.times { - let ((rp1,rc1),(rp2,rc2)) = (pipes::stream(),pipes::stream()); + let ((rp1,rc1),(rp2,rc2)) = (comm::stream(),comm::stream()); reader_convos.push((rc1, rp2)); let arcn = ~arc.clone(); do task::spawn || { @@ -745,7 +745,7 @@ mod tests { // Writer task let arc2 = ~arc.clone(); - let ((wp1,wc1),(wp2,wc2)) = (pipes::stream(),pipes::stream()); + let ((wp1,wc1),(wp2,wc2)) = (comm::stream(),comm::stream()); do task::spawn || { wp1.recv(); do arc2.write_cond |state, cond| { diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs index 9478a392796..02875739eba 100644 --- a/src/libstd/comm.rs +++ b/src/libstd/comm.rs @@ -14,8 +14,8 @@ Higher level communication abstractions. */ -use core::pipes::{GenericChan, GenericSmartChan, GenericPort}; -use core::pipes::{Chan, Port, Selectable, Peekable}; +use core::comm::{GenericChan, GenericSmartChan, GenericPort}; +use core::comm::{Chan, Port, Selectable, Peekable}; use core::pipes; use core::prelude::*; @@ -63,8 +63,8 @@ impl<T:Owned,U:Owned> Selectable for DuplexStream<T, U> { pub fn DuplexStream<T:Owned,U:Owned>() -> (DuplexStream<T, U>, DuplexStream<U, T>) { - let (p1, c2) = pipes::stream(); - let (p2, c1) = pipes::stream(); + let (p1, c2) = comm::stream(); + let (p2, c1) = comm::stream(); (DuplexStream { chan: c1, port: p1 diff --git a/src/libstd/flatpipes.rs b/src/libstd/flatpipes.rs index 80f93323a8e..13c0bbe1a67 100644 --- a/src/libstd/flatpipes.rs +++ b/src/libstd/flatpipes.rs @@ -49,8 +49,8 @@ block the scheduler thread, so will their pipes. // The basic send/recv interface FlatChan and PortChan will implement use core::io; -use core::pipes::GenericChan; -use core::pipes::GenericPort; +use core::comm::GenericChan; +use core::comm::GenericPort; use core::pipes; use core::prelude::*; use core::sys::size_of; @@ -95,8 +95,8 @@ pub mod serial { use flatpipes::{FlatPort, FlatChan}; use core::io::{Reader, Writer}; - use core::pipes::{Port, Chan}; - use core::pipes; + use core::comm::{Port, Chan}; + use core::comm; pub type ReaderPort<T, R> = FlatPort< T, DeserializingUnflattener<DefaultDecoder, T>, @@ -154,7 +154,7 @@ pub mod serial { pub fn pipe_stream<T: Encodable<DefaultEncoder> + Decodable<DefaultDecoder>>( ) -> (PipePort<T>, PipeChan<T>) { - let (port, chan) = pipes::stream(); + let (port, chan) = comm::stream(); return (pipe_port(port), pipe_chan(chan)); } } @@ -177,8 +177,8 @@ pub mod pod { use flatpipes::{FlatPort, FlatChan}; use core::io::{Reader, Writer}; - use core::pipes::{Port, Chan}; - use core::pipes; + use core::comm::{Port, Chan}; + use core::comm; use core::prelude::*; pub type ReaderPort<T, R> = @@ -222,7 +222,7 @@ pub mod pod { /// Create a pair of `FlatChan` and `FlatPort`, backed by pipes pub fn pipe_stream<T:Copy + Owned>() -> (PipePort<T>, PipeChan<T>) { - let (port, chan) = pipes::stream(); + let (port, chan) = comm::stream(); return (pipe_port(port), pipe_chan(chan)); } @@ -507,7 +507,7 @@ pub mod bytepipes { use flatpipes::{ByteChan, BytePort}; use core::io::{Writer, Reader, ReaderUtil}; - use core::pipes::{Port, Chan}; + use core::comm::{Port, Chan}; use core::pipes; use core::prelude::*; @@ -564,12 +564,12 @@ pub mod bytepipes { } pub struct PipeBytePort { - port: pipes::Port<~[u8]>, + port: comm::Port<~[u8]>, mut buf: ~[u8] } pub struct PipeByteChan { - chan: pipes::Chan<~[u8]> + chan: comm::Chan<~[u8]> } pub impl BytePort for PipeBytePort { @@ -777,12 +777,12 @@ mod test { use uv; // Indicate to the client task that the server is listening - let (begin_connect_port, begin_connect_chan) = pipes::stream(); + let (begin_connect_port, begin_connect_chan) = comm::stream(); // The connection is sent from the server task to the receiver task // to handle the connection - let (accept_port, accept_chan) = pipes::stream(); + let (accept_port, accept_chan) = comm::stream(); // The main task will wait until the test is over to proceed - let (finish_port, finish_chan) = pipes::stream(); + let (finish_port, finish_chan) = comm::stream(); let addr0 = ip::v4::parse_addr("127.0.0.1"); @@ -803,7 +803,7 @@ mod test { }) |new_conn, kill_ch| { // Incoming connection. Send it to the receiver task to accept - let (res_port, res_chan) = pipes::stream(); + let (res_port, res_chan) = comm::stream(); accept_chan.send((new_conn, res_chan)); // Wait until the connection is accepted res_port.recv(); @@ -894,7 +894,7 @@ mod test { fn pipe_port_loader(bytes: ~[u8] ) -> pod::PipePort<int> { - let (port, chan) = pipes::stream(); + let (port, chan) = comm::stream(); if !bytes.is_empty() { chan.send(bytes); } diff --git a/src/libstd/future.rs b/src/libstd/future.rs index ff81393a914..b6b001727a4 100644 --- a/src/libstd/future.rs +++ b/src/libstd/future.rs @@ -25,7 +25,8 @@ use core::cast::copy_lifetime; use core::cast; use core::either::Either; use core::option; -use core::pipes::{recv, oneshot, ChanOne, PortOne, send_one, recv_one}; +use core::comm::{oneshot, ChanOne, PortOne, send_one, recv_one}; +use core::pipes::recv; use core::prelude::*; use core::task; @@ -150,7 +151,7 @@ pub mod test { use future::*; - use core::pipes::oneshot; + use core::comm::oneshot; use core::task; #[test] diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs index 4a185f68e17..bc17cb0bfe9 100644 --- a/src/libstd/net_ip.rs +++ b/src/libstd/net_ip.rs @@ -12,7 +12,7 @@ use core::libc; use core::prelude::*; -use core::pipes::{stream, SharedChan}; +use core::comm::{stream, SharedChan}; use core::ptr; use core::result; use core::str; diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index 563bc1c203a..8835cdfb105 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -24,7 +24,7 @@ use core::io::{Reader, ReaderUtil, Writer}; use core::io; use core::libc::size_t; use core::libc; -use core::pipes::{stream, Chan, Port, SharedChan}; +use core::comm::{stream, Chan, Port, SharedChan}; use core::prelude::*; use core::ptr; use core::result::{Result}; @@ -1441,7 +1441,7 @@ pub mod test { use uv; use core::io; - use core::pipes::{stream, Chan, Port, SharedChan}; + use core::comm::{stream, Chan, Port, SharedChan}; use core::prelude::*; use core::result; use core::str; diff --git a/src/libstd/sync.rs b/src/libstd/sync.rs index 66d17392417..016847a5bfd 100644 --- a/src/libstd/sync.rs +++ b/src/libstd/sync.rs @@ -30,16 +30,16 @@ use core::vec; // Each waiting task receives on one of these. #[doc(hidden)] -type WaitEnd = pipes::PortOne<()>; +type WaitEnd = comm::PortOne<()>; #[doc(hidden)] -type SignalEnd = pipes::ChanOne<()>; +type SignalEnd = comm::ChanOne<()>; // A doubly-ended queue of waiting tasks. #[doc(hidden)] -struct Waitqueue { head: pipes::Port<SignalEnd>, - tail: pipes::Chan<SignalEnd> } +struct Waitqueue { head: comm::Port<SignalEnd>, + tail: comm::Chan<SignalEnd> } fn new_waitqueue() -> Waitqueue { - let (block_head, block_tail) = pipes::stream(); + let (block_head, block_tail) = comm::stream(); Waitqueue { head: block_head, tail: block_tail } } @@ -50,7 +50,7 @@ fn signal_waitqueue(q: &Waitqueue) -> bool { if q.head.peek() { // Pop and send a wakeup signal. If the waiter was killed, its port // will have closed. Keep trying until we get a live task. - if pipes::try_send_one(q.head.recv(), ()) { + if comm::try_send_one(q.head.recv(), ()) { true } else { signal_waitqueue(q) @@ -64,7 +64,7 @@ fn signal_waitqueue(q: &Waitqueue) -> bool { fn broadcast_waitqueue(q: &Waitqueue) -> uint { let mut count = 0; while q.head.peek() { - if pipes::try_send_one(q.head.recv(), ()) { + if comm::try_send_one(q.head.recv(), ()) { count += 1; } } @@ -107,7 +107,7 @@ impl<Q:Owned> &Sem<Q> { state.count -= 1; if state.count < 0 { // Create waiter nobe. - let (WaitEnd, SignalEnd) = pipes::oneshot(); + let (WaitEnd, SignalEnd) = comm::oneshot(); // Tell outer scope we need to block. waiter_nobe = Some(WaitEnd); // Enqueue ourself. @@ -119,7 +119,7 @@ impl<Q:Owned> &Sem<Q> { /* for 1000.times { task::yield(); } */ // Need to wait outside the exclusive. if waiter_nobe.is_some() { - let _ = pipes::recv_one(option::unwrap(waiter_nobe)); + let _ = comm::recv_one(option::unwrap(waiter_nobe)); } } fn release() { @@ -214,7 +214,7 @@ impl &Condvar { */ fn wait_on(condvar_id: uint) { // Create waiter nobe. - let (WaitEnd, SignalEnd) = pipes::oneshot(); + let (WaitEnd, SignalEnd) = comm::oneshot(); let mut WaitEnd = Some(WaitEnd); let mut SignalEnd = Some(SignalEnd); let mut reacquire = None; @@ -250,7 +250,7 @@ impl &Condvar { // Unconditionally "block". (Might not actually block if a // signaller already sent -- I mean 'unconditionally' in contrast // with acquire().) - let _ = pipes::recv_one(option::swap_unwrap(&mut WaitEnd)); + let _ = comm::recv_one(option::swap_unwrap(&mut WaitEnd)); } // This is needed for a failing condition variable to reacquire the @@ -749,7 +749,7 @@ mod tests { #[test] pub fn test_sem_as_cvar() { /* Child waits and parent signals */ - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); let s = ~semaphore(0); let s2 = ~s.clone(); do task::spawn || { @@ -761,7 +761,7 @@ mod tests { let _ = p.recv(); /* Parent waits and child signals */ - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); let s = ~semaphore(0); let s2 = ~s.clone(); do task::spawn || { @@ -778,8 +778,8 @@ mod tests { // time, and shake hands. let s = ~semaphore(2); let s2 = ~s.clone(); - let (p1,c1) = pipes::stream(); - let (p2,c2) = pipes::stream(); + let (p1,c1) = comm::stream(); + let (p2,c2) = comm::stream(); do task::spawn || { do s2.access { let _ = p2.recv(); @@ -798,7 +798,7 @@ mod tests { do task::spawn_sched(task::ManualThreads(1)) { let s = ~semaphore(1); let s2 = ~s.clone(); - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); let child_data = ~mut Some((s2, c)); do s.access { let (s2,c) = option::swap_unwrap(child_data); @@ -820,7 +820,7 @@ mod tests { pub fn test_mutex_lock() { // Unsafely achieve shared state, and do the textbook // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance. - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); let m = ~Mutex(); let m2 = ~m.clone(); let mut sharedstate = ~0; @@ -863,7 +863,7 @@ mod tests { cond.wait(); } // Parent wakes up child - let (port,chan) = pipes::stream(); + let (port,chan) = comm::stream(); let m3 = ~m.clone(); do task::spawn || { do m3.lock_cond |cond| { @@ -886,7 +886,7 @@ mod tests { for num_waiters.times { let mi = ~m.clone(); - let (port, chan) = pipes::stream(); + let (port, chan) = comm::stream(); ports.push(port); do task::spawn || { do mi.lock_cond |cond| { @@ -948,7 +948,7 @@ mod tests { let m2 = ~m.clone(); let result: result::Result<(),()> = do task::try || { - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); do task::spawn || { // linked let _ = p.recv(); // wait for sibling to get in the mutex task::yield(); @@ -970,12 +970,12 @@ mod tests { pub fn test_mutex_killed_broadcast() { let m = ~Mutex(); let m2 = ~m.clone(); - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); let result: result::Result<(),()> = do task::try || { let mut sibling_convos = ~[]; for 2.times { - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); let c = ~mut Some(c); sibling_convos.push(p); let mi = ~m2.clone(); @@ -1004,7 +1004,7 @@ mod tests { assert woken == 0; } struct SendOnFailure { - c: pipes::Chan<()>, + c: comm::Chan<()>, } impl Drop for SendOnFailure { @@ -1013,7 +1013,7 @@ mod tests { } } - fn SendOnFailure(c: pipes::Chan<()>) -> SendOnFailure { + fn SendOnFailure(c: comm::Chan<()>) -> SendOnFailure { SendOnFailure { c: c } @@ -1038,7 +1038,7 @@ mod tests { let result = do task::try { let m = ~mutex_with_condvars(2); let m2 = ~m.clone(); - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); do task::spawn || { do m2.lock_cond |cond| { c.send(()); @@ -1099,7 +1099,7 @@ mod tests { mode2: RWlockMode) { // Test mutual exclusion between readers and writers. Just like the // mutex mutual exclusion test, a ways above. - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); let x2 = ~x.clone(); let mut sharedstate = ~0; let ptr = ptr::addr_of(&(*sharedstate)); @@ -1146,8 +1146,8 @@ mod tests { make_mode2_go_first: bool) { // Much like sem_multi_resource. let x2 = ~x.clone(); - let (p1,c1) = pipes::stream(); - let (p2,c2) = pipes::stream(); + let (p1,c1) = comm::stream(); + let (p2,c2) = comm::stream(); do task::spawn || { if !make_mode2_go_first { let _ = p2.recv(); // parent sends to us once it locks, or ... @@ -1212,7 +1212,7 @@ mod tests { cond.wait(); } // Parent wakes up child - let (port,chan) = pipes::stream(); + let (port,chan) = comm::stream(); let x3 = ~x.clone(); do task::spawn || { do x3.write_cond |cond| { @@ -1249,7 +1249,7 @@ mod tests { for num_waiters.times { let xi = ~x.clone(); - let (port, chan) = pipes::stream(); + let (port, chan) = comm::stream(); ports.push(port); do task::spawn || { do lock_cond(xi, dg1) |cond| { diff --git a/src/libstd/task_pool.rs b/src/libstd/task_pool.rs index 6f479fbb9f7..6b8ea8a6ef4 100644 --- a/src/libstd/task_pool.rs +++ b/src/libstd/task_pool.rs @@ -12,7 +12,7 @@ /// parallelism. use core::io; -use core::pipes::{Chan, Port}; +use core::comm::{Chan, Port}; use core::pipes; use core::prelude::*; use core::task::{SchedMode, SingleThreaded}; @@ -47,7 +47,7 @@ pub impl<T> TaskPool<T> { assert n_tasks >= 1; let channels = do vec::from_fn(n_tasks) |i| { - let (port, chan) = pipes::stream::<Msg<T>>(); + let (port, chan) = comm::stream::<Msg<T>>(); let init_fn = init_fn_factory(); let task_body: ~fn() = || { diff --git a/src/libstd/test.rs b/src/libstd/test.rs index cd03de91183..e14e9665216 100644 --- a/src/libstd/test.rs +++ b/src/libstd/test.rs @@ -27,7 +27,7 @@ use core::either; use core::io::WriterUtil; use core::io; use core::libc::size_t; -use core::pipes::{stream, Chan, Port, SharedChan}; +use core::comm::{stream, Chan, Port, SharedChan}; use core::option; use core::prelude::*; use core::result; @@ -794,7 +794,7 @@ mod tests { use test::{TestOpts, run_test}; use core::either; - use core::pipes::{stream, SharedChan}; + use core::comm::{stream, SharedChan}; use core::option; use core::vec; diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index 6768ff23248..b711825aecf 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -18,7 +18,7 @@ use core::either; use core::libc; use core::libc::c_void; use core::cast::transmute; -use core::pipes::{stream, Chan, SharedChan, Port, select2i}; +use core::comm::{stream, Chan, SharedChan, Port, select2i}; use core::prelude::*; use core::ptr; use core; diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs index 872d53e93eb..401cecf8811 100644 --- a/src/libstd/uv_global_loop.rs +++ b/src/libstd/uv_global_loop.rs @@ -17,7 +17,7 @@ use uv_iotask::{IoTask, spawn_iotask}; use core::either::{Left, Right}; use core::libc; -use core::pipes::{Port, Chan, SharedChan, select2i}; +use core::comm::{Port, Chan, SharedChan, select2i}; use core::private::global::{global_data_clone_create, global_data_clone}; use core::private::weak_task::weaken_task; @@ -133,7 +133,7 @@ mod test { use core::task; use core::cast::transmute; use core::libc::c_void; - use core::pipes::{stream, SharedChan, Chan}; + use core::comm::{stream, SharedChan, Chan}; extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) { unsafe { diff --git a/src/libstd/uv_iotask.rs b/src/libstd/uv_iotask.rs index be4240237ad..52956f152fe 100644 --- a/src/libstd/uv_iotask.rs +++ b/src/libstd/uv_iotask.rs @@ -19,7 +19,7 @@ use ll = uv_ll; use core::libc::c_void; use core::libc; -use core::pipes::{stream, Port, Chan, SharedChan}; +use core::comm::{stream, Port, Chan, SharedChan}; use core::prelude::*; use core::ptr::addr_of; use core::task::TaskBuilder; diff --git a/src/libstd/uv_ll.rs b/src/libstd/uv_ll.rs index 96b285b8c0a..dd54620c83d 100644 --- a/src/libstd/uv_ll.rs +++ b/src/libstd/uv_ll.rs @@ -39,7 +39,7 @@ use core::ptr::to_unsafe_ptr; use core::ptr; use core::str; use core::vec; -use core::pipes::{stream, Chan, SharedChan, Port}; +use core::comm::{stream, Chan, SharedChan, Port}; // libuv struct mappings pub struct uv_ip4_addr { diff --git a/src/libstd/workcache.rs b/src/libstd/workcache.rs index a06dee723c8..8ce68a41f81 100644 --- a/src/libstd/workcache.rs +++ b/src/libstd/workcache.rs @@ -19,7 +19,8 @@ use core::cmp; use core::either::{Either, Left, Right}; use core::io; use core::option; -use core::pipes::{recv, oneshot, PortOne, send_one}; +use core::comm::{oneshot, PortOne, send_one}; +use core::pipes::recv; use core::prelude::*; use core::result; use core::run; diff --git a/src/libsyntax/ext/pipes/pipec.rs b/src/libsyntax/ext/pipes/pipec.rs index 48bd8b03297..e8e4c939907 100644 --- a/src/libsyntax/ext/pipes/pipec.rs +++ b/src/libsyntax/ext/pipes/pipec.rs @@ -78,10 +78,10 @@ pub impl gen_send for message { }; body += ~"let b = pipe.reuse_buffer();\n"; - body += fmt!("let %s = ::pipes::SendPacketBuffered(\ + body += fmt!("let %s = ::core::pipes::SendPacketBuffered(\ ::ptr::addr_of(&(b.buffer.data.%s)));\n", sp, next.name); - body += fmt!("let %s = ::pipes::RecvPacketBuffered(\ + body += fmt!("let %s = ::core::pipes::RecvPacketBuffered(\ ::ptr::addr_of(&(b.buffer.data.%s)));\n", rp, next.name); } @@ -93,7 +93,7 @@ pub impl gen_send for message { (recv, recv) => "(c, s)" }; - body += fmt!("let %s = ::pipes::entangle();\n", pat); + body += fmt!("let %s = ::core::pipes::entangle();\n", pat); } body += fmt!("let message = %s(%s);\n", self.name(), @@ -102,14 +102,14 @@ pub impl gen_send for message { ~"s"), ~", ")); if !try { - body += fmt!("::pipes::send(pipe, message);\n"); + body += fmt!("::core::pipes::send(pipe, message);\n"); // return the new channel body += ~"c }"; } else { - body += fmt!("if ::pipes::send(pipe, message) {\n \ - ::pipes::rt::make_some(c) \ - } else { ::pipes::rt::make_none() } }"); + body += fmt!("if ::core::pipes::send(pipe, message) {\n \ + ::core::pipes::rt::make_some(c) \ + } else { ::core::pipes::rt::make_none() } }"); } let body = cx.parse_expr(body); @@ -162,14 +162,14 @@ pub impl gen_send for message { message_args); if !try { - body += fmt!("::pipes::send(pipe, message);\n"); + body += fmt!("::core::pipes::send(pipe, message);\n"); body += ~" }"; } else { - body += fmt!("if ::pipes::send(pipe, message) \ + body += fmt!("if ::core::pipes::send(pipe, message) \ { \ - ::pipes::rt::make_some(()) \ + ::core::pipes::rt::make_some(()) \ } else { \ - ::pipes::rt::make_none() \ + ::core::pipes::rt::make_none() \ } }"); } @@ -272,7 +272,8 @@ pub impl to_type_decls for state { self.data_name(), self.span, cx.ty_path_ast_builder( - path_global(~[cx.ident_of(~"pipes"), + path_global(~[cx.ident_of(~"core"), + cx.ident_of(~"pipes"), cx.ident_of(dir.to_str() + ~"Packet")], dummy_sp()) .add_ty(cx.ty_path_ast_builder( @@ -288,7 +289,8 @@ pub impl to_type_decls for state { self.data_name(), self.span, cx.ty_path_ast_builder( - path_global(~[cx.ident_of(~"pipes"), + path_global(~[cx.ident_of(~"core"), + cx.ident_of(~"pipes"), cx.ident_of(dir.to_str() + ~"PacketBuffered")], dummy_sp()) @@ -313,10 +315,10 @@ pub impl gen_init for protocol { let body = if !self.is_bounded() { match start_state.dir { - send => quote_expr!( ::pipes::entangle() ), + send => quote_expr!( ::core::pipes::entangle() ), recv => { quote_expr!({ - let (s, c) = ::pipes::entangle(); + let (s, c) = ::core::pipes::entangle(); (c, s) }) } @@ -336,7 +338,7 @@ pub impl gen_init for protocol { }; cx.parse_item(fmt!("pub fn init%s() -> (client::%s, server::%s)\ - { use pipes::HasBuffer; %s }", + { use core::pipes::HasBuffer; %s }", start_state.ty_params.to_source(cx), start_state.to_ty(cx).to_source(cx), start_state.to_ty(cx).to_source(cx), @@ -350,7 +352,7 @@ pub impl gen_init for protocol { let fty = s.to_ty(ext_cx); ext_cx.field_imm(ext_cx.ident_of(s.name), quote_expr!( - ::pipes::mk_packet::<$fty>() + ::core::pipes::mk_packet::<$fty>() )) })) } @@ -358,8 +360,8 @@ pub impl gen_init for protocol { fn gen_init_bounded(&self, ext_cx: ext_ctxt) -> @ast::expr { debug!("gen_init_bounded"); let buffer_fields = self.gen_buffer_init(ext_cx); - let buffer = quote_expr!(~::pipes::Buffer { - header: ::pipes::BufferHeader(), + let buffer = quote_expr!(~::core::pipes::Buffer { + header: ::core::pipes::BufferHeader(), data: $buffer_fields, }); @@ -375,7 +377,7 @@ pub impl gen_init for protocol { quote_expr!({ let buffer = $buffer; - do ::pipes::entangle_buffer(buffer) |buffer, data| { + do ::core::pipes::entangle_buffer(buffer) |buffer, data| { $entangle_body } }) @@ -408,7 +410,7 @@ pub impl gen_init for protocol { } } let ty = s.to_ty(cx); - let fty = quote_ty!( ::pipes::Packet<$ty> ); + let fty = quote_ty!( ::core::pipes::Packet<$ty> ); @spanned { node: ast::struct_field_ { diff --git a/src/test/auxiliary/cci_capture_clause.rs b/src/test/auxiliary/cci_capture_clause.rs index 8038c5fc41a..f2749ed1d0c 100644 --- a/src/test/auxiliary/cci_capture_clause.rs +++ b/src/test/auxiliary/cci_capture_clause.rs @@ -8,7 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use core::pipes::*; +use core::comm::*; pub fn foo<T:Owned + Copy>(x: T) -> Port<T> { let (p, c) = stream(); diff --git a/src/test/bench/msgsend-pipes-shared.rs b/src/test/bench/msgsend-pipes-shared.rs index bfbc7ecd20a..4bbd22786a5 100644 --- a/src/test/bench/msgsend-pipes-shared.rs +++ b/src/test/bench/msgsend-pipes-shared.rs @@ -24,7 +24,7 @@ extern mod std; use io::Writer; use io::WriterUtil; -use pipes::{Port, Chan, SharedChan}; +use comm::{Port, Chan, SharedChan}; macro_rules! move_out ( { $x:expr } => { unsafe { let y = *ptr::addr_of(&($x)); y } } @@ -36,7 +36,7 @@ enum request { stop } -fn server(requests: Port<request>, responses: pipes::Chan<uint>) { +fn server(requests: Port<request>, responses: comm::Chan<uint>) { let mut count = 0u; let mut done = false; while !done { @@ -55,8 +55,8 @@ fn server(requests: Port<request>, responses: pipes::Chan<uint>) { } fn run(args: &[~str]) { - let (from_child, to_parent) = pipes::stream(); - let (from_parent, to_child) = pipes::stream(); + let (from_child, to_parent) = comm::stream(); + let (from_parent, to_child) = comm::stream(); let to_child = SharedChan(to_child); diff --git a/src/test/bench/msgsend-pipes.rs b/src/test/bench/msgsend-pipes.rs index 57d9bb49df2..a969368ebac 100644 --- a/src/test/bench/msgsend-pipes.rs +++ b/src/test/bench/msgsend-pipes.rs @@ -20,7 +20,7 @@ extern mod std; use io::Writer; use io::WriterUtil; -use pipes::{Port, PortSet, Chan}; +use comm::{Port, PortSet, Chan, stream}; macro_rules! move_out ( { $x:expr } => { unsafe { let y = *ptr::addr_of(&($x)); y } } @@ -32,7 +32,7 @@ enum request { stop } -fn server(requests: PortSet<request>, responses: pipes::Chan<uint>) { +fn server(requests: PortSet<request>, responses: Chan<uint>) { let mut count = 0; let mut done = false; while !done { @@ -51,8 +51,8 @@ fn server(requests: PortSet<request>, responses: pipes::Chan<uint>) { } fn run(args: &[~str]) { - let (from_child, to_parent) = pipes::stream(); - let (from_parent_, to_child) = pipes::stream(); + let (from_child, to_parent) = stream(); + let (from_parent_, to_child) = stream(); let from_parent = PortSet(); from_parent.add(from_parent_); @@ -62,7 +62,7 @@ fn run(args: &[~str]) { let start = std::time::precise_time_s(); let mut worker_results = ~[]; for uint::range(0, workers) |_i| { - let (from_parent_, to_child) = pipes::stream(); + let (from_parent_, to_child) = stream(); from_parent.add(from_parent_); do task::task().future_result(|+r| { worker_results.push(r); diff --git a/src/test/bench/msgsend-ring-pipes.rs b/src/test/bench/msgsend-ring-pipes.rs index 71ca0f95765..0f7c41f5997 100644 --- a/src/test/bench/msgsend-ring-pipes.rs +++ b/src/test/bench/msgsend-ring-pipes.rs @@ -20,7 +20,7 @@ extern mod std; use std::time; use std::future; -use pipes::recv; +use core::pipes::recv; proto! ring ( num:send { diff --git a/src/test/bench/pingpong.rs b/src/test/bench/pingpong.rs index 16f44b88793..a444dfd26b7 100644 --- a/src/test/bench/pingpong.rs +++ b/src/test/bench/pingpong.rs @@ -16,7 +16,7 @@ extern mod std; -use pipes::{spawn_service, recv}; +use core::pipes::{spawn_service, recv}; use std::time::precise_time_s; proto! pingpong ( @@ -72,9 +72,9 @@ macro_rules! follow ( ) ) -fn switch<T:Owned,Tb:Owned,U>(+endp: pipes::RecvPacketBuffered<T, Tb>, +fn switch<T:Owned,Tb:Owned,U>(+endp: core::pipes::RecvPacketBuffered<T, Tb>, f: fn(+v: Option<T>) -> U) -> U { - f(pipes::try_recv(endp)) + f(core::pipes::try_recv(endp)) } // Here's the benchmark diff --git a/src/test/bench/shootout-chameneos-redux.rs b/src/test/bench/shootout-chameneos-redux.rs index b42ec246ccb..42a1e4b5046 100644 --- a/src/test/bench/shootout-chameneos-redux.rs +++ b/src/test/bench/shootout-chameneos-redux.rs @@ -15,7 +15,7 @@ use std::oldmap; use std::oldmap::HashMap; use std::sort; use std::cell::Cell; -use core::pipes::*; +use core::comm::*; fn print_complements() { let all = ~[Blue, Red, Yellow]; diff --git a/src/test/bench/shootout-k-nucleotide-pipes.rs b/src/test/bench/shootout-k-nucleotide-pipes.rs index a887a13bf38..3fe5f705705 100644 --- a/src/test/bench/shootout-k-nucleotide-pipes.rs +++ b/src/test/bench/shootout-k-nucleotide-pipes.rs @@ -18,7 +18,7 @@ use std::oldmap; use std::oldmap::HashMap; use std::sort; use io::ReaderUtil; -use pipes::{stream, Port, Chan}; +use comm::{stream, Port, Chan}; use cmp::Ord; // given a map, print a sorted version of it @@ -97,8 +97,8 @@ fn windows_with_carry(bb: &[u8], nn: uint, return vec::slice(bb, len - (nn - 1u), len).to_vec(); } -fn make_sequence_processor(sz: uint, from_parent: pipes::Port<~[u8]>, - to_parent: pipes::Chan<~str>) { +fn make_sequence_processor(sz: uint, from_parent: comm::Port<~[u8]>, + to_parent: comm::Chan<~str>) { let freqs: HashMap<~[u8], uint> = oldmap::HashMap(); let mut carry: ~[u8] = ~[]; @@ -159,7 +159,7 @@ fn main() { from_child.push(from_child_); - let (from_parent, to_child) = pipes::stream(); + let (from_parent, to_child) = comm::stream(); do task::spawn_with(from_parent) |from_parent| { make_sequence_processor(sz, from_parent, to_parent_); diff --git a/src/test/bench/shootout-mandelbrot.rs b/src/test/bench/shootout-mandelbrot.rs index 840cec44c64..5e472712fda 100644 --- a/src/test/bench/shootout-mandelbrot.rs +++ b/src/test/bench/shootout-mandelbrot.rs @@ -108,7 +108,7 @@ impl io::Writer for Devnull { fn get_type(&self) -> io::WriterType { io::File } } -fn writer(path: ~str, pport: pipes::Port<Line>, size: uint) +fn writer(path: ~str, pport: comm::Port<Line>, size: uint) { let cout: io::Writer = match path { ~"" => { @@ -172,8 +172,8 @@ fn main() { let size = if vec::len(args) < 2_u { 80_u } else { uint::from_str(args[1]).get() }; - let (pport, pchan) = pipes::stream(); - let pchan = pipes::SharedChan(pchan); + let (pport, pchan) = comm::stream(); + let pchan = comm::SharedChan(pchan); for uint::range(0_u, size) |j| { let cchan = pchan.clone(); do task::spawn { cchan.send(chanmb(j, size, depth)) }; diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs index 2c9da65cc13..a8383c4647e 100644 --- a/src/test/bench/shootout-pfib.rs +++ b/src/test/bench/shootout-pfib.rs @@ -24,12 +24,9 @@ extern mod std; use std::{time, getopts}; -use io::WriterUtil; -use int::range; -use pipes::Port; -use pipes::Chan; -use pipes::send; -use pipes::recv; +use core::int::range; +use core::comm::*; +use core::io::WriterUtil; use core::result; use result::{Ok, Err}; @@ -41,7 +38,7 @@ fn fib(n: int) -> int { } else if n <= 2 { c.send(1); } else { - let p = pipes::PortSet(); + let p = PortSet(); let ch = p.chan(); task::spawn(|| pfib(ch, n - 1) ); let ch = p.chan(); @@ -50,7 +47,7 @@ fn fib(n: int) -> int { } } - let (p, ch) = pipes::stream(); + let (p, ch) = stream(); let _t = task::spawn(|| pfib(ch, n) ); p.recv() } diff --git a/src/test/bench/task-perf-jargon-metal-smoke.rs b/src/test/bench/task-perf-jargon-metal-smoke.rs index f2441755a7b..528dfd3ec73 100644 --- a/src/test/bench/task-perf-jargon-metal-smoke.rs +++ b/src/test/bench/task-perf-jargon-metal-smoke.rs @@ -15,7 +15,7 @@ // // The filename is a song reference; google it in quotes. -fn child_generation(gens_left: uint, -c: pipes::Chan<()>) { +fn child_generation(gens_left: uint, -c: comm::Chan<()>) { // This used to be O(n^2) in the number of generations that ever existed. // With this code, only as many generations are alive at a time as tasks // alive at a time, @@ -43,7 +43,7 @@ fn main() { copy args }; - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); child_generation(uint::from_str(args[1]).get(), c); if p.try_recv().is_none() { fail!(~"it happened when we slumbered"); diff --git a/src/test/bench/task-perf-linked-failure.rs b/src/test/bench/task-perf-linked-failure.rs index 3b6ececaef9..8bb4c9bc592 100644 --- a/src/test/bench/task-perf-linked-failure.rs +++ b/src/test/bench/task-perf-linked-failure.rs @@ -20,7 +20,7 @@ // Creates in the background 'num_tasks' tasks, all blocked forever. // Doesn't return until all such tasks are ready, but doesn't block forever itself. -use core::pipes::*; +use core::comm::*; fn grandchild_group(num_tasks: uint) { let (po, ch) = stream(); diff --git a/src/test/bench/task-perf-one-million.rs b/src/test/bench/task-perf-one-million.rs index c5092ecaecc..8e1cbb9e17b 100644 --- a/src/test/bench/task-perf-one-million.rs +++ b/src/test/bench/task-perf-one-million.rs @@ -10,7 +10,7 @@ // Test for concurrent tasks -use core::pipes::*; +use core::comm::*; fn calc(children: uint, parent_wait_chan: &Chan<Chan<Chan<int>>>) { diff --git a/src/test/compile-fail/bind-by-move-no-guards.rs b/src/test/compile-fail/bind-by-move-no-guards.rs index 40a444df12d..d428feb2a24 100644 --- a/src/test/compile-fail/bind-by-move-no-guards.rs +++ b/src/test/compile-fail/bind-by-move-no-guards.rs @@ -9,7 +9,7 @@ // except according to those terms. fn main() { - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); let x = Some(p); c.send(false); match x { diff --git a/src/test/compile-fail/unsendable-class.rs b/src/test/compile-fail/unsendable-class.rs index 8e9ce5f97fc..3eebc4647c2 100644 --- a/src/test/compile-fail/unsendable-class.rs +++ b/src/test/compile-fail/unsendable-class.rs @@ -25,6 +25,6 @@ fn foo(i:int, j: @~str) -> foo { fn main() { let cat = ~"kitty"; - let (_, ch) = pipes::stream(); //~ ERROR does not fulfill `Owned` + let (_, ch) = comm::stream(); //~ ERROR does not fulfill `Owned` ch.send(foo(42, @(cat))); //~ ERROR does not fulfill `Owned` } diff --git a/src/test/run-fail/linked-failure.rs b/src/test/run-fail/linked-failure.rs index d592fb80f76..e8bb075ac00 100644 --- a/src/test/run-fail/linked-failure.rs +++ b/src/test/run-fail/linked-failure.rs @@ -16,7 +16,7 @@ extern mod std; fn child() { assert (1 == 2); } fn main() { - let (p, _c) = pipes::stream::<int>(); + let (p, _c) = comm::stream::<int>(); task::spawn(|| child() ); let x = p.recv(); } diff --git a/src/test/run-fail/linked-failure2.rs b/src/test/run-fail/linked-failure2.rs index 1402020c357..9f09c16ed6a 100644 --- a/src/test/run-fail/linked-failure2.rs +++ b/src/test/run-fail/linked-failure2.rs @@ -15,7 +15,7 @@ fn child() { fail!(); } fn main() { - let (p, _c) = pipes::stream::<()>(); + let (p, _c) = comm::stream::<()>(); task::spawn(|| child() ); task::yield(); } diff --git a/src/test/run-fail/linked-failure3.rs b/src/test/run-fail/linked-failure3.rs index cb03a71aabc..c2c97662b6c 100644 --- a/src/test/run-fail/linked-failure3.rs +++ b/src/test/run-fail/linked-failure3.rs @@ -15,13 +15,13 @@ fn grandchild() { fail!(~"grandchild dies"); } fn child() { - let (p, _c) = pipes::stream::<int>(); + let (p, _c) = comm::stream::<int>(); task::spawn(|| grandchild() ); let x = p.recv(); } fn main() { - let (p, _c) = pipes::stream::<int>(); + let (p, _c) = comm::stream::<int>(); task::spawn(|| child() ); let x = p.recv(); } diff --git a/src/test/run-fail/linked-failure4.rs b/src/test/run-fail/linked-failure4.rs index 18d6b3c369b..97e4edc81bc 100644 --- a/src/test/run-fail/linked-failure4.rs +++ b/src/test/run-fail/linked-failure4.rs @@ -14,7 +14,7 @@ fn child() { assert (1 == 2); } fn parent() { - let (p, _c) = pipes::stream::<int>(); + let (p, _c) = comm::stream::<int>(); task::spawn(|| child() ); let x = p.recv(); } @@ -22,7 +22,7 @@ fn parent() { // This task is not linked to the failure chain, but since the other // tasks are going to fail the kernel, this one will fail too fn sleeper() { - let (p, _c) = pipes::stream::<int>(); + let (p, _c) = comm::stream::<int>(); let x = p.recv(); } diff --git a/src/test/run-fail/task-comm-recv-block.rs b/src/test/run-fail/task-comm-recv-block.rs index bd866b9f9e7..a0896ea7bab 100644 --- a/src/test/run-fail/task-comm-recv-block.rs +++ b/src/test/run-fail/task-comm-recv-block.rs @@ -17,7 +17,7 @@ fn goodfail() { fn main() { task::spawn(|| goodfail() ); - let (po, _c) = pipes::stream(); + let (po, _c) = comm::stream(); // We shouldn't be able to get past this recv since there's no // message available let i: int = po.recv(); diff --git a/src/test/run-pass/capture_nil.rs b/src/test/run-pass/capture_nil.rs index b77e91c8d21..99d8fab4bba 100644 --- a/src/test/run-pass/capture_nil.rs +++ b/src/test/run-pass/capture_nil.rs @@ -24,7 +24,7 @@ // course preferable, as the value itself is // irrelevant). -use core::pipes::*; +use core::comm::*; fn foo(&&x: ()) -> Port<()> { let (p, c) = stream::<()>(); diff --git a/src/test/run-pass/comm.rs b/src/test/run-pass/comm.rs index 1af0bb003f2..da467ae7ba5 100644 --- a/src/test/run-pass/comm.rs +++ b/src/test/run-pass/comm.rs @@ -9,7 +9,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use core::pipes::*; +use core::comm::*; pub fn main() { let (p, ch) = stream(); diff --git a/src/test/run-pass/hashmap-memory.rs b/src/test/run-pass/hashmap-memory.rs index 6a12704d4b4..b90633bab01 100644 --- a/src/test/run-pass/hashmap-memory.rs +++ b/src/test/run-pass/hashmap-memory.rs @@ -20,14 +20,14 @@ extern mod std; use std::oldmap; use std::oldmap::HashMap; -use core::pipes::*; +use core::comm::*; pub fn map(filename: ~str, emit: map_reduce::putter) { emit(filename, ~"1"); } mod map_reduce { use std::oldmap; use std::oldmap::HashMap; - use core::pipes::*; + use core::comm::*; pub type putter = fn@(~str, ~str); diff --git a/src/test/run-pass/issue-3168.rs b/src/test/run-pass/issue-3168.rs index d40bf6e32cf..e0ec62ff255 100644 --- a/src/test/run-pass/issue-3168.rs +++ b/src/test/run-pass/issue-3168.rs @@ -11,15 +11,15 @@ // xfail-fast pub fn main() { - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); do task::try || { - let (p2,c2) = pipes::stream(); + let (p2,c2) = comm::stream(); do task::spawn || { p2.recv(); error!("sibling fails"); fail!(); } - let (p3,c3) = pipes::stream(); + let (p3,c3) = comm::stream(); c.send(c3); c2.send(()); error!("child blocks"); diff --git a/src/test/run-pass/issue-3176.rs b/src/test/run-pass/issue-3176.rs index fac73d07b66..e441fa22b31 100644 --- a/src/test/run-pass/issue-3176.rs +++ b/src/test/run-pass/issue-3176.rs @@ -10,22 +10,22 @@ // xfail-fast -use pipes::{Select2, Selectable}; +use comm::{Select2, Selectable}; pub fn main() { - let (p,c) = pipes::stream(); + let (p,c) = comm::stream(); do task::try || { - let (p2,c2) = pipes::stream(); + let (p2,c2) = comm::stream(); do task::spawn || { p2.recv(); error!("sibling fails"); fail!(); } - let (p3,c3) = pipes::stream(); + let (p3,c3) = comm::stream(); c.send(c3); c2.send(()); error!("child blocks"); - let (p, c) = pipes::stream(); + let (p, c) = comm::stream(); (p, p3).select(); c.send(()); }; diff --git a/src/test/run-pass/issue-3609.rs b/src/test/run-pass/issue-3609.rs index 6eb540c4737..83703dacc36 100644 --- a/src/test/run-pass/issue-3609.rs +++ b/src/test/run-pass/issue-3609.rs @@ -1,6 +1,6 @@ extern mod std; -use pipes::Chan; +use comm::Chan; type RingBuffer = ~[float]; type SamplesFn = fn~ (samples: &RingBuffer); diff --git a/src/test/run-pass/ivec-tag.rs b/src/test/run-pass/ivec-tag.rs index 9971b098bb0..017d90cbcd7 100644 --- a/src/test/run-pass/ivec-tag.rs +++ b/src/test/run-pass/ivec-tag.rs @@ -1,4 +1,4 @@ -use core::pipes::*; +use core::comm::*; fn producer(c: &Chan<~[u8]>) { c.send( diff --git a/src/test/run-pass/pipe-bank-proto.rs b/src/test/run-pass/pipe-bank-proto.rs index 0016d792c0d..b74c70d3ea7 100644 --- a/src/test/run-pass/pipe-bank-proto.rs +++ b/src/test/run-pass/pipe-bank-proto.rs @@ -15,7 +15,8 @@ // // http://theincredibleholk.wordpress.com/2012/07/06/rusty-pipes/ -use pipes::try_recv; +use core::pipes; +use core::pipes::try_recv; pub type username = ~str; pub type password = ~str; diff --git a/src/test/run-pass/pipe-detect-term.rs b/src/test/run-pass/pipe-detect-term.rs index 2d765423988..c09fbd19fdc 100644 --- a/src/test/run-pass/pipe-detect-term.rs +++ b/src/test/run-pass/pipe-detect-term.rs @@ -20,7 +20,8 @@ extern mod std; use std::timer::sleep; use std::uv; -use pipes::{try_recv, recv}; +use core::pipes; +use core::pipes::{try_recv, recv}; proto! oneshot ( waiting:send { diff --git a/src/test/run-pass/pipe-peek.rs b/src/test/run-pass/pipe-peek.rs index baa5ba5bf00..dc03eab22a8 100644 --- a/src/test/run-pass/pipe-peek.rs +++ b/src/test/run-pass/pipe-peek.rs @@ -15,6 +15,7 @@ extern mod std; use std::timer::sleep; use std::uv; +use core::pipes; proto! oneshot ( waiting:send { diff --git a/src/test/run-pass/pipe-pingpong-bounded.rs b/src/test/run-pass/pipe-pingpong-bounded.rs index b13b262e864..4b2ac40dc4a 100644 --- a/src/test/run-pass/pipe-pingpong-bounded.rs +++ b/src/test/run-pass/pipe-pingpong-bounded.rs @@ -20,6 +20,7 @@ #[legacy_records]; mod pingpong { + use core::pipes; use core::pipes::*; use core::ptr; @@ -45,6 +46,7 @@ mod pingpong { pub enum ping = server::pong; pub enum pong = client::ping; pub mod client { + use core::pipes; use core::pipes::*; use core::ptr; @@ -54,7 +56,7 @@ mod pingpong { let s = SendPacketBuffered(ptr::addr_of(&(b.buffer.data.pong))); let c = RecvPacketBuffered(ptr::addr_of(&(b.buffer.data.pong))); let message = ::pingpong::ping(s); - ::pipes::send(pipe, message); + send(pipe, message); c } } @@ -64,6 +66,7 @@ mod pingpong { ::pingpong::packets>; } pub mod server { + use core::pipes; use core::pipes::*; use core::ptr; @@ -75,7 +78,7 @@ mod pingpong { let s = SendPacketBuffered(ptr::addr_of(&(b.buffer.data.ping))); let c = RecvPacketBuffered(ptr::addr_of(&(b.buffer.data.ping))); let message = ::pingpong::pong(s); - ::pipes::send(pipe, message); + send(pipe, message); c } } @@ -85,7 +88,7 @@ mod pingpong { } mod test { - use pipes::recv; + use core::pipes::recv; use pingpong::{ping, pong}; pub fn client(-chan: ::pingpong::client::ping) { diff --git a/src/test/run-pass/pipe-presentation-examples.rs b/src/test/run-pass/pipe-presentation-examples.rs index 7ac6337d1ca..423e4782333 100644 --- a/src/test/run-pass/pipe-presentation-examples.rs +++ b/src/test/run-pass/pipe-presentation-examples.rs @@ -34,7 +34,7 @@ macro_rules! select_if ( ], )* } => { if $index == $count { - match pipes::try_recv($port) { + match core::pipes::try_recv($port) { $(Some($message($($($x,)+)* next)) => { let $next = next; $e @@ -68,7 +68,7 @@ macro_rules! select ( -> $next:ident $e:expr),+ } )+ } => ({ - let index = pipes::selecti([$(($port).header()),+]); + let index = core::comm::selecti([$(($port).header()),+]); select_if!(index, 0, $( $port => [ $($message$(($($x),+))dont_type_this* -> $next $e),+ ], )+) diff --git a/src/test/run-pass/pipe-select.rs b/src/test/run-pass/pipe-select.rs index 0bf739139cf..0e170cbe995 100644 --- a/src/test/run-pass/pipe-select.rs +++ b/src/test/run-pass/pipe-select.rs @@ -19,7 +19,8 @@ extern mod std; use std::timer::sleep; use std::uv; -use pipes::{recv, select}; +use core::pipes; +use core::pipes::{recv, select}; proto! oneshot ( waiting:send { diff --git a/src/test/run-pass/pipe-sleep.rs b/src/test/run-pass/pipe-sleep.rs index 521c400489e..51980eeea94 100644 --- a/src/test/run-pass/pipe-sleep.rs +++ b/src/test/run-pass/pipe-sleep.rs @@ -15,7 +15,8 @@ extern mod std; use std::timer::sleep; use std::uv; -use pipes::recv; +use core::pipes; +use core::pipes::recv; proto! oneshot ( waiting:send { diff --git a/src/test/run-pass/rt-sched-1.rs b/src/test/run-pass/rt-sched-1.rs index cefed420546..ca37a6663fd 100644 --- a/src/test/run-pass/rt-sched-1.rs +++ b/src/test/run-pass/rt-sched-1.rs @@ -10,7 +10,7 @@ // Tests of the runtime's scheduler interface -use core::pipes::*; +use core::comm::*; type sched_id = int; type task_id = *libc::c_void; diff --git a/src/test/run-pass/send-iloop.rs b/src/test/run-pass/send-iloop.rs index f1b9c85a0ff..18f4fd27858 100644 --- a/src/test/run-pass/send-iloop.rs +++ b/src/test/run-pass/send-iloop.rs @@ -17,7 +17,7 @@ fn die() { fn iloop() { task::spawn(|| die() ); - let (p, c) = core::pipes::stream::<()>(); + let (p, c) = comm::stream::<()>(); loop { // Sending and receiving here because these actions yield, // at which point our child can kill us. diff --git a/src/test/run-pass/send-resource.rs b/src/test/run-pass/send-resource.rs index ac910232c16..6bda62be621 100644 --- a/src/test/run-pass/send-resource.rs +++ b/src/test/run-pass/send-resource.rs @@ -8,7 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use core::pipes::*; +use core::comm::*; struct test { f: int, diff --git a/src/test/run-pass/send-type-inference.rs b/src/test/run-pass/send-type-inference.rs index 77f06140369..0f924df8dc0 100644 --- a/src/test/run-pass/send-type-inference.rs +++ b/src/test/run-pass/send-type-inference.rs @@ -8,7 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use core::pipes::*; +use core::comm::*; // tests that ctrl's type gets inferred properly type command<K, V> = {key: K, val: V}; diff --git a/src/test/run-pass/sendable-class.rs b/src/test/run-pass/sendable-class.rs index 19169c168c2..8ef0173dbd3 100644 --- a/src/test/run-pass/sendable-class.rs +++ b/src/test/run-pass/sendable-class.rs @@ -23,6 +23,6 @@ fn foo(i:int, j: char) -> foo { } pub fn main() { - let (_po, ch) = pipes::stream(); + let (_po, ch) = comm::stream(); ch.send(foo(42, 'c')); } diff --git a/src/test/run-pass/spawn-types.rs b/src/test/run-pass/spawn-types.rs index 6090f2eb71c..4111b505490 100644 --- a/src/test/run-pass/spawn-types.rs +++ b/src/test/run-pass/spawn-types.rs @@ -14,7 +14,7 @@ Arnold. */ -use core::pipes::*; +use core::comm::*; type ctx = Chan<int>; diff --git a/src/test/run-pass/task-comm-0.rs b/src/test/run-pass/task-comm-0.rs index aa8a2a9146c..f260e571b42 100644 --- a/src/test/run-pass/task-comm-0.rs +++ b/src/test/run-pass/task-comm-0.rs @@ -13,8 +13,8 @@ extern mod std; -use pipes::Chan; -use pipes::Port; +use comm::Chan; +use comm::Port; pub fn main() { test05(); } @@ -28,7 +28,7 @@ fn test05_start(ch : Chan<int>) { } fn test05() { - let (po, ch) = pipes::stream(); + let (po, ch) = comm::stream(); task::spawn(|| test05_start(ch) ); let mut value = po.recv(); log(error, value); diff --git a/src/test/run-pass/task-comm-10.rs b/src/test/run-pass/task-comm-10.rs index 289a728efc3..379b9acf6f5 100644 --- a/src/test/run-pass/task-comm-10.rs +++ b/src/test/run-pass/task-comm-10.rs @@ -13,8 +13,8 @@ extern mod std; -fn start(c: pipes::Chan<pipes::Chan<~str>>) { - let (p, ch) = pipes::stream(); +fn start(c: comm::Chan<comm::Chan<~str>>) { + let (p, ch) = comm::stream(); c.send(ch); let mut a; @@ -28,7 +28,7 @@ fn start(c: pipes::Chan<pipes::Chan<~str>>) { } pub fn main() { - let (p, ch) = pipes::stream(); + let (p, ch) = comm::stream(); let child = task::spawn(|| start(ch) ); let c = p.recv(); diff --git a/src/test/run-pass/task-comm-11.rs b/src/test/run-pass/task-comm-11.rs index 996566abcd8..3e3eefd26ba 100644 --- a/src/test/run-pass/task-comm-11.rs +++ b/src/test/run-pass/task-comm-11.rs @@ -13,13 +13,13 @@ extern mod std; -fn start(c: pipes::Chan<pipes::Chan<int>>) { - let (p, ch) = pipes::stream(); +fn start(c: comm::Chan<comm::Chan<int>>) { + let (p, ch) = comm::stream(); c.send(ch); } pub fn main() { - let (p, ch) = pipes::stream(); + let (p, ch) = comm::stream(); let child = task::spawn(|| start(ch) ); let c = p.recv(); } diff --git a/src/test/run-pass/task-comm-13.rs b/src/test/run-pass/task-comm-13.rs index 4ee23ec54d6..a246f1f4af2 100644 --- a/src/test/run-pass/task-comm-13.rs +++ b/src/test/run-pass/task-comm-13.rs @@ -12,16 +12,15 @@ #[legacy_modes]; extern mod std; -use pipes::send; -fn start(c: pipes::Chan<int>, start: int, number_of_messages: int) { +fn start(c: comm::Chan<int>, start: int, number_of_messages: int) { let mut i: int = 0; while i < number_of_messages { c.send(start + i); i += 1; } } pub fn main() { debug!("Check that we don't deadlock."); - let (p, ch) = pipes::stream(); + let (p, ch) = comm::stream(); task::try(|| start(ch, 0, 10) ); debug!("Joined task"); } diff --git a/src/test/run-pass/task-comm-14.rs b/src/test/run-pass/task-comm-14.rs index f32fbdd04e6..c5179652fdc 100644 --- a/src/test/run-pass/task-comm-14.rs +++ b/src/test/run-pass/task-comm-14.rs @@ -12,13 +12,13 @@ #[legacy_modes]; pub fn main() { - let po = pipes::PortSet(); + let po = comm::PortSet(); // Spawn 10 tasks each sending us back one int. let mut i = 10; while (i > 0) { log(debug, i); - let (p, ch) = pipes::stream(); + let (p, ch) = comm::stream(); po.add(p); task::spawn({let i = i; || child(i, ch)}); i = i - 1; @@ -37,7 +37,7 @@ pub fn main() { debug!("main thread exiting"); } -fn child(x: int, ch: pipes::Chan<int>) { +fn child(x: int, ch: comm::Chan<int>) { log(debug, x); ch.send(x); } diff --git a/src/test/run-pass/task-comm-15.rs b/src/test/run-pass/task-comm-15.rs index 957066005ac..525cafef169 100644 --- a/src/test/run-pass/task-comm-15.rs +++ b/src/test/run-pass/task-comm-15.rs @@ -14,7 +14,7 @@ extern mod std; -fn start(c: pipes::Chan<int>, i0: int) { +fn start(c: comm::Chan<int>, i0: int) { let mut i = i0; while i > 0 { c.send(0); @@ -27,7 +27,7 @@ pub fn main() { // is likely to terminate before the child completes, so from // the child's point of view the receiver may die. We should // drop messages on the floor in this case, and not crash! - let (p, ch) = pipes::stream(); + let (p, ch) = comm::stream(); task::spawn(|| start(ch, 10)); p.recv(); } diff --git a/src/test/run-pass/task-comm-16.rs b/src/test/run-pass/task-comm-16.rs index 648a54d190f..e2ac5623db3 100644 --- a/src/test/run-pass/task-comm-16.rs +++ b/src/test/run-pass/task-comm-16.rs @@ -10,16 +10,11 @@ // except according to those terms. -use pipes::send; -use pipes::Port; -use pipes::recv; -use pipes::Chan; - // Tests of ports and channels on various types fn test_rec() { struct R {val0: int, val1: u8, val2: char} - let (po, ch) = pipes::stream(); + let (po, ch) = comm::stream(); let r0: R = R {val0: 0, val1: 1u8, val2: '2'}; ch.send(r0); let mut r1: R; @@ -30,7 +25,7 @@ fn test_rec() { } fn test_vec() { - let (po, ch) = pipes::stream(); + let (po, ch) = comm::stream(); let v0: ~[int] = ~[0, 1, 2]; ch.send(v0); let v1 = po.recv(); @@ -40,7 +35,7 @@ fn test_vec() { } fn test_str() { - let (po, ch) = pipes::stream(); + let (po, ch) = comm::stream(); let s0 = ~"test"; ch.send(s0); let s1 = po.recv(); @@ -84,7 +79,7 @@ impl cmp::Eq for t { } fn test_tag() { - let (po, ch) = pipes::stream(); + let (po, ch) = comm::stream(); ch.send(tag1); ch.send(tag2(10)); ch.send(tag3(10, 11u8, 'A')); @@ -98,8 +93,8 @@ fn test_tag() { } fn test_chan() { - let (po, ch) = pipes::stream(); - let (po0, ch0) = pipes::stream(); + let (po, ch) = comm::stream(); + let (po0, ch0) = comm::stream(); ch.send(ch0); let ch1 = po.recv(); // Does the transmitted channel still work? diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs index 372a0ea434b..9bbe20c2e13 100644 --- a/src/test/run-pass/task-comm-3.rs +++ b/src/test/run-pass/task-comm-3.rs @@ -12,9 +12,7 @@ #[legacy_modes]; extern mod std; -use pipes::Chan; -use pipes::send; -use pipes::recv; +use core::comm::Chan; pub fn main() { debug!("===== WITHOUT THREADS ====="); test00(); } @@ -35,7 +33,7 @@ fn test00() { debug!("Creating tasks"); - let po = pipes::PortSet(); + let po = comm::PortSet(); let mut i: int = 0; diff --git a/src/test/run-pass/task-comm-4.rs b/src/test/run-pass/task-comm-4.rs index d7997d93248..dc4dc27229c 100644 --- a/src/test/run-pass/task-comm-4.rs +++ b/src/test/run-pass/task-comm-4.rs @@ -8,14 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use pipes::send; - pub fn main() { test00(); } fn test00() { let mut r: int = 0; let mut sum: int = 0; - let (p, c) = pipes::stream(); + let (p, c) = comm::stream(); c.send(1); c.send(2); c.send(3); diff --git a/src/test/run-pass/task-comm-5.rs b/src/test/run-pass/task-comm-5.rs index f8f19d804c8..0256c1cbb87 100644 --- a/src/test/run-pass/task-comm-5.rs +++ b/src/test/run-pass/task-comm-5.rs @@ -15,7 +15,7 @@ pub fn main() { test00(); } fn test00() { let r: int = 0; let mut sum: int = 0; - let (p, c) = pipes::stream(); + let (p, c) = comm::stream(); let number_of_messages: int = 1000; let mut i: int = 0; while i < number_of_messages { c.send(i + 0); i += 1; } diff --git a/src/test/run-pass/task-comm-6.rs b/src/test/run-pass/task-comm-6.rs index 5d19075a71e..c18090ea45f 100644 --- a/src/test/run-pass/task-comm-6.rs +++ b/src/test/run-pass/task-comm-6.rs @@ -8,16 +8,14 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use pipes::send; -use pipes::Chan; -use pipes::recv; +use core::comm::Chan; pub fn main() { test00(); } fn test00() { let mut r: int = 0; let mut sum: int = 0; - let p = pipes::PortSet(); + let p = comm::PortSet(); let c0 = p.chan(); let c1 = p.chan(); let c2 = p.chan(); diff --git a/src/test/run-pass/task-comm-7.rs b/src/test/run-pass/task-comm-7.rs index 481df2d1d52..21eb93e8d09 100644 --- a/src/test/run-pass/task-comm-7.rs +++ b/src/test/run-pass/task-comm-7.rs @@ -15,7 +15,7 @@ extern mod std; pub fn main() { test00(); } -fn test00_start(c: pipes::Chan<int>, start: int, number_of_messages: int) { +fn test00_start(c: comm::Chan<int>, start: int, number_of_messages: int) { let mut i: int = 0; while i < number_of_messages { c.send(start + i); i += 1; } } @@ -23,7 +23,7 @@ fn test00_start(c: pipes::Chan<int>, start: int, number_of_messages: int) { fn test00() { let mut r: int = 0; let mut sum: int = 0; - let p = pipes::PortSet(); + let p = comm::PortSet(); let number_of_messages: int = 10; let c = p.chan(); diff --git a/src/test/run-pass/task-comm-9.rs b/src/test/run-pass/task-comm-9.rs index d3ed48f7575..75fcd12c312 100644 --- a/src/test/run-pass/task-comm-9.rs +++ b/src/test/run-pass/task-comm-9.rs @@ -15,7 +15,7 @@ extern mod std; pub fn main() { test00(); } -fn test00_start(c: pipes::Chan<int>, number_of_messages: int) { +fn test00_start(c: comm::Chan<int>, number_of_messages: int) { let mut i: int = 0; while i < number_of_messages { c.send(i + 0); i += 1; } } @@ -23,7 +23,7 @@ fn test00_start(c: pipes::Chan<int>, number_of_messages: int) { fn test00() { let r: int = 0; let mut sum: int = 0; - let p = pipes::PortSet(); + let p = comm::PortSet(); let number_of_messages: int = 10; let ch = p.chan(); diff --git a/src/test/run-pass/task-comm-chan-nil.rs b/src/test/run-pass/task-comm-chan-nil.rs index cb62e2f87ee..db2ad2de61b 100644 --- a/src/test/run-pass/task-comm-chan-nil.rs +++ b/src/test/run-pass/task-comm-chan-nil.rs @@ -16,7 +16,7 @@ extern mod std; // any size, but rustc currently can because they do have size. Whether // or not this is desirable I don't know, but here's a regression test. pub fn main() { - let (po, ch) = pipes::stream(); + let (po, ch) = comm::stream(); ch.send(()); let n: () = po.recv(); assert (n == ()); diff --git a/src/test/run-pass/task-killjoin-rsrc.rs b/src/test/run-pass/task-killjoin-rsrc.rs index ca60dfd3de0..b90c39ab34e 100644 --- a/src/test/run-pass/task-killjoin-rsrc.rs +++ b/src/test/run-pass/task-killjoin-rsrc.rs @@ -13,7 +13,7 @@ // A port of task-killjoin to use a class with a dtor to manage // the join. -use core::pipes::*; +use core::comm::*; struct notify { ch: Chan<bool>, v: @mut bool, diff --git a/src/test/run-pass/task-spawn-move-and-copy.rs b/src/test/run-pass/task-spawn-move-and-copy.rs index d9b06627c80..805f8e8b1e2 100644 --- a/src/test/run-pass/task-spawn-move-and-copy.rs +++ b/src/test/run-pass/task-spawn-move-and-copy.rs @@ -8,7 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use core::pipes::*; +use core::comm::*; pub fn main() { let (p, ch) = stream::<uint>(); diff --git a/src/test/run-pass/trivial-message.rs b/src/test/run-pass/trivial-message.rs index 21524d3fc54..7800ebd7310 100644 --- a/src/test/run-pass/trivial-message.rs +++ b/src/test/run-pass/trivial-message.rs @@ -8,14 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use pipes::{Port, Chan}; - /* This is about the simplest program that can successfully send a message. */ pub fn main() { - let (po, ch) = pipes::stream(); + let (po, ch) = comm::stream(); ch.send(42); let r = po.recv(); log(error, r); diff --git a/src/test/run-pass/unique-send-2.rs b/src/test/run-pass/unique-send-2.rs index a5398e7407b..7be6907a0c7 100644 --- a/src/test/run-pass/unique-send-2.rs +++ b/src/test/run-pass/unique-send-2.rs @@ -8,7 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use core::pipes::*; +use core::comm::*; fn child(c: &SharedChan<~uint>, i: uint) { c.send(~i); diff --git a/src/test/run-pass/unique-send.rs b/src/test/run-pass/unique-send.rs index 57b345c2d25..75fc71441f8 100644 --- a/src/test/run-pass/unique-send.rs +++ b/src/test/run-pass/unique-send.rs @@ -8,7 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use core::pipes::*; +use core::comm::*; pub fn main() { let (p, c) = stream(); diff --git a/src/test/run-pass/unwind-resource.rs b/src/test/run-pass/unwind-resource.rs index 93f1c7b5b45..2693a8d3942 100644 --- a/src/test/run-pass/unwind-resource.rs +++ b/src/test/run-pass/unwind-resource.rs @@ -11,7 +11,7 @@ // xfail-win32 extern mod std; -use core::pipes::*; +use core::comm::*; struct complainer { c: SharedChan<bool>, | 
