diff options
| author | bors <bors@rust-lang.org> | 2014-03-13 14:06:37 -0700 |
|---|---|---|
| committer | bors <bors@rust-lang.org> | 2014-03-13 14:06:37 -0700 |
| commit | b4d324334cb48198c27d782002d75eba14a6abde (patch) | |
| tree | 950d8daa5e6305090bdd69625bb18ead48471865 /src/libstd | |
| parent | 6ff3c9995e63b63c16d13739a0fc2d321f95410e (diff) | |
| parent | 78580651131c9daacd7e5e4669af819cdd719f09 (diff) | |
| download | rust-b4d324334cb48198c27d782002d75eba14a6abde.tar.gz rust-b4d324334cb48198c27d782002d75eba14a6abde.zip | |
auto merge of #12815 : alexcrichton/rust/chan-rename, r=brson
* Chan<T> => Sender<T> * Port<T> => Receiver<T> * Chan::new() => channel() * constructor returns (Sender, Receiver) instead of (Receiver, Sender) * local variables named `port` renamed to `rx` * local variables named `chan` renamed to `tx` Closes #11765
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/comm/mod.rs | 612 | ||||
| -rw-r--r-- | src/libstd/comm/oneshot.rs | 14 | ||||
| -rw-r--r-- | src/libstd/comm/select.rs | 429 | ||||
| -rw-r--r-- | src/libstd/comm/stream.rs | 14 | ||||
| -rw-r--r-- | src/libstd/io/comm_adapters.rs | 77 | ||||
| -rw-r--r-- | src/libstd/io/mod.rs | 2 | ||||
| -rw-r--r-- | src/libstd/io/net/tcp.rs | 137 | ||||
| -rw-r--r-- | src/libstd/io/net/udp.rs | 92 | ||||
| -rw-r--r-- | src/libstd/io/net/unix.rs | 46 | ||||
| -rw-r--r-- | src/libstd/io/pipe.rs | 6 | ||||
| -rw-r--r-- | src/libstd/io/process.rs | 10 | ||||
| -rw-r--r-- | src/libstd/io/signal.rs | 34 | ||||
| -rw-r--r-- | src/libstd/io/stdio.rs | 12 | ||||
| -rw-r--r-- | src/libstd/io/test.rs | 6 | ||||
| -rw-r--r-- | src/libstd/io/timer.rs | 112 | ||||
| -rw-r--r-- | src/libstd/macros.rs | 20 | ||||
| -rw-r--r-- | src/libstd/prelude.rs | 2 | ||||
| -rw-r--r-- | src/libstd/rt/rtio.rs | 8 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 16 | ||||
| -rw-r--r-- | src/libstd/sync/mpmc_bounded_queue.rs | 20 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc_queue.rs | 10 | ||||
| -rw-r--r-- | src/libstd/sync/spsc_queue.rs | 6 | ||||
| -rw-r--r-- | src/libstd/task.rs | 83 | ||||
| -rw-r--r-- | src/libstd/unstable/sync.rs | 6 |
24 files changed, 865 insertions, 909 deletions
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index 7345193a751..e25571dd246 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -8,38 +8,38 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -//! Communication primitives for concurrent tasks (`Chan` and `Port` types) +//! Communication primitives for concurrent tasks //! //! Rust makes it very difficult to share data among tasks to prevent race //! conditions and to improve parallelism, but there is often a need for //! communication between concurrent tasks. The primitives defined in this //! module are the building blocks for synchronization in rust. //! -//! This module currently provides two types: +//! This module provides message-based communication over channels, concretely +//! defined as two types: //! -//! * `Chan` -//! * `Port` +//! * `Sender` +//! * `Receiver` //! -//! `Chan` is used to send data to a `Port`. A `Chan` is clone-able such that -//! many tasks can send simultaneously to one receiving port. These -//! communication primitives are *task blocking*, not *thread blocking*. This -//! means that if one task is blocked on a channel, other tasks can continue to -//! make progress. +//! A `Sender` is used to send data to a `Receiver`. A `Sender` is clone-able +//! such that many tasks can send simultaneously to one receiver. These +//! channels are *task blocking*, not *thread blocking*. This means that if one +//! task is blocked on a channel, other tasks can continue to make progress. //! //! Rust channels can be used as if they have an infinite internal buffer. What -//! this means is that the `send` operation will never block. `Port`s, on the -//! other hand, will block the task if there is no data to be received. +//! this means is that the `send` operation will never block. `Receiver`s, on +//! the other hand, will block the task if there is no data to be received. //! //! ## Failure Propagation //! //! In addition to being a core primitive for communicating in rust, channels -//! and ports are the points at which failure is propagated among tasks. -//! Whenever the one half of channel is closed, the other half will have its -//! next operation `fail!`. The purpose of this is to allow propagation of -//! failure among tasks that are linked to one another via channels. +//! are the points at which failure is propagated among tasks. Whenever the one +//! half of channel is closed, the other half will have its next operation +//! `fail!`. The purpose of this is to allow propagation of failure among tasks +//! that are linked to one another via channels. //! -//! There are methods on both of `Chan` and `Port` to perform their respective -//! operations without failing, however. +//! There are methods on both of `Sender` and `Receiver` to perform their +//! respective operations without failing, however. //! //! ## Outside the Runtime //! @@ -58,31 +58,31 @@ //! //! ```rust,should_fail //! // Create a simple streaming channel -//! let (port, chan) = Chan::new(); +//! let (tx, rx) = channel(); //! spawn(proc() { -//! chan.send(10); +//! tx.send(10); //! }); -//! assert_eq!(port.recv(), 10); +//! assert_eq!(rx.recv(), 10); //! //! // Create a shared channel which can be sent along from many tasks -//! let (port, chan) = Chan::new(); +//! let (tx, rx) = channel(); //! for i in range(0, 10) { -//! let chan = chan.clone(); +//! let tx = tx.clone(); //! spawn(proc() { -//! chan.send(i); +//! tx.send(i); //! }) //! } //! //! for _ in range(0, 10) { -//! let j = port.recv(); +//! let j = rx.recv(); //! assert!(0 <= j && j < 10); //! } //! //! // The call to recv() will fail!() because the channel has already hung //! // up (or been deallocated) -//! let (port, chan) = Chan::<int>::new(); -//! drop(chan); -//! port.recv(); +//! let (tx, rx) = channel::<int>(); +//! drop(tx); +//! rx.recv(); //! ``` // A description of how Rust's channel implementation works @@ -118,7 +118,7 @@ // // ## Concurrent queues // -// The basic idea of Rust's Chan/Port types is that send() never blocks, but +// The basic idea of Rust's Sender/Receiver types is that send() never blocks, but // recv() obviously blocks. This means that under the hood there must be some // shared and concurrent queue holding all of the actual data. // @@ -177,10 +177,9 @@ // // ### The internal atomic counter // -// Every channel/port/shared channel have a shared counter with their -// counterparts to keep track of the size of the queue. This counter is used to -// abort descheduling by the receiver and to know when to wake up on the sending -// side. +// Every channel has a shared counter with each half to keep track of the size +// of the queue. This counter is used to abort descheduling by the receiver and +// to know when to wake up on the sending side. // // As seen in the pseudocode, senders will increment this count and receivers // will decrement the count. The theory behind this is that if a sender sees a @@ -195,7 +194,7 @@ // it was actually appropriate to wake up a receiver. // // Instead, the "steal count" is kept track of separately (not atomically -// because it's only used by ports), and then the decrement() call when +// because it's only used by receivers), and then the decrement() call when // descheduling will lump in all of the recent steals into one large decrement. // // The implication of this is that if a sender sees a -1 count, then there's @@ -269,9 +268,9 @@ macro_rules! test ( $($a)* #[test] fn uv() { f() } $($a)* #[test] fn native() { use native; - let (p, c) = Chan::new(); - native::task::spawn(proc() { c.send(f()) }); - p.recv(); + let (tx, rx) = channel(); + native::task::spawn(proc() { tx.send(f()) }); + rx.recv(); } } ) @@ -288,23 +287,23 @@ static RESCHED_FREQ: int = 256; /// The receiving-half of Rust's channel type. This half can only be owned by /// one task -pub struct Port<T> { +pub struct Receiver<T> { priv inner: Flavor<T>, priv receives: Cell<uint>, // can't share in an arc priv marker: marker::NoFreeze, } -/// An iterator over messages received on a port, this iterator will block +/// An iterator over messages on a receiver, this iterator will block /// whenever `next` is called, waiting for a new message, and `None` will be /// returned when the corresponding channel has hung up. pub struct Messages<'a, T> { - priv port: &'a Port<T> + priv rx: &'a Receiver<T> } /// The sending-half of Rust's channel type. This half can only be owned by one /// task -pub struct Chan<T> { +pub struct Sender<T> { priv inner: Flavor<T>, priv sends: Cell<uint>, // can't share in an arc @@ -331,30 +330,30 @@ enum Flavor<T> { Shared(UnsafeArc<shared::Packet<T>>), } -impl<T: Send> Chan<T> { - /// Creates a new port/channel pair. All data send on the channel returned - /// will become available on the port as well. See the documentation of - /// `Port` and `Chan` to see what's possible with them. - pub fn new() -> (Port<T>, Chan<T>) { - let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); - (Port::my_new(Oneshot(a)), Chan::my_new(Oneshot(b))) - } +/// Creates a new channel, returning the sender/receiver halves. All data sent +/// on the sender will become available on the receiver. See the documentation +/// of `Receiver` and `Sender` to see what's possible with them. +pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) { + let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); + (Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a))) +} - fn my_new(inner: Flavor<T>) -> Chan<T> { - Chan { inner: inner, sends: Cell::new(0), marker: marker::NoFreeze } +impl<T: Send> Sender<T> { + fn my_new(inner: Flavor<T>) -> Sender<T> { + Sender { inner: inner, sends: Cell::new(0), marker: marker::NoFreeze } } /// Sends a value along this channel to be received by the corresponding - /// port. + /// receiver. /// /// Rust channels are infinitely buffered so this method will never block. /// /// # Failure /// /// This function will fail if the other end of the channel has hung up. - /// This means that if the corresponding port has fallen out of scope, this - /// function will trigger a fail message saying that a message is being sent - /// on a closed channel. + /// This means that if the corresponding receiver has fallen out of scope, + /// this function will trigger a fail message saying that a message is + /// being sent on a closed channel. /// /// Note that if this function does *not* fail, it does not mean that the /// data will be successfully received. All sends are placed into a queue, @@ -372,13 +371,13 @@ impl<T: Send> Chan<T> { /// Attempts to send a value on this channel, returning whether it was /// successfully sent. /// - /// A successful send occurs when it is determined that the other end of the - /// channel has not hung up already. An unsuccessful send would be one where - /// the corresponding port has already been deallocated. Note that a return - /// value of `false` means that the data will never be received, but a - /// return value of `true` does *not* mean that the data will be received. - /// It is possible for the corresponding port to hang up immediately after - /// this function returns `true`. + /// A successful send occurs when it is determined that the other end of + /// the channel has not hung up already. An unsuccessful send would be one + /// where the corresponding receiver has already been deallocated. Note + /// that a return value of `false` means that the data will never be + /// received, but a return value of `true` does *not* mean that the data + /// will be received. It is possible for the corresponding receiver to + /// hang up immediately after this function returns `true`. /// /// Like `send`, this method will never block. If the failure of send cannot /// be tolerated, then this method should be used instead. @@ -406,7 +405,7 @@ impl<T: Send> Chan<T> { return (*p).send(t); } else { let (a, b) = UnsafeArc::new2(stream::Packet::new()); - match (*p).upgrade(Port::my_new(Stream(b))) { + match (*p).upgrade(Receiver::my_new(Stream(b))) { oneshot::UpSuccess => { (*a.get()).send(t); (a, true) @@ -426,48 +425,48 @@ impl<T: Send> Chan<T> { }; unsafe { - let mut tmp = Chan::my_new(Stream(new_inner)); + let mut tmp = Sender::my_new(Stream(new_inner)); mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); } return ret; } } -impl<T: Send> Clone for Chan<T> { - fn clone(&self) -> Chan<T> { +impl<T: Send> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { let (packet, sleeper) = match self.inner { Oneshot(ref p) => { let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Port::my_new(Shared(a))) } { + match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } { oneshot::UpSuccess | oneshot::UpDisconnected => (b, None), oneshot::UpWoke(task) => (b, Some(task)) } } Stream(ref p) => { let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Port::my_new(Shared(a))) } { + match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } { stream::UpSuccess | stream::UpDisconnected => (b, None), stream::UpWoke(task) => (b, Some(task)), } } Shared(ref p) => { unsafe { (*p.get()).clone_chan(); } - return Chan::my_new(Shared(p.clone())); + return Sender::my_new(Shared(p.clone())); } }; unsafe { (*packet.get()).inherit_blocker(sleeper); - let mut tmp = Chan::my_new(Shared(packet.clone())); + let mut tmp = Sender::my_new(Shared(packet.clone())); mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); } - Chan::my_new(Shared(packet)) + Sender::my_new(Shared(packet)) } } #[unsafe_destructor] -impl<T: Send> Drop for Chan<T> { +impl<T: Send> Drop for Sender<T> { fn drop(&mut self) { match self.inner { Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); }, @@ -477,16 +476,16 @@ impl<T: Send> Drop for Chan<T> { } } -impl<T: Send> Port<T> { - fn my_new(inner: Flavor<T>) -> Port<T> { - Port { inner: inner, receives: Cell::new(0), marker: marker::NoFreeze } +impl<T: Send> Receiver<T> { + fn my_new(inner: Flavor<T>) -> Receiver<T> { + Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoFreeze } } - /// Blocks waiting for a value on this port + /// Blocks waiting for a value on this receiver /// /// This function will block if necessary to wait for a corresponding send - /// on the channel from its paired `Chan` structure. This port will be woken - /// up when data is ready, and the data will be returned. + /// on the channel from its paired `Sender` structure. This receiver will + /// be woken up when data is ready, and the data will be returned. /// /// # Failure /// @@ -500,7 +499,7 @@ impl<T: Send> Port<T> { /// when the other end hangs up /// /// * If blocking is not desired, then the `try_recv` method will attempt to - /// peek at a value on this port. + /// peek at a value on this receiver. pub fn recv(&self) -> T { match self.recv_opt() { Some(t) => t, @@ -508,14 +507,14 @@ impl<T: Send> Port<T> { } } - /// Attempts to return a pending value on this port without blocking + /// Attempts to return a pending value on this receiver without blocking /// /// This method will never block the caller in order to wait for data to /// become available. Instead, this will always return immediately with a /// possible option of pending data on the channel. /// /// This is useful for a flavor of "optimistic check" before deciding to - /// block on a port. + /// block on a receiver. /// /// This function cannot fail. pub fn try_recv(&self) -> TryRecvResult<T> { @@ -537,7 +536,7 @@ impl<T: Send> Port<T> { Ok(t) => return Data(t), Err(oneshot::Empty) => return Empty, Err(oneshot::Disconnected) => return Disconnected, - Err(oneshot::Upgraded(port)) => port, + Err(oneshot::Upgraded(rx)) => rx, } } Stream(ref p) => { @@ -545,7 +544,7 @@ impl<T: Send> Port<T> { Ok(t) => return Data(t), Err(stream::Empty) => return Empty, Err(stream::Disconnected) => return Disconnected, - Err(stream::Upgraded(port)) => port, + Err(stream::Upgraded(rx)) => rx, } } Shared(ref p) => { @@ -563,18 +562,18 @@ impl<T: Send> Port<T> { } } - /// Attempt to wait for a value on this port, but does not fail if the + /// Attempt to wait for a value on this receiver, but does not fail if the /// corresponding channel has hung up. /// /// This implementation of iterators for ports will always block if there is - /// not data available on the port, but it will not fail in the case that - /// the channel has been deallocated. + /// not data available on the receiver, but it will not fail in the case + /// that the channel has been deallocated. /// /// In other words, this function has the same semantics as the `recv` /// method except for the failure aspect. /// /// If the channel has hung up, then `None` is returned. Otherwise `Some` of - /// the value found on the port is returned. + /// the value found on the receiver is returned. pub fn recv_opt(&self) -> Option<T> { loop { let mut new_port = match self.inner { @@ -583,7 +582,7 @@ impl<T: Send> Port<T> { Ok(t) => return Some(t), Err(oneshot::Empty) => return unreachable!(), Err(oneshot::Disconnected) => return None, - Err(oneshot::Upgraded(port)) => port, + Err(oneshot::Upgraded(rx)) => rx, } } Stream(ref p) => { @@ -591,7 +590,7 @@ impl<T: Send> Port<T> { Ok(t) => return Some(t), Err(stream::Empty) => return unreachable!(), Err(stream::Disconnected) => return None, - Err(stream::Upgraded(port)) => port, + Err(stream::Upgraded(rx)) => rx, } } Shared(ref p) => { @@ -612,11 +611,11 @@ impl<T: Send> Port<T> { /// Returns an iterator which will block waiting for messages, but never /// `fail!`. It will return `None` when the channel has hung up. pub fn iter<'a>(&'a self) -> Messages<'a, T> { - Messages { port: self } + Messages { rx: self } } } -impl<T: Send> select::Packet for Port<T> { +impl<T: Send> select::Packet for Receiver<T> { fn can_recv(&self) -> bool { loop { let mut new_port = match self.inner { @@ -650,14 +649,14 @@ impl<T: Send> select::Packet for Port<T> { match unsafe { (*p.get()).start_selection(task) } { oneshot::SelSuccess => return Ok(()), oneshot::SelCanceled(task) => return Err(task), - oneshot::SelUpgraded(t, port) => (t, port), + oneshot::SelUpgraded(t, rx) => (t, rx), } } Stream(ref p) => { match unsafe { (*p.get()).start_selection(task) } { stream::SelSuccess => return Ok(()), stream::SelCanceled(task) => return Err(task), - stream::SelUpgraded(t, port) => (t, port), + stream::SelUpgraded(t, rx) => (t, rx), } } Shared(ref p) => { @@ -695,11 +694,11 @@ impl<T: Send> select::Packet for Port<T> { } impl<'a, T: Send> Iterator<T> for Messages<'a, T> { - fn next(&mut self) -> Option<T> { self.port.recv_opt() } + fn next(&mut self) -> Option<T> { self.rx.recv_opt() } } #[unsafe_destructor] -impl<T: Send> Drop for Port<T> { +impl<T: Send> Drop for Receiver<T> { fn drop(&mut self) { match self.inner { Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); }, @@ -725,332 +724,333 @@ mod test { } test!(fn smoke() { - let (p, c) = Chan::new(); - c.send(1); - assert_eq!(p.recv(), 1); + let (tx, rx) = channel(); + tx.send(1); + assert_eq!(rx.recv(), 1); }) test!(fn drop_full() { - let (_p, c) = Chan::new(); - c.send(~1); + let (tx, _rx) = channel(); + tx.send(~1); }) test!(fn drop_full_shared() { - let (_p, c) = Chan::new(); - c.send(~1); + let (tx, _rx) = channel(); + drop(tx.clone()); + drop(tx.clone()); + tx.send(~1); }) test!(fn smoke_shared() { - let (p, c) = Chan::new(); - c.send(1); - assert_eq!(p.recv(), 1); - let c = c.clone(); - c.send(1); - assert_eq!(p.recv(), 1); + let (tx, rx) = channel(); + tx.send(1); + assert_eq!(rx.recv(), 1); + let tx = tx.clone(); + tx.send(1); + assert_eq!(rx.recv(), 1); }) test!(fn smoke_threads() { - let (p, c) = Chan::new(); + let (tx, rx) = channel(); spawn(proc() { - c.send(1); + tx.send(1); }); - assert_eq!(p.recv(), 1); + assert_eq!(rx.recv(), 1); }) test!(fn smoke_port_gone() { - let (p, c) = Chan::new(); - drop(p); - c.send(1); + let (tx, rx) = channel(); + drop(rx); + tx.send(1); } #[should_fail]) test!(fn smoke_shared_port_gone() { - let (p, c) = Chan::new(); - drop(p); - c.send(1); + let (tx, rx) = channel(); + drop(rx); + tx.send(1); } #[should_fail]) test!(fn smoke_shared_port_gone2() { - let (p, c) = Chan::new(); - drop(p); - let c2 = c.clone(); - drop(c); - c2.send(1); + let (tx, rx) = channel(); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + tx2.send(1); } #[should_fail]) test!(fn port_gone_concurrent() { - let (p, c) = Chan::new(); + let (tx, rx) = channel(); spawn(proc() { - p.recv(); + rx.recv(); }); - loop { c.send(1) } + loop { tx.send(1) } } #[should_fail]) test!(fn port_gone_concurrent_shared() { - let (p, c) = Chan::new(); - let c1 = c.clone(); + let (tx, rx) = channel(); + let tx2 = tx.clone(); spawn(proc() { - p.recv(); + rx.recv(); }); loop { - c.send(1); - c1.send(1); + tx.send(1); + tx2.send(1); } } #[should_fail]) test!(fn smoke_chan_gone() { - let (p, c) = Chan::<int>::new(); - drop(c); - p.recv(); + let (tx, rx) = channel::<int>(); + drop(tx); + rx.recv(); } #[should_fail]) test!(fn smoke_chan_gone_shared() { - let (p, c) = Chan::<()>::new(); - let c2 = c.clone(); - drop(c); - drop(c2); - p.recv(); + let (tx, rx) = channel::<()>(); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + rx.recv(); } #[should_fail]) test!(fn chan_gone_concurrent() { - let (p, c) = Chan::new(); + let (tx, rx) = channel(); spawn(proc() { - c.send(1); - c.send(1); + tx.send(1); + tx.send(1); }); - loop { p.recv(); } + loop { rx.recv(); } } #[should_fail]) test!(fn stress() { - let (p, c) = Chan::new(); + let (tx, rx) = channel(); spawn(proc() { - for _ in range(0, 10000) { c.send(1); } + for _ in range(0, 10000) { tx.send(1); } }); for _ in range(0, 10000) { - assert_eq!(p.recv(), 1); + assert_eq!(rx.recv(), 1); } }) test!(fn stress_shared() { static AMT: uint = 10000; static NTHREADS: uint = 8; - let (p, c) = Chan::<int>::new(); - let (p1, c1) = Chan::new(); + let (tx, rx) = channel::<int>(); + let (dtx, drx) = channel::<()>(); spawn(proc() { for _ in range(0, AMT * NTHREADS) { - assert_eq!(p.recv(), 1); + assert_eq!(rx.recv(), 1); } - match p.try_recv() { + match rx.try_recv() { Data(..) => fail!(), _ => {} } - c1.send(()); + dtx.send(()); }); for _ in range(0, NTHREADS) { - let c = c.clone(); + let tx = tx.clone(); spawn(proc() { - for _ in range(0, AMT) { c.send(1); } + for _ in range(0, AMT) { tx.send(1); } }); } - p1.recv(); + drop(tx); + drx.recv(); }) #[test] fn send_from_outside_runtime() { - let (p, c) = Chan::<int>::new(); - let (p1, c1) = Chan::new(); - let (port, chan) = Chan::new(); - let chan2 = chan.clone(); + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<()>(); + let tx4 = tx3.clone(); spawn(proc() { - c1.send(()); + tx1.send(()); for _ in range(0, 40) { - assert_eq!(p.recv(), 1); + assert_eq!(rx2.recv(), 1); } - chan2.send(()); + tx3.send(()); }); - p1.recv(); + rx1.recv(); native::task::spawn(proc() { for _ in range(0, 40) { - c.send(1); + tx2.send(1); } - chan.send(()); + tx4.send(()); }); - port.recv(); - port.recv(); + rx3.recv(); + rx3.recv(); } #[test] fn recv_from_outside_runtime() { - let (p, c) = Chan::<int>::new(); - let (dp, dc) = Chan::new(); + let (tx, rx) = channel::<int>(); + let (dtx, drx) = channel(); native::task::spawn(proc() { for _ in range(0, 40) { - assert_eq!(p.recv(), 1); + assert_eq!(rx.recv(), 1); } - dc.send(()); + dtx.send(()); }); for _ in range(0, 40) { - c.send(1); + tx.send(1); } - dp.recv(); + drx.recv(); } #[test] fn no_runtime() { - let (p1, c1) = Chan::<int>::new(); - let (p2, c2) = Chan::<int>::new(); - let (port, chan) = Chan::new(); - let chan2 = chan.clone(); + let (tx1, rx1) = channel::<int>(); + let (tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<()>(); + let tx4 = tx3.clone(); native::task::spawn(proc() { - assert_eq!(p1.recv(), 1); - c2.send(2); - chan2.send(()); + assert_eq!(rx1.recv(), 1); + tx2.send(2); + tx4.send(()); }); native::task::spawn(proc() { - c1.send(1); - assert_eq!(p2.recv(), 2); - chan.send(()); + tx1.send(1); + assert_eq!(rx2.recv(), 2); + tx3.send(()); }); - port.recv(); - port.recv(); + rx3.recv(); + rx3.recv(); } test!(fn oneshot_single_thread_close_port_first() { // Simple test of closing without sending - let (port, _chan) = Chan::<int>::new(); - { let _p = port; } + let (_tx, rx) = channel::<int>(); + drop(rx); }) test!(fn oneshot_single_thread_close_chan_first() { // Simple test of closing without sending - let (_port, chan) = Chan::<int>::new(); - { let _c = chan; } + let (tx, _rx) = channel::<int>(); + drop(tx); }) test!(fn oneshot_single_thread_send_port_close() { // Testing that the sender cleans up the payload if receiver is closed - let (port, chan) = Chan::<~int>::new(); - { let _p = port; } - chan.send(~0); + let (tx, rx) = channel::<~int>(); + drop(rx); + tx.send(~0); } #[should_fail]) test!(fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will fail let res = task::try(proc() { - let (port, chan) = Chan::<~int>::new(); - { let _c = chan; } - port.recv(); + let (tx, rx) = channel::<int>(); + drop(tx); + rx.recv(); }); // What is our res? assert!(res.is_err()); }) test!(fn oneshot_single_thread_send_then_recv() { - let (port, chan) = Chan::<~int>::new(); - chan.send(~10); - assert!(port.recv() == ~10); + let (tx, rx) = channel::<~int>(); + tx.send(~10); + assert!(rx.recv() == ~10); }) test!(fn oneshot_single_thread_try_send_open() { - let (port, chan) = Chan::<int>::new(); - assert!(chan.try_send(10)); - assert!(port.recv() == 10); + let (tx, rx) = channel::<int>(); + assert!(tx.try_send(10)); + assert!(rx.recv() == 10); }) test!(fn oneshot_single_thread_try_send_closed() { - let (port, chan) = Chan::<int>::new(); - { let _p = port; } - assert!(!chan.try_send(10)); + let (tx, rx) = channel::<int>(); + drop(rx); + assert!(!tx.try_send(10)); }) test!(fn oneshot_single_thread_try_recv_open() { - let (port, chan) = Chan::<int>::new(); - chan.send(10); - assert!(port.recv_opt() == Some(10)); + let (tx, rx) = channel::<int>(); + tx.send(10); + assert!(rx.recv_opt() == Some(10)); }) test!(fn oneshot_single_thread_try_recv_closed() { - let (port, chan) = Chan::<int>::new(); - { let _c = chan; } - assert!(port.recv_opt() == None); + let (tx, rx) = channel::<int>(); + drop(tx); + assert!(rx.recv_opt() == None); }) test!(fn oneshot_single_thread_peek_data() { - let (port, chan) = Chan::<int>::new(); - assert_eq!(port.try_recv(), Empty) - chan.send(10); - assert_eq!(port.try_recv(), Data(10)); + let (tx, rx) = channel::<int>(); + assert_eq!(rx.try_recv(), Empty) + tx.send(10); + assert_eq!(rx.try_recv(), Data(10)); }) test!(fn oneshot_single_thread_peek_close() { - let (port, chan) = Chan::<int>::new(); - { let _c = chan; } - assert_eq!(port.try_recv(), Disconnected); - assert_eq!(port.try_recv(), Disconnected); + let (tx, rx) = channel::<int>(); + drop(tx); + assert_eq!(rx.try_recv(), Disconnected); + assert_eq!(rx.try_recv(), Disconnected); }) test!(fn oneshot_single_thread_peek_open() { - let (port, _chan) = Chan::<int>::new(); - assert_eq!(port.try_recv(), Empty); + let (_tx, rx) = channel::<int>(); + assert_eq!(rx.try_recv(), Empty); }) test!(fn oneshot_multi_task_recv_then_send() { - let (port, chan) = Chan::<~int>::new(); + let (tx, rx) = channel::<~int>(); spawn(proc() { - assert!(port.recv() == ~10); + assert!(rx.recv() == ~10); }); - chan.send(~10); + tx.send(~10); }) test!(fn oneshot_multi_task_recv_then_close() { - let (port, chan) = Chan::<~int>::new(); + let (tx, rx) = channel::<~int>(); spawn(proc() { - let _chan = chan; + drop(tx); }); let res = task::try(proc() { - assert!(port.recv() == ~10); + assert!(rx.recv() == ~10); }); assert!(res.is_err()); }) test!(fn oneshot_multi_thread_close_stress() { for _ in range(0, stress_factor()) { - let (port, chan) = Chan::<int>::new(); + let (tx, rx) = channel::<int>(); spawn(proc() { - let _p = port; + drop(rx); }); - let _chan = chan; + drop(tx); } }) test!(fn oneshot_multi_thread_send_close_stress() { for _ in range(0, stress_factor()) { - let (port, chan) = Chan::<int>::new(); + let (tx, rx) = channel::<int>(); spawn(proc() { - let _p = port; + drop(rx); }); let _ = task::try(proc() { - chan.send(1); + tx.send(1); }); } }) test!(fn oneshot_multi_thread_recv_close_stress() { for _ in range(0, stress_factor()) { - let (port, chan) = Chan::<int>::new(); + let (tx, rx) = channel::<int>(); spawn(proc() { - let port = port; let res = task::try(proc() { - port.recv(); + rx.recv(); }); assert!(res.is_err()); }); spawn(proc() { - let chan = chan; spawn(proc() { - let _chan = chan; + drop(tx); }); }); } @@ -1058,38 +1058,38 @@ mod test { test!(fn oneshot_multi_thread_send_recv_stress() { for _ in range(0, stress_factor()) { - let (port, chan) = Chan::<~int>::new(); + let (tx, rx) = channel(); spawn(proc() { - chan.send(~10); + tx.send(~10); }); spawn(proc() { - assert!(port.recv() == ~10); + assert!(rx.recv() == ~10); }); } }) test!(fn stream_send_recv_stress() { for _ in range(0, stress_factor()) { - let (port, chan) = Chan::<~int>::new(); + let (tx, rx) = channel(); - send(chan, 0); - recv(port, 0); + send(tx, 0); + recv(rx, 0); - fn send(chan: Chan<~int>, i: int) { + fn send(tx: Sender<~int>, i: int) { if i == 10 { return } spawn(proc() { - chan.send(~i); - send(chan, i + 1); + tx.send(~i); + send(tx, i + 1); }); } - fn recv(port: Port<~int>, i: int) { + fn recv(rx: Receiver<~int>, i: int) { if i == 10 { return } spawn(proc() { - assert!(port.recv() == ~i); - recv(port, i + 1); + assert!(rx.recv() == ~i); + recv(rx, i + 1); }); } } @@ -1097,125 +1097,125 @@ mod test { test!(fn recv_a_lot() { // Regression test that we don't run out of stack in scheduler context - let (port, chan) = Chan::new(); - for _ in range(0, 10000) { chan.send(()); } - for _ in range(0, 10000) { port.recv(); } + let (tx, rx) = channel(); + for _ in range(0, 10000) { tx.send(()); } + for _ in range(0, 10000) { rx.recv(); } }) test!(fn shared_chan_stress() { - let (port, chan) = Chan::new(); + let (tx, rx) = channel(); let total = stress_factor() + 100; for _ in range(0, total) { - let chan_clone = chan.clone(); + let tx = tx.clone(); spawn(proc() { - chan_clone.send(()); + tx.send(()); }); } for _ in range(0, total) { - port.recv(); + rx.recv(); } }) test!(fn test_nested_recv_iter() { - let (port, chan) = Chan::<int>::new(); - let (total_port, total_chan) = Chan::<int>::new(); + let (tx, rx) = channel::<int>(); + let (total_tx, total_rx) = channel::<int>(); spawn(proc() { let mut acc = 0; - for x in port.iter() { + for x in rx.iter() { acc += x; } - total_chan.send(acc); + total_tx.send(acc); }); - chan.send(3); - chan.send(1); - chan.send(2); - drop(chan); - assert_eq!(total_port.recv(), 6); + tx.send(3); + tx.send(1); + tx.send(2); + drop(tx); + assert_eq!(total_rx.recv(), 6); }) test!(fn test_recv_iter_break() { - let (port, chan) = Chan::<int>::new(); - let (count_port, count_chan) = Chan::<int>::new(); + let (tx, rx) = channel::<int>(); + let (count_tx, count_rx) = channel(); spawn(proc() { let mut count = 0; - for x in port.iter() { + for x in rx.iter() { if count >= 3 { break; } else { count += x; } } - count_chan.send(count); + count_tx.send(count); }); - chan.send(2); - chan.send(2); - chan.send(2); - chan.try_send(2); - drop(chan); - assert_eq!(count_port.recv(), 4); + tx.send(2); + tx.send(2); + tx.send(2); + tx.try_send(2); + drop(tx); + assert_eq!(count_rx.recv(), 4); }) test!(fn try_recv_states() { - let (p, c) = Chan::<int>::new(); - let (p1, c1) = Chan::<()>::new(); - let (p2, c2) = Chan::<()>::new(); + let (tx1, rx1) = channel::<int>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); spawn(proc() { - p1.recv(); - c.send(1); - c2.send(()); - p1.recv(); - drop(c); - c2.send(()); + rx2.recv(); + tx1.send(1); + tx3.send(()); + rx2.recv(); + drop(tx1); + tx3.send(()); }); - assert_eq!(p.try_recv(), Empty); - c1.send(()); - p2.recv(); - assert_eq!(p.try_recv(), Data(1)); - assert_eq!(p.try_recv(), Empty); - c1.send(()); - p2.recv(); - assert_eq!(p.try_recv(), Disconnected); + assert_eq!(rx1.try_recv(), Empty); + tx2.send(()); + rx3.recv(); + assert_eq!(rx1.try_recv(), Data(1)); + assert_eq!(rx1.try_recv(), Empty); + tx2.send(()); + rx3.recv(); + assert_eq!(rx1.try_recv(), Disconnected); }) - // This bug used to end up in a livelock inside of the Port destructor - // because the internal state of the Shared port was corrupted + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted test!(fn destroy_upgraded_shared_port_when_sender_still_active() { - let (p, c) = Chan::new(); - let (p1, c2) = Chan::new(); + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); spawn(proc() { - p.recv(); // wait on a oneshot port - drop(p); // destroy a shared port - c2.send(()); + rx.recv(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()); }); // make sure the other task has gone to sleep for _ in range(0, 5000) { task::deschedule(); } // upgrade to a shared chan and send a message - let t = c.clone(); - drop(c); + let t = tx.clone(); + drop(tx); t.send(()); // wait for the child task to exit before we exit - p1.recv(); + rx2.recv(); }) test!(fn sends_off_the_runtime() { use rt::thread::Thread; - let (p, c) = Chan::new(); + let (tx, rx) = channel(); let t = Thread::start(proc() { for _ in range(0, 1000) { - c.send(()); + tx.send(()); } }); for _ in range(0, 1000) { - p.recv(); + rx.recv(); } t.join(); }) @@ -1223,12 +1223,12 @@ mod test { test!(fn try_recvs_off_the_runtime() { use rt::thread::Thread; - let (p, c) = Chan::new(); - let (pdone, cdone) = Chan::new(); + let (tx, rx) = channel(); + let (cdone, pdone) = channel(); let t = Thread::start(proc() { let mut hits = 0; while hits < 10 { - match p.try_recv() { + match rx.try_recv() { Data(()) => { hits += 1; } Empty => { Thread::yield_now(); } Disconnected => return, @@ -1237,7 +1237,7 @@ mod test { cdone.send(()); }); for _ in range(0, 10) { - c.send(()); + tx.send(()); } t.join(); pdone.recv(); diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs index 0f78c1971bc..1bc7349a70d 100644 --- a/src/libstd/comm/oneshot.rs +++ b/src/libstd/comm/oneshot.rs @@ -32,7 +32,7 @@ /// The one caveat to consider is that when a port sees a disconnected channel /// it must check for data because there is no "data plus upgrade" state. -use comm::Port; +use comm::Receiver; use kinds::Send; use mem; use ops::Drop; @@ -60,7 +60,7 @@ pub struct Packet<T> { pub enum Failure<T> { Empty, Disconnected, - Upgraded(Port<T>), + Upgraded(Receiver<T>), } pub enum UpgradeResult { @@ -71,14 +71,14 @@ pub enum UpgradeResult { pub enum SelectionResult<T> { SelCanceled(BlockedTask), - SelUpgraded(BlockedTask, Port<T>), + SelUpgraded(BlockedTask, Receiver<T>), SelSuccess, } enum MyUpgrade<T> { NothingSent, SendUsed, - GoUp(Port<T>), + GoUp(Receiver<T>), } impl<T: Send> Packet<T> { @@ -201,7 +201,7 @@ impl<T: Send> Packet<T> { // Returns whether the upgrade was completed. If the upgrade wasn't // completed, then the port couldn't get sent to the other half (it will // never receive it). - pub fn upgrade(&mut self, up: Port<T>) -> UpgradeResult { + pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult { let prev = match self.upgrade { NothingSent => NothingSent, SendUsed => SendUsed, @@ -259,7 +259,7 @@ impl<T: Send> Packet<T> { // If Ok, the value is whether this port has data, if Err, then the upgraded // port needs to be checked instead of this one. - pub fn can_recv(&mut self) -> Result<bool, Port<T>> { + pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> { match self.state.load(atomics::SeqCst) { EMPTY => Ok(false), // Welp, we tried DATA => Ok(true), // we have some un-acquired data @@ -318,7 +318,7 @@ impl<T: Send> Packet<T> { // blocked task will no longer be visible to any other threads. // // The return value indicates whether there's data on this port. - pub fn abort_selection(&mut self) -> Result<bool, Port<T>> { + pub fn abort_selection(&mut self) -> Result<bool, Receiver<T>> { let state = match self.state.load(atomics::SeqCst) { // Each of these states means that no further activity will happen // with regard to abortion selection diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index 3c6828fc14f..3e134b92493 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -8,38 +8,39 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -//! Selection over an array of ports +//! Selection over an array of receivers //! //! This module contains the implementation machinery necessary for selecting -//! over a number of ports. One large goal of this module is to provide an -//! efficient interface to selecting over any port of any type. +//! over a number of receivers. One large goal of this module is to provide an +//! efficient interface to selecting over any receiver of any type. //! -//! This is achieved through an architecture of a "port set" in which ports are -//! added to a set and then the entire set is waited on at once. The set can be -//! waited on multiple times to prevent re-adding each port to the set. +//! This is achieved through an architecture of a "receiver set" in which +//! receivers are added to a set and then the entire set is waited on at once. +//! The set can be waited on multiple times to prevent re-adding each receiver +//! to the set. //! //! Usage of this module is currently encouraged to go through the use of the //! `select!` macro. This macro allows naturally binding of variables to the -//! received values of ports in a much more natural syntax then usage of the +//! received values of receivers in a much more natural syntax then usage of the //! `Select` structure directly. //! //! # Example //! -//! ```rust,ignore -//! let (mut p1, c1) = Chan::new(); -//! let (mut p2, c2) = Chan::new(); +//! ```rust +//! let (tx1, rx1) = channel(); +//! let (tx2, rx2) = channel(); //! -//! c1.send(1); -//! c2.send(2); +//! tx1.send(1); +//! tx2.send(2); //! -//! select! ( -//! val = p1.recv() => { +//! select! { +//! val = rx1.recv() => { //! assert_eq!(val, 1); -//! } -//! val = p2.recv() => { +//! }, +//! val = rx2.recv() => { //! assert_eq!(val, 2); //! } -//! ) +//! } //! ``` #[allow(dead_code)]; @@ -55,11 +56,11 @@ use ptr::RawPtr; use result::{Ok, Err, Result}; use rt::local::Local; use rt::task::{Task, BlockedTask}; -use super::Port; +use super::Receiver; use uint; -/// The "port set" of the select interface. This structure is used to manage a -/// set of ports which are being selected over. +/// The "receiver set" of the select interface. This structure is used to manage +/// a set of receivers which are being selected over. pub struct Select { priv head: *mut Handle<'static, ()>, priv tail: *mut Handle<'static, ()>, @@ -68,22 +69,22 @@ pub struct Select { priv marker2: marker::NoFreeze, } -/// A handle to a port which is currently a member of a `Select` set of ports. -/// This handle is used to keep the port in the set as well as interact with the -/// underlying port. -pub struct Handle<'port, T> { +/// A handle to a receiver which is currently a member of a `Select` set of +/// receivers. This handle is used to keep the receiver in the set as well as +/// interact with the underlying receiver. +pub struct Handle<'rx, T> { /// The ID of this handle, used to compare against the return value of /// `Select::wait()` priv id: uint, - priv selector: &'port Select, + priv selector: &'rx Select, priv next: *mut Handle<'static, ()>, priv prev: *mut Handle<'static, ()>, priv added: bool, - priv packet: &'port Packet, + priv packet: &'rx Packet, // due to our fun transmutes, we be sure to place this at the end. (nothing // previous relies on T) - priv port: &'port Port<T>, + priv rx: &'rx Receiver<T>, } struct Packets { cur: *mut Handle<'static, ()> } @@ -111,10 +112,10 @@ impl Select { } } - /// Creates a new handle into this port set for a new port. Note that this - /// does *not* add the port to the port set, for that you must call the - /// `add` method on the handle itself. - pub fn handle<'a, T: Send>(&'a self, port: &'a Port<T>) -> Handle<'a, T> { + /// Creates a new handle into this receiver set for a new receiver. Note + /// that this does *not* add the receiver to the receiver set, for that you + /// must call the `add` method on the handle itself. + pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> { let id = self.next_id.get(); self.next_id.set(id + 1); Handle { @@ -123,12 +124,12 @@ impl Select { next: 0 as *mut Handle<'static, ()>, prev: 0 as *mut Handle<'static, ()>, added: false, - port: port, - packet: port, + rx: rx, + packet: rx, } } - /// Waits for an event on this port set. The returned value is *not* an + /// Waits for an event on this receiver set. The returned value is *not* an /// index, but rather an id. This id can be queried against any active /// `Handle` structures (each one has an `id` method). The handle with /// the matching `id` will have some sort of event available on it. The @@ -141,24 +142,24 @@ impl Select { /// Helper method for skipping the preflight checks during testing fn wait2(&self, do_preflight_checks: bool) -> uint { // Note that this is currently an inefficient implementation. We in - // theory have knowledge about all ports in the set ahead of time, so - // this method shouldn't really have to iterate over all of them yet - // again. The idea with this "port set" interface is to get the + // theory have knowledge about all receivers in the set ahead of time, + // so this method shouldn't really have to iterate over all of them yet + // again. The idea with this "receiver set" interface is to get the // interface right this time around, and later this implementation can // be optimized. // // This implementation can be summarized by: // - // fn select(ports) { - // if any port ready { return ready index } + // fn select(receivers) { + // if any receiver ready { return ready index } // deschedule { - // block on all ports + // block on all receivers // } - // unblock on all ports + // unblock on all receivers // return ready index // } // - // Most notably, the iterations over all of the ports shouldn't be + // Most notably, the iterations over all of the receivers shouldn't be // necessary. unsafe { let mut amt = 0; @@ -176,7 +177,7 @@ impl Select { // Acquire a number of blocking contexts, and block on each one // sequentially until one fails. If one fails, then abort - // immediately so we can go unblock on all the other ports. + // immediately so we can go unblock on all the other receivers. let task: ~Task = Local::take(); task.deschedule(amt, |task| { // Prepare for the block @@ -191,18 +192,18 @@ impl Select { } }); - // Abort the selection process on each port. If the abort process - // returns `true`, then that means that the port is ready to receive - // some data. Note that this also means that the port may have yet - // to have fully read the `to_wake` field and woken us up (although - // the wakeup is guaranteed to fail). + // Abort the selection process on each receiver. If the abort + // process returns `true`, then that means that the receiver is + // ready to receive some data. Note that this also means that the + // receiver may have yet to have fully read the `to_wake` field and + // woken us up (although the wakeup is guaranteed to fail). // // This situation happens in the window of where a sender invokes // increment(), sees -1, and then decides to wake up the task. After // all this is done, the sending thread will set `selecting` to // `false`. Until this is done, we cannot return. If we were to - // return, then a sender could wake up a port which has gone back to - // sleep after this call to `select`. + // return, then a sender could wake up a receiver which has gone + // back to sleep after this call to `select`. // // Note that it is a "fairly small window" in which an increment() // views that it should wake a thread up until the `selecting` bit @@ -226,20 +227,20 @@ impl Select { fn iter(&self) -> Packets { Packets { cur: self.head } } } -impl<'port, T: Send> Handle<'port, T> { +impl<'rx, T: Send> Handle<'rx, T> { /// Retrieve the id of this handle. #[inline] pub fn id(&self) -> uint { self.id } - /// Receive a value on the underlying port. Has the same semantics as - /// `Port.recv` - pub fn recv(&mut self) -> T { self.port.recv() } - /// Block to receive a value on the underlying port, returning `Some` on + /// Receive a value on the underlying receiver. Has the same semantics as + /// `Receiver.recv` + pub fn recv(&mut self) -> T { self.rx.recv() } + /// Block to receive a value on the underlying receiver, returning `Some` on /// success or `None` if the channel disconnects. This function has the same - /// semantics as `Port.recv_opt` - pub fn recv_opt(&mut self) -> Option<T> { self.port.recv_opt() } + /// semantics as `Receiver.recv_opt` + pub fn recv_opt(&mut self) -> Option<T> { self.rx.recv_opt() } - /// Adds this handle to the port set that the handle was created from. This + /// Adds this handle to the receiver set that the handle was created from. This /// method can be called multiple times, but it has no effect if `add` was /// called previously. /// @@ -300,7 +301,7 @@ impl Drop for Select { } #[unsafe_destructor] -impl<'port, T: Send> Drop for Handle<'port, T> { +impl<'rx, T: Send> Drop for Handle<'rx, T> { fn drop(&mut self) { unsafe { self.remove() } } @@ -325,328 +326,328 @@ mod test { use prelude::*; test!(fn smoke() { - let (p1, c1) = Chan::<int>::new(); - let (p2, c2) = Chan::<int>::new(); - c1.send(1); + let (tx1, rx1) = channel::<int>(); + let (tx2, rx2) = channel::<int>(); + tx1.send(1); select! ( - foo = p1.recv() => { assert_eq!(foo, 1); }, - _bar = p2.recv() => { fail!() } + foo = rx1.recv() => { assert_eq!(foo, 1); }, + _bar = rx2.recv() => { fail!() } ) - c2.send(2); + tx2.send(2); select! ( - _foo = p1.recv() => { fail!() }, - bar = p2.recv() => { assert_eq!(bar, 2) } + _foo = rx1.recv() => { fail!() }, + bar = rx2.recv() => { assert_eq!(bar, 2) } ) - drop(c1); + drop(tx1); select! ( - foo = p1.recv_opt() => { assert_eq!(foo, None); }, - _bar = p2.recv() => { fail!() } + foo = rx1.recv_opt() => { assert_eq!(foo, None); }, + _bar = rx2.recv() => { fail!() } ) - drop(c2); + drop(tx2); select! ( - bar = p2.recv_opt() => { assert_eq!(bar, None); } + bar = rx2.recv_opt() => { assert_eq!(bar, None); } ) }) test!(fn smoke2() { - let (p1, _c1) = Chan::<int>::new(); - let (p2, _c2) = Chan::<int>::new(); - let (p3, _c3) = Chan::<int>::new(); - let (p4, _c4) = Chan::<int>::new(); - let (p5, c5) = Chan::<int>::new(); - c5.send(4); + let (_tx1, rx1) = channel::<int>(); + let (_tx2, rx2) = channel::<int>(); + let (_tx3, rx3) = channel::<int>(); + let (_tx4, rx4) = channel::<int>(); + let (tx5, rx5) = channel::<int>(); + tx5.send(4); select! ( - _foo = p1.recv() => { fail!("1") }, - _foo = p2.recv() => { fail!("2") }, - _foo = p3.recv() => { fail!("3") }, - _foo = p4.recv() => { fail!("4") }, - foo = p5.recv() => { assert_eq!(foo, 4); } + _foo = rx1.recv() => { fail!("1") }, + _foo = rx2.recv() => { fail!("2") }, + _foo = rx3.recv() => { fail!("3") }, + _foo = rx4.recv() => { fail!("4") }, + foo = rx5.recv() => { assert_eq!(foo, 4); } ) }) test!(fn closed() { - let (p1, _c1) = Chan::<int>::new(); - let (p2, c2) = Chan::<int>::new(); - drop(c2); + let (_tx1, rx1) = channel::<int>(); + let (tx2, rx2) = channel::<int>(); + drop(tx2); select! ( - _a1 = p1.recv_opt() => { fail!() }, - a2 = p2.recv_opt() => { assert_eq!(a2, None); } + _a1 = rx1.recv_opt() => { fail!() }, + a2 = rx2.recv_opt() => { assert_eq!(a2, None); } ) }) test!(fn unblocks() { - let (p1, c1) = Chan::<int>::new(); - let (p2, _c2) = Chan::<int>::new(); - let (p3, c3) = Chan::<int>::new(); + let (tx1, rx1) = channel::<int>(); + let (_tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<int>(); spawn(proc() { for _ in range(0, 20) { task::deschedule(); } - c1.send(1); - p3.recv(); + tx1.send(1); + rx3.recv(); for _ in range(0, 20) { task::deschedule(); } }); select! ( - a = p1.recv() => { assert_eq!(a, 1); }, - _b = p2.recv() => { fail!() } + a = rx1.recv() => { assert_eq!(a, 1); }, + _b = rx2.recv() => { fail!() } ) - c3.send(1); + tx3.send(1); select! ( - a = p1.recv_opt() => { assert_eq!(a, None); }, - _b = p2.recv() => { fail!() } + a = rx1.recv_opt() => { assert_eq!(a, None); }, + _b = rx2.recv() => { fail!() } ) }) test!(fn both_ready() { - let (p1, c1) = Chan::<int>::new(); - let (p2, c2) = Chan::<int>::new(); - let (p3, c3) = Chan::<()>::new(); + let (tx1, rx1) = channel::<int>(); + let (tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<()>(); spawn(proc() { for _ in range(0, 20) { task::deschedule(); } - c1.send(1); - c2.send(2); - p3.recv(); + tx1.send(1); + tx2.send(2); + rx3.recv(); }); select! ( - a = p1.recv() => { assert_eq!(a, 1); }, - a = p2.recv() => { assert_eq!(a, 2); } + a = rx1.recv() => { assert_eq!(a, 1); }, + a = rx2.recv() => { assert_eq!(a, 2); } ) select! ( - a = p1.recv() => { assert_eq!(a, 1); }, - a = p2.recv() => { assert_eq!(a, 2); } + a = rx1.recv() => { assert_eq!(a, 1); }, + a = rx2.recv() => { assert_eq!(a, 2); } ) - assert_eq!(p1.try_recv(), Empty); - assert_eq!(p2.try_recv(), Empty); - c3.send(()); + assert_eq!(rx1.try_recv(), Empty); + assert_eq!(rx2.try_recv(), Empty); + tx3.send(()); }) test!(fn stress() { static AMT: int = 10000; - let (p1, c1) = Chan::<int>::new(); - let (p2, c2) = Chan::<int>::new(); - let (p3, c3) = Chan::<()>::new(); + let (tx1, rx1) = channel::<int>(); + let (tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<()>(); spawn(proc() { for i in range(0, AMT) { if i % 2 == 0 { - c1.send(i); + tx1.send(i); } else { - c2.send(i); + tx2.send(i); } - p3.recv(); + rx3.recv(); } }); for i in range(0, AMT) { select! ( - i1 = p1.recv() => { assert!(i % 2 == 0 && i == i1); }, - i2 = p2.recv() => { assert!(i % 2 == 1 && i == i2); } + i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); }, + i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); } ) - c3.send(()); + tx3.send(()); } }) test!(fn cloning() { - let (p1, c1) = Chan::<int>::new(); - let (p2, _c2) = Chan::<int>::new(); - let (p3, c3) = Chan::<()>::new(); + let (tx1, rx1) = channel::<int>(); + let (_tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<()>(); spawn(proc() { - p3.recv(); - c1.clone(); - assert_eq!(p3.try_recv(), Empty); - c1.send(2); - p3.recv(); + rx3.recv(); + tx1.clone(); + assert_eq!(rx3.try_recv(), Empty); + tx1.send(2); + rx3.recv(); }); - c3.send(()); + tx3.send(()); select!( - _i1 = p1.recv() => {}, - _i2 = p2.recv() => fail!() + _i1 = rx1.recv() => {}, + _i2 = rx2.recv() => fail!() ) - c3.send(()); + tx3.send(()); }) test!(fn cloning2() { - let (p1, c1) = Chan::<int>::new(); - let (p2, _c2) = Chan::<int>::new(); - let (p3, c3) = Chan::<()>::new(); + let (tx1, rx1) = channel::<int>(); + let (_tx2, rx2) = channel::<int>(); + let (tx3, rx3) = channel::<()>(); spawn(proc() { - p3.recv(); - c1.clone(); - assert_eq!(p3.try_recv(), Empty); - c1.send(2); - p3.recv(); + rx3.recv(); + tx1.clone(); + assert_eq!(rx3.try_recv(), Empty); + tx1.send(2); + rx3.recv(); }); - c3.send(()); + tx3.send(()); select!( - _i1 = p1.recv() => {}, - _i2 = p2.recv() => fail!() + _i1 = rx1.recv() => {}, + _i2 = rx2.recv() => fail!() ) - c3.send(()); + tx3.send(()); }) test!(fn cloning3() { - let (p1, c1) = Chan::<()>::new(); - let (p2, c2) = Chan::<()>::new(); - let (p, c) = Chan::new(); + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); spawn(proc() { let s = Select::new(); - let mut h1 = s.handle(&p1); - let mut h2 = s.handle(&p2); + let mut h1 = s.handle(&rx1); + let mut h2 = s.handle(&rx2); unsafe { h2.add(); } unsafe { h1.add(); } assert_eq!(s.wait(), h2.id); - c.send(()); + tx3.send(()); }); for _ in range(0, 1000) { task::deschedule(); } - drop(c1.clone()); - c2.send(()); - p.recv(); + drop(tx1.clone()); + tx2.send(()); + rx3.recv(); }) test!(fn preflight1() { - let (p, c) = Chan::new(); - c.send(()); + let (tx, rx) = channel(); + tx.send(()); select!( - () = p.recv() => {} + () = rx.recv() => {} ) }) test!(fn preflight2() { - let (p, c) = Chan::new(); - c.send(()); - c.send(()); + let (tx, rx) = channel(); + tx.send(()); + tx.send(()); select!( - () = p.recv() => {} + () = rx.recv() => {} ) }) test!(fn preflight3() { - let (p, c) = Chan::new(); - drop(c.clone()); - c.send(()); + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()); select!( - () = p.recv() => {} + () = rx.recv() => {} ) }) test!(fn preflight4() { - let (p, c) = Chan::new(); - c.send(()); + let (tx, rx) = channel(); + tx.send(()); let s = Select::new(); - let mut h = s.handle(&p); + let mut h = s.handle(&rx); unsafe { h.add(); } assert_eq!(s.wait2(false), h.id); }) test!(fn preflight5() { - let (p, c) = Chan::new(); - c.send(()); - c.send(()); + let (tx, rx) = channel(); + tx.send(()); + tx.send(()); let s = Select::new(); - let mut h = s.handle(&p); + let mut h = s.handle(&rx); unsafe { h.add(); } assert_eq!(s.wait2(false), h.id); }) test!(fn preflight6() { - let (p, c) = Chan::new(); - drop(c.clone()); - c.send(()); + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()); let s = Select::new(); - let mut h = s.handle(&p); + let mut h = s.handle(&rx); unsafe { h.add(); } assert_eq!(s.wait2(false), h.id); }) test!(fn preflight7() { - let (p, c) = Chan::<()>::new(); - drop(c); + let (tx, rx) = channel::<()>(); + drop(tx); let s = Select::new(); - let mut h = s.handle(&p); + let mut h = s.handle(&rx); unsafe { h.add(); } assert_eq!(s.wait2(false), h.id); }) test!(fn preflight8() { - let (p, c) = Chan::new(); - c.send(()); - drop(c); - p.recv(); + let (tx, rx) = channel(); + tx.send(()); + drop(tx); + rx.recv(); let s = Select::new(); - let mut h = s.handle(&p); + let mut h = s.handle(&rx); unsafe { h.add(); } assert_eq!(s.wait2(false), h.id); }) test!(fn preflight9() { - let (p, c) = Chan::new(); - drop(c.clone()); - c.send(()); - drop(c); - p.recv(); + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()); + drop(tx); + rx.recv(); let s = Select::new(); - let mut h = s.handle(&p); + let mut h = s.handle(&rx); unsafe { h.add(); } assert_eq!(s.wait2(false), h.id); }) test!(fn oneshot_data_waiting() { - let (p, c) = Chan::new(); - let (p2, c2) = Chan::new(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); spawn(proc() { select! { - () = p.recv() => {} + () = rx1.recv() => {} } - c2.send(()); + tx2.send(()); }); for _ in range(0, 100) { task::deschedule() } - c.send(()); - p2.recv(); + tx1.send(()); + rx2.recv(); }) test!(fn stream_data_waiting() { - let (p, c) = Chan::new(); - let (p2, c2) = Chan::new(); - c.send(()); - c.send(()); - p.recv(); - p.recv(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + tx1.send(()); + tx1.send(()); + rx1.recv(); + rx1.recv(); spawn(proc() { select! { - () = p.recv() => {} + () = rx1.recv() => {} } - c2.send(()); + tx2.send(()); }); for _ in range(0, 100) { task::deschedule() } - c.send(()); - p2.recv(); + tx1.send(()); + rx2.recv(); }) test!(fn shared_data_waiting() { - let (p, c) = Chan::new(); - let (p2, c2) = Chan::new(); - drop(c.clone()); - c.send(()); - p.recv(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + drop(tx1.clone()); + tx1.send(()); + rx1.recv(); spawn(proc() { select! { - () = p.recv() => {} + () = rx1.recv() => {} } - c2.send(()); + tx2.send(()); }); for _ in range(0, 100) { task::deschedule() } - c.send(()); - p2.recv(); + tx1.send(()); + rx2.recv(); }) } diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs index 4eac22b813d..d386e97d5bf 100644 --- a/src/libstd/comm/stream.rs +++ b/src/libstd/comm/stream.rs @@ -18,7 +18,7 @@ /// module. use cmp; -use comm::Port; +use comm::Receiver; use int; use iter::Iterator; use kinds::Send; @@ -51,7 +51,7 @@ pub struct Packet<T> { pub enum Failure<T> { Empty, Disconnected, - Upgraded(Port<T>), + Upgraded(Receiver<T>), } pub enum UpgradeResult { @@ -63,14 +63,14 @@ pub enum UpgradeResult { pub enum SelectionResult<T> { SelSuccess, SelCanceled(BlockedTask), - SelUpgraded(BlockedTask, Port<T>), + SelUpgraded(BlockedTask, Receiver<T>), } // Any message could contain an "upgrade request" to a new shared port, so the // internal queue it's a queue of T, but rather Message<T> enum Message<T> { Data(T), - GoUp(Port<T>), + GoUp(Receiver<T>), } impl<T: Send> Packet<T> { @@ -97,7 +97,7 @@ impl<T: Send> Packet<T> { } } } - pub fn upgrade(&mut self, up: Port<T>) -> UpgradeResult { + pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult { self.do_send(GoUp(up)) } @@ -328,7 +328,7 @@ impl<T: Send> Packet<T> { // Tests to see whether this port can receive without blocking. If Ok is // returned, then that's the answer. If Err is returned, then the returned // port needs to be queried instead (an upgrade happened) - pub fn can_recv(&mut self) -> Result<bool, Port<T>> { + pub fn can_recv(&mut self) -> Result<bool, Receiver<T>> { // We peek at the queue to see if there's anything on it, and we use // this return value to determine if we should pop from the queue and // upgrade this channel immediately. If it looks like we've got an @@ -384,7 +384,7 @@ impl<T: Send> Packet<T> { // Removes a previous task from being blocked in this port pub fn abort_selection(&mut self, - was_upgrade: bool) -> Result<bool, Port<T>> { + was_upgrade: bool) -> Result<bool, Receiver<T>> { // If we're aborting selection after upgrading from a oneshot, then // we're guarantee that no one is waiting. The only way that we could // have seen the upgrade is if data was actually sent on the channel diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs index f2d3d410758..f09555e93a0 100644 --- a/src/libstd/io/comm_adapters.rs +++ b/src/libstd/io/comm_adapters.rs @@ -8,25 +8,26 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use prelude::*; - -use comm::{Port, Chan}; +use clone::Clone; use cmp; +use container::Container; +use comm::{Sender, Receiver}; use io; use option::{None, Option, Some}; +use result::{Ok, Err}; use super::{Reader, Writer, IoResult}; use vec::{bytes, CloneableVector, MutableVector, ImmutableVector}; -/// Allows reading from a port. +/// Allows reading from a rx. /// /// # Example /// /// ``` -/// use std::io::PortReader; +/// use std::io::ChanReader; /// -/// let (port, chan) = Chan::new(); -/// # drop(chan); -/// let mut reader = PortReader::new(port); +/// let (tx, rx) = channel(); +/// # drop(tx); +/// let mut reader = ChanReader::new(rx); /// /// let mut buf = ~[0u8, ..100]; /// match reader.read(buf) { @@ -34,26 +35,26 @@ use vec::{bytes, CloneableVector, MutableVector, ImmutableVector}; /// Err(e) => println!("read error: {}", e), /// } /// ``` -pub struct PortReader { +pub struct ChanReader { priv buf: Option<~[u8]>, // A buffer of bytes received but not consumed. priv pos: uint, // How many of the buffered bytes have already be consumed. - priv port: Port<~[u8]>, // The port to pull data from. - priv closed: bool, // Whether the pipe this port connects to has been closed. + priv rx: Receiver<~[u8]>, // The rx to pull data from. + priv closed: bool, // Whether the pipe this rx connects to has been closed. } -impl PortReader { - /// Wraps a `Port` in a `PortReader` structure - pub fn new(port: Port<~[u8]>) -> PortReader { - PortReader { +impl ChanReader { + /// Wraps a `Port` in a `ChanReader` structure + pub fn new(rx: Receiver<~[u8]>) -> ChanReader { + ChanReader { buf: None, pos: 0, - port: port, + rx: rx, closed: false, } } } -impl Reader for PortReader { +impl Reader for ChanReader { fn read(&mut self, buf: &mut [u8]) -> IoResult<uint> { let mut num_read = 0; loop { @@ -72,7 +73,7 @@ impl Reader for PortReader { break; } self.pos = 0; - self.buf = self.port.recv_opt(); + self.buf = self.rx.recv_opt(); self.closed = self.buf.is_none(); } if self.closed && num_read == 0 { @@ -83,7 +84,7 @@ impl Reader for PortReader { } } -/// Allows writing to a chan. +/// Allows writing to a tx. /// /// # Example /// @@ -91,31 +92,31 @@ impl Reader for PortReader { /// # #[allow(unused_must_use)]; /// use std::io::ChanWriter; /// -/// let (port, chan) = Chan::new(); -/// # drop(port); -/// let mut writer = ChanWriter::new(chan); +/// let (tx, rx) = channel(); +/// # drop(rx); +/// let mut writer = ChanWriter::new(tx); /// writer.write("hello, world".as_bytes()); /// ``` pub struct ChanWriter { - priv chan: Chan<~[u8]>, + priv tx: Sender<~[u8]>, } impl ChanWriter { /// Wraps a channel in a `ChanWriter` structure - pub fn new(chan: Chan<~[u8]>) -> ChanWriter { - ChanWriter { chan: chan } + pub fn new(tx: Sender<~[u8]>) -> ChanWriter { + ChanWriter { tx: tx } } } impl Clone for ChanWriter { fn clone(&self) -> ChanWriter { - ChanWriter { chan: self.chan.clone() } + ChanWriter { tx: self.tx.clone() } } } impl Writer for ChanWriter { fn write(&mut self, buf: &[u8]) -> IoResult<()> { - if !self.chan.try_send(buf.to_owned()) { + if !self.tx.try_send(buf.to_owned()) { Err(io::IoError { kind: io::BrokenPipe, desc: "Pipe closed", @@ -136,17 +137,17 @@ mod test { use task; #[test] - fn test_port_reader() { - let (port, chan) = Chan::new(); + fn test_rx_reader() { + let (tx, rx) = channel(); task::spawn(proc() { - chan.send(~[1u8, 2u8]); - chan.send(~[]); - chan.send(~[3u8, 4u8]); - chan.send(~[5u8, 6u8]); - chan.send(~[7u8, 8u8]); + tx.send(~[1u8, 2u8]); + tx.send(~[]); + tx.send(~[3u8, 4u8]); + tx.send(~[5u8, 6u8]); + tx.send(~[7u8, 8u8]); }); - let mut reader = PortReader::new(port); + let mut reader = ChanReader::new(rx); let mut buf = ~[0u8, ..3]; @@ -177,12 +178,12 @@ mod test { #[test] fn test_chan_writer() { - let (port, chan) = Chan::new(); - let mut writer = ChanWriter::new(chan); + let (tx, rx) = channel(); + let mut writer = ChanWriter::new(tx); writer.write_be_u32(42).unwrap(); let wanted = ~[0u8, 0u8, 0u8, 42u8]; - let got = task::try(proc() { port.recv() }).unwrap(); + let got = task::try(proc() { rx.recv() }).unwrap(); assert_eq!(wanted, got); match writer.write_u8(1) { diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs index 3aaee59288f..869643336a9 100644 --- a/src/libstd/io/mod.rs +++ b/src/libstd/io/mod.rs @@ -245,7 +245,7 @@ pub use self::process::{Process, ProcessConfig}; pub use self::mem::{MemReader, BufReader, MemWriter, BufWriter}; pub use self::buffered::{BufferedReader, BufferedWriter, BufferedStream, LineBufferedWriter}; -pub use self::comm_adapters::{PortReader, ChanWriter}; +pub use self::comm_adapters::{ChanReader, ChanWriter}; pub mod test; diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 53129f3df9b..edadbc7873a 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -195,16 +195,13 @@ mod test { iotest!(fn smoke_test_ip4() { let addr = next_test_ip4(); - let (port, chan) = Chan::new(); + let mut acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - port.recv(); let mut stream = TcpStream::connect(addr); stream.write([99]).unwrap(); }); - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); let mut stream = acceptor.accept(); let mut buf = [0]; stream.read(buf).unwrap(); @@ -213,16 +210,13 @@ mod test { iotest!(fn smoke_test_ip6() { let addr = next_test_ip6(); - let (port, chan) = Chan::new(); + let mut acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - port.recv(); let mut stream = TcpStream::connect(addr); stream.write([99]).unwrap(); }); - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); let mut stream = acceptor.accept(); let mut buf = [0]; stream.read(buf).unwrap(); @@ -231,16 +225,13 @@ mod test { iotest!(fn read_eof_ip4() { let addr = next_test_ip4(); - let (port, chan) = Chan::new(); + let mut acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - port.recv(); let _stream = TcpStream::connect(addr); // Close }); - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); @@ -249,16 +240,13 @@ mod test { iotest!(fn read_eof_ip6() { let addr = next_test_ip6(); - let (port, chan) = Chan::new(); + let mut acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - port.recv(); let _stream = TcpStream::connect(addr); // Close }); - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); @@ -267,16 +255,13 @@ mod test { iotest!(fn read_eof_twice_ip4() { let addr = next_test_ip4(); - let (port, chan) = Chan::new(); + let mut acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - port.recv(); let _stream = TcpStream::connect(addr); // Close }); - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); @@ -293,16 +278,13 @@ mod test { iotest!(fn read_eof_twice_ip6() { let addr = next_test_ip6(); - let (port, chan) = Chan::new(); + let mut acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - port.recv(); let _stream = TcpStream::connect(addr); // Close }); - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); let mut stream = acceptor.accept(); let mut buf = [0]; let nread = stream.read(buf); @@ -319,16 +301,13 @@ mod test { iotest!(fn write_close_ip4() { let addr = next_test_ip4(); - let (port, chan) = Chan::new(); + let mut acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - port.recv(); let _stream = TcpStream::connect(addr); // Close }); - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); let mut stream = acceptor.accept(); let buf = [0]; loop { @@ -347,16 +326,13 @@ mod test { iotest!(fn write_close_ip6() { let addr = next_test_ip6(); - let (port, chan) = Chan::new(); + let mut acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - port.recv(); let _stream = TcpStream::connect(addr); // Close }); - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); let mut stream = acceptor.accept(); let buf = [0]; loop { @@ -376,18 +352,15 @@ mod test { iotest!(fn multiple_connect_serial_ip4() { let addr = next_test_ip4(); let max = 10u; - let (port, chan) = Chan::new(); + let mut acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - port.recv(); for _ in range(0, max) { let mut stream = TcpStream::connect(addr); stream.write([99]).unwrap(); } }); - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); for ref mut stream in acceptor.incoming().take(max) { let mut buf = [0]; stream.read(buf).unwrap(); @@ -398,18 +371,15 @@ mod test { iotest!(fn multiple_connect_serial_ip6() { let addr = next_test_ip6(); let max = 10u; - let (port, chan) = Chan::new(); + let mut acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - port.recv(); for _ in range(0, max) { let mut stream = TcpStream::connect(addr); stream.write([99]).unwrap(); } }); - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); for ref mut stream in acceptor.incoming().take(max) { let mut buf = [0]; stream.read(buf).unwrap(); @@ -420,11 +390,10 @@ mod test { iotest!(fn multiple_connect_interleaved_greedy_schedule_ip4() { let addr = next_test_ip4(); static MAX: int = 10; - let (port, chan) = Chan::new(); + let acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); + let mut acceptor = acceptor; for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) { // Start another task to handle the connection spawn(proc() { @@ -437,7 +406,6 @@ mod test { } }); - port.recv(); connect(0, addr); fn connect(i: int, addr: SocketAddr) { @@ -457,11 +425,10 @@ mod test { iotest!(fn multiple_connect_interleaved_greedy_schedule_ip6() { let addr = next_test_ip6(); static MAX: int = 10; - let (port, chan) = Chan::<()>::new(); + let acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); + let mut acceptor = acceptor; for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) { // Start another task to handle the connection spawn(proc() { @@ -474,7 +441,6 @@ mod test { } }); - port.recv(); connect(0, addr); fn connect(i: int, addr: SocketAddr) { @@ -492,13 +458,12 @@ mod test { }) iotest!(fn multiple_connect_interleaved_lazy_schedule_ip4() { - let addr = next_test_ip4(); static MAX: int = 10; - let (port, chan) = Chan::new(); + let addr = next_test_ip4(); + let acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); + let mut acceptor = acceptor; for stream in acceptor.incoming().take(MAX as uint) { // Start another task to handle the connection spawn(proc() { @@ -511,7 +476,6 @@ mod test { } }); - port.recv(); connect(0, addr); fn connect(i: int, addr: SocketAddr) { @@ -529,13 +493,12 @@ mod test { }) iotest!(fn multiple_connect_interleaved_lazy_schedule_ip6() { - let addr = next_test_ip6(); static MAX: int = 10; - let (port, chan) = Chan::new(); + let addr = next_test_ip6(); + let acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); + let mut acceptor = acceptor; for stream in acceptor.incoming().take(MAX as uint) { // Start another task to handle the connection spawn(proc() { @@ -548,7 +511,6 @@ mod test { } }); - port.recv(); connect(0, addr); fn connect(i: int, addr: SocketAddr) { @@ -576,15 +538,12 @@ mod test { } pub fn peer_name(addr: SocketAddr) { - let (port, chan) = Chan::new(); - + let acceptor = TcpListener::bind(addr).listen(); spawn(proc() { - let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); + let mut acceptor = acceptor; acceptor.accept().unwrap(); }); - port.recv(); let stream = TcpStream::connect(addr); assert!(stream.is_ok()); @@ -611,23 +570,23 @@ mod test { iotest!(fn partial_read() { let addr = next_test_ip4(); - let (p, c) = Chan::new(); + let (tx, rx) = channel(); spawn(proc() { let mut srv = TcpListener::bind(addr).listen().unwrap(); - c.send(()); + tx.send(()); let mut cl = srv.accept().unwrap(); cl.write([10]).unwrap(); let mut b = [0]; cl.read(b).unwrap(); - c.send(()); + tx.send(()); }); - p.recv(); + rx.recv(); let mut c = TcpStream::connect(addr).unwrap(); let mut b = [0, ..10]; assert_eq!(c.read(b), Ok(1)); c.write([1]).unwrap(); - p.recv(); + rx.recv(); }) iotest!(fn double_bind() { @@ -644,22 +603,22 @@ mod test { iotest!(fn fast_rebind() { let addr = next_test_ip4(); - let (port, chan) = Chan::new(); + let (tx, rx) = channel(); spawn(proc() { - port.recv(); + rx.recv(); let _stream = TcpStream::connect(addr).unwrap(); // Close - port.recv(); + rx.recv(); }); { let mut acceptor = TcpListener::bind(addr).listen(); - chan.send(()); + tx.send(()); { let _stream = acceptor.accept().unwrap(); // Close client - chan.send(()); + tx.send(()); } // Close listener } @@ -681,50 +640,50 @@ mod test { let mut s1 = acceptor.accept().unwrap(); let s2 = s1.clone(); - let (p1, c1) = Chan::new(); - let (p2, c2) = Chan::new(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); spawn(proc() { let mut s2 = s2; - p1.recv(); + rx1.recv(); s2.write([1]).unwrap(); - c2.send(()); + tx2.send(()); }); - c1.send(()); + tx1.send(()); let mut buf = [0, 0]; assert_eq!(s1.read(buf), Ok(1)); - p2.recv(); + rx2.recv(); }) iotest!(fn tcp_clone_two_read() { let addr = next_test_ip6(); let mut acceptor = TcpListener::bind(addr).listen(); - let (p, c) = Chan::new(); - let c2 = c.clone(); + let (tx1, rx) = channel(); + let tx2 = tx1.clone(); spawn(proc() { let mut s = TcpStream::connect(addr); s.write([1]).unwrap(); - p.recv(); + rx.recv(); s.write([2]).unwrap(); - p.recv(); + rx.recv(); }); let mut s1 = acceptor.accept().unwrap(); let s2 = s1.clone(); - let (p, done) = Chan::new(); + let (done, rx) = channel(); spawn(proc() { let mut s2 = s2; let mut buf = [0, 0]; s2.read(buf).unwrap(); - c2.send(()); + tx2.send(()); done.send(()); }); let mut buf = [0, 0]; s1.read(buf).unwrap(); - c.send(()); + tx1.send(()); - p.recv(); + rx.recv(); }) iotest!(fn tcp_clone_two_write() { @@ -741,7 +700,7 @@ mod test { let mut s1 = acceptor.accept().unwrap(); let s2 = s1.clone(); - let (p, done) = Chan::new(); + let (done, rx) = channel(); spawn(proc() { let mut s2 = s2; s2.write([1]).unwrap(); @@ -749,7 +708,7 @@ mod test { }); s1.write([2]).unwrap(); - p.recv(); + rx.recv(); }) } diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index 8eaa86ea707..e2620556321 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -107,23 +107,23 @@ mod test { iotest!(fn socket_smoke_test_ip4() { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - let (port, chan) = Chan::new(); - let (port2, chan2) = Chan::new(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); spawn(proc() { match UdpSocket::bind(client_ip) { Ok(ref mut client) => { - port.recv(); + rx1.recv(); client.sendto([99], server_ip).unwrap() } Err(..) => fail!() } - chan2.send(()); + tx2.send(()); }); match UdpSocket::bind(server_ip) { Ok(ref mut server) => { - chan.send(()); + tx1.send(()); let mut buf = [0]; match server.recvfrom(buf) { Ok((nread, src)) => { @@ -136,18 +136,18 @@ mod test { } Err(..) => fail!() } - port2.recv(); + rx2.recv(); }) iotest!(fn socket_smoke_test_ip6() { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - let (port, chan) = Chan::<()>::new(); + let (tx, rx) = channel::<()>(); spawn(proc() { match UdpSocket::bind(client_ip) { Ok(ref mut client) => { - port.recv(); + rx.recv(); client.sendto([99], server_ip).unwrap() } Err(..) => fail!() @@ -156,7 +156,7 @@ mod test { match UdpSocket::bind(server_ip) { Ok(ref mut server) => { - chan.send(()); + tx.send(()); let mut buf = [0]; match server.recvfrom(buf) { Ok((nread, src)) => { @@ -174,27 +174,27 @@ mod test { iotest!(fn stream_smoke_test_ip4() { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - let (port, chan) = Chan::new(); - let (port2, chan2) = Chan::new(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); spawn(proc() { match UdpSocket::bind(client_ip) { Ok(client) => { let client = ~client; let mut stream = client.connect(server_ip); - port.recv(); + rx1.recv(); stream.write([99]).unwrap(); } Err(..) => fail!() } - chan2.send(()); + tx2.send(()); }); match UdpSocket::bind(server_ip) { Ok(server) => { let server = ~server; let mut stream = server.connect(client_ip); - chan.send(()); + tx1.send(()); let mut buf = [0]; match stream.read(buf) { Ok(nread) => { @@ -206,33 +206,33 @@ mod test { } Err(..) => fail!() } - port2.recv(); + rx2.recv(); }) iotest!(fn stream_smoke_test_ip6() { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - let (port, chan) = Chan::new(); - let (port2, chan2) = Chan::new(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); spawn(proc() { match UdpSocket::bind(client_ip) { Ok(client) => { let client = ~client; let mut stream = client.connect(server_ip); - port.recv(); + rx1.recv(); stream.write([99]).unwrap(); } Err(..) => fail!() } - chan2.send(()); + tx2.send(()); }); match UdpSocket::bind(server_ip) { Ok(server) => { let server = ~server; let mut stream = server.connect(client_ip); - chan.send(()); + tx1.send(()); let mut buf = [0]; match stream.read(buf) { Ok(nread) => { @@ -244,7 +244,7 @@ mod test { } Err(..) => fail!() } - port2.recv(); + rx2.recv(); }) pub fn socket_name(addr: SocketAddr) { @@ -284,18 +284,18 @@ mod test { let sock3 = sock1.clone(); - let (p1, c1) = Chan::new(); - let (p2, c2) = Chan::new(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); spawn(proc() { let mut sock3 = sock3; - p1.recv(); + rx1.recv(); sock3.sendto([1], addr2).unwrap(); - c2.send(()); + tx2.send(()); }); - c1.send(()); + tx1.send(()); let mut buf = [0, 0]; assert_eq!(sock1.recvfrom(buf), Ok((1, addr2))); - p2.recv(); + rx2.recv(); }) iotest!(fn udp_clone_two_read() { @@ -303,32 +303,32 @@ mod test { let addr2 = next_test_ip4(); let mut sock1 = UdpSocket::bind(addr1).unwrap(); let sock2 = UdpSocket::bind(addr2).unwrap(); - let (p, c) = Chan::new(); - let c2 = c.clone(); + let (tx1, rx) = channel(); + let tx2 = tx1.clone(); spawn(proc() { let mut sock2 = sock2; sock2.sendto([1], addr1).unwrap(); - p.recv(); + rx.recv(); sock2.sendto([2], addr1).unwrap(); - p.recv(); + rx.recv(); }); let sock3 = sock1.clone(); - let (p, done) = Chan::new(); + let (done, rx) = channel(); spawn(proc() { let mut sock3 = sock3; let mut buf = [0, 0]; sock3.recvfrom(buf).unwrap(); - c2.send(()); + tx2.send(()); done.send(()); }); let mut buf = [0, 0]; sock1.recvfrom(buf).unwrap(); - c.send(()); + tx1.send(()); - p.recv(); + rx.recv(); }) iotest!(fn udp_clone_two_write() { @@ -337,40 +337,40 @@ mod test { let mut sock1 = UdpSocket::bind(addr1).unwrap(); let sock2 = UdpSocket::bind(addr2).unwrap(); - let (p, c) = Chan::new(); - let (serv_port, serv_chan) = Chan::new(); + let (tx, rx) = channel(); + let (serv_tx, serv_rx) = channel(); spawn(proc() { let mut sock2 = sock2; let mut buf = [0, 1]; - p.recv(); + rx.recv(); match sock2.recvfrom(buf) { Ok(..) => {} Err(e) => fail!("failed receive: {}", e), } - serv_chan.send(()); + serv_tx.send(()); }); let sock3 = sock1.clone(); - let (p, done) = Chan::new(); - let c2 = c.clone(); + let (done, rx) = channel(); + let tx2 = tx.clone(); spawn(proc() { let mut sock3 = sock3; match sock3.sendto([1], addr2) { - Ok(..) => { let _ = c2.try_send(()); } + Ok(..) => { let _ = tx2.try_send(()); } Err(..) => {} } done.send(()); }); match sock1.sendto([2], addr2) { - Ok(..) => { let _ = c.try_send(()); } + Ok(..) => { let _ = tx.try_send(()); } Err(..) => {} } - drop(c); + drop(tx); - p.recv(); - serv_port.recv(); + rx.recv(); + serv_rx.recv(); }) } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index 64586113a10..bd715858a01 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -224,10 +224,13 @@ mod tests { let times = 10; let path1 = next_test_unix(); let path2 = path1.clone(); - let (port, chan) = Chan::new(); + + let mut acceptor = match UnixListener::bind(&path1).listen() { + Ok(a) => a, + Err(e) => fail!("failed listen: {}", e), + }; spawn(proc() { - port.recv(); for _ in range(0, times) { let mut stream = UnixStream::connect(&path2); match stream.write([100]) { @@ -237,11 +240,6 @@ mod tests { } }); - let mut acceptor = match UnixListener::bind(&path1).listen() { - Ok(a) => a, - Err(e) => fail!("failed listen: {}", e), - }; - chan.send(()); for _ in range(0, times) { let mut client = acceptor.accept(); let mut buf = [0]; @@ -278,54 +276,54 @@ mod tests { let mut s1 = acceptor.accept().unwrap(); let s2 = s1.clone(); - let (p1, c1) = Chan::new(); - let (p2, c2) = Chan::new(); + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); spawn(proc() { let mut s2 = s2; - p1.recv(); + rx1.recv(); debug!("writer writing"); s2.write([1]).unwrap(); debug!("writer done"); - c2.send(()); + tx2.send(()); }); - c1.send(()); + tx1.send(()); let mut buf = [0, 0]; debug!("reader reading"); assert_eq!(s1.read(buf), Ok(1)); debug!("reader done"); - p2.recv(); + rx2.recv(); }) iotest!(fn unix_clone_two_read() { let addr = next_test_unix(); let mut acceptor = UnixListener::bind(&addr).listen(); - let (p, c) = Chan::new(); - let c2 = c.clone(); + let (tx1, rx) = channel(); + let tx2 = tx1.clone(); spawn(proc() { let mut s = UnixStream::connect(&addr); s.write([1]).unwrap(); - p.recv(); + rx.recv(); s.write([2]).unwrap(); - p.recv(); + rx.recv(); }); let mut s1 = acceptor.accept().unwrap(); let s2 = s1.clone(); - let (p, done) = Chan::new(); + let (done, rx) = channel(); spawn(proc() { let mut s2 = s2; let mut buf = [0, 0]; s2.read(buf).unwrap(); - c2.send(()); + tx2.send(()); done.send(()); }); let mut buf = [0, 0]; s1.read(buf).unwrap(); - c.send(()); + tx1.send(()); - p.recv(); + rx.recv(); }) iotest!(fn unix_clone_two_write() { @@ -342,14 +340,14 @@ mod tests { let mut s1 = acceptor.accept().unwrap(); let s2 = s1.clone(); - let (p, done) = Chan::new(); + let (tx, rx) = channel(); spawn(proc() { let mut s2 = s2; s2.write([1]).unwrap(); - done.send(()); + tx.send(()); }); s1.write([2]).unwrap(); - p.recv(); + rx.recv(); }) } diff --git a/src/libstd/io/pipe.rs b/src/libstd/io/pipe.rs index 763f2d6c6a1..02dfaeb7164 100644 --- a/src/libstd/io/pipe.rs +++ b/src/libstd/io/pipe.rs @@ -77,15 +77,15 @@ mod test { let os::Pipe { input, out } = os::pipe(); let out = PipeStream::open(out); let mut input = PipeStream::open(input); - let (p, c) = Chan::new(); + let (tx, rx) = channel(); spawn(proc() { let mut out = out; out.write([10]).unwrap(); - p.recv(); // don't close the pipe until the other read has finished + rx.recv(); // don't close the pipe until the other read has finished }); let mut buf = [0, ..10]; input.read(buf).unwrap(); - c.send(()); + tx.send(()); }) } diff --git a/src/libstd/io/process.rs b/src/libstd/io/process.rs index b782cf1d21a..6afd1bbb27d 100644 --- a/src/libstd/io/process.rs +++ b/src/libstd/io/process.rs @@ -379,16 +379,16 @@ impl Process { /// The stdin handle to the child is closed before waiting. pub fn wait_with_output(&mut self) -> ProcessOutput { drop(self.stdin.take()); - fn read(stream: Option<io::PipeStream>) -> Port<IoResult<~[u8]>> { - let (p, c) = Chan::new(); + fn read(stream: Option<io::PipeStream>) -> Receiver<IoResult<~[u8]>> { + let (tx, rx) = channel(); match stream { Some(stream) => spawn(proc() { let mut stream = stream; - c.send(stream.read_to_end()) + tx.send(stream.read_to_end()) }), - None => c.send(Ok(~[])) + None => tx.send(Ok(~[])) } - p + rx } let stdout = read(self.stdout.take()); let stderr = read(self.stderr.take()); diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs index 63df3d2c4f1..c66fcd13917 100644 --- a/src/libstd/io/signal.rs +++ b/src/libstd/io/signal.rs @@ -20,7 +20,7 @@ definitions for a number of signals. */ use clone::Clone; -use comm::{Port, Chan}; +use comm::{Sender, Receiver, channel}; use io; use iter::Iterator; use mem::drop; @@ -56,7 +56,7 @@ pub enum Signum { WindowSizeChange = 28i, } -/// Listener provides a port to listen for registered signals. +/// Listener provides a receiver to listen for registered signals. /// /// Listener automatically unregisters its handles once it is out of scope. /// However, clients can still unregister signums manually. @@ -71,7 +71,7 @@ pub enum Signum { /// /// spawn({ /// loop { -/// match listener.port.recv() { +/// match listener.rx.recv() { /// Interrupt => println!("Got Interrupt'ed"), /// _ => (), /// } @@ -82,24 +82,24 @@ pub enum Signum { pub struct Listener { /// A map from signums to handles to keep the handles in memory priv handles: ~[(Signum, ~RtioSignal)], - /// chan is where all the handles send signums, which are received by - /// the clients from port. - priv chan: Chan<Signum>, + /// This is where all the handles send signums, which are received by + /// the clients from the receiver. + priv tx: Sender<Signum>, - /// Clients of Listener can `recv()` from this port. This is exposed to - /// allow selection over this port as well as manipulation of the port + /// Clients of Listener can `recv()` on this receiver. This is exposed to + /// allow selection over it as well as manipulation of the receiver /// directly. - port: Port<Signum>, + rx: Receiver<Signum>, } impl Listener { /// Creates a new listener for signals. Once created, signals are bound via /// the `register` method (otherwise nothing will ever be received) pub fn new() -> Listener { - let (port, chan) = Chan::new(); + let (tx, rx) = channel(); Listener { - chan: chan, - port: port, + tx: tx, + rx: rx, handles: ~[], } } @@ -125,7 +125,7 @@ impl Listener { return Ok(()); // self is already listening to signum, so succeed } match LocalIo::maybe_raise(|io| { - io.signal(signum, self.chan.clone()) + io.signal(signum, self.tx.clone()) }) { Ok(handle) => { self.handles.push((signum, handle)); @@ -166,7 +166,7 @@ mod test_unix { signal.register(Interrupt).unwrap(); sigint(); timer::sleep(10); - match signal.port.recv() { + match signal.rx.recv() { Interrupt => (), s => fail!("Expected Interrupt, got {:?}", s), } @@ -180,11 +180,11 @@ mod test_unix { s2.register(Interrupt).unwrap(); sigint(); timer::sleep(10); - match s1.port.recv() { + match s1.rx.recv() { Interrupt => (), s => fail!("Expected Interrupt, got {:?}", s), } - match s2.port.recv() { + match s2.rx.recv() { Interrupt => (), s => fail!("Expected Interrupt, got {:?}", s), } @@ -199,7 +199,7 @@ mod test_unix { s2.unregister(Interrupt); sigint(); timer::sleep(10); - assert_eq!(s2.port.try_recv(), Empty); + assert_eq!(s2.rx.try_recv(), Empty); } } diff --git a/src/libstd/io/stdio.rs b/src/libstd/io/stdio.rs index 241f3d23c6b..7c65e76ab47 100644 --- a/src/libstd/io/stdio.rs +++ b/src/libstd/io/stdio.rs @@ -388,10 +388,10 @@ mod tests { }) iotest!(fn capture_stdout() { - use io::comm_adapters::{PortReader, ChanWriter}; + use io::{ChanReader, ChanWriter}; - let (p, c) = Chan::new(); - let (mut r, w) = (PortReader::new(p), ChanWriter::new(c)); + let (tx, rx) = channel(); + let (mut r, w) = (ChanReader::new(rx), ChanWriter::new(tx)); spawn(proc() { set_stdout(~w as ~Writer); println!("hello!"); @@ -400,10 +400,10 @@ mod tests { }) iotest!(fn capture_stderr() { - use io::comm_adapters::{PortReader, ChanWriter}; + use io::{ChanReader, ChanWriter}; - let (p, c) = Chan::new(); - let (mut r, w) = (PortReader::new(p), ChanWriter::new(c)); + let (tx, rx) = channel(); + let (mut r, w) = (ChanReader::new(rx), ChanWriter::new(tx)); spawn(proc() { set_stderr(~w as ~Writer); fail!("my special message"); diff --git a/src/libstd/io/test.rs b/src/libstd/io/test.rs index 73d52654ebf..a3e5bac89d6 100644 --- a/src/libstd/io/test.rs +++ b/src/libstd/io/test.rs @@ -46,9 +46,9 @@ macro_rules! iotest ( $($a)* #[test] fn green() { f() } $($a)* #[test] fn native() { use native; - let (p, c) = Chan::new(); - native::task::spawn(proc() { c.send(f()) }); - p.recv(); + let (tx, rx) = channel(); + native::task::spawn(proc() { tx.send(f()) }); + rx.recv(); } } ) diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs index 8a13277aee3..f64c36c611c 100644 --- a/src/libstd/io/timer.rs +++ b/src/libstd/io/timer.rs @@ -13,11 +13,11 @@ Synchronous Timers This module exposes the functionality to create timers, block the current task, -and create ports which will receive notifications after a period of time. +and create receivers which will receive notifications after a period of time. */ -use comm::Port; +use comm::Receiver; use rt::rtio::{IoFactory, LocalIo, RtioTimer}; use io::IoResult; @@ -25,7 +25,7 @@ use io::IoResult; /// /// Values of this type can be used to put the current task to sleep for a /// period of time. Handles to this timer can also be created in the form of -/// ports which will receive notifications over time. +/// receivers which will receive notifications over time. /// /// # Example /// @@ -83,33 +83,33 @@ impl Timer { /// Blocks the current task for `msecs` milliseconds. /// - /// Note that this function will cause any other ports for this timer to be - /// invalidated (the other end will be closed). + /// Note that this function will cause any other receivers for this timer to + /// be invalidated (the other end will be closed). pub fn sleep(&mut self, msecs: u64) { self.obj.sleep(msecs); } - /// Creates a oneshot port which will have a notification sent when `msecs` - /// milliseconds has elapsed. This does *not* block the current task, but - /// instead returns immediately. + /// Creates a oneshot receiver which will have a notification sent when + /// `msecs` milliseconds has elapsed. This does *not* block the current + /// task, but instead returns immediately. /// - /// Note that this invalidates any previous port which has been created by - /// this timer, and that the returned port will be invalidated once the - /// timer is destroyed (when it falls out of scope). - pub fn oneshot(&mut self, msecs: u64) -> Port<()> { + /// Note that this invalidates any previous receiver which has been created + /// by this timer, and that the returned receiver will be invalidated once + /// the timer is destroyed (when it falls out of scope). + pub fn oneshot(&mut self, msecs: u64) -> Receiver<()> { self.obj.oneshot(msecs) } - /// Creates a port which will have a continuous stream of notifications + /// Creates a receiver which will have a continuous stream of notifications /// being sent every `msecs` milliseconds. This does *not* block the /// current task, but instead returns immediately. The first notification /// will not be received immediately, but rather after `msec` milliseconds /// have passed. /// - /// Note that this invalidates any previous port which has been created by - /// this timer, and that the returned port will be invalidated once the - /// timer is destroyed (when it falls out of scope). - pub fn periodic(&mut self, msecs: u64) -> Port<()> { + /// Note that this invalidates any previous receiver which has been created + /// by this timer, and that the returned receiver will be invalidated once + /// the timer is destroyed (when it falls out of scope). + pub fn periodic(&mut self, msecs: u64) -> Receiver<()> { self.obj.period(msecs) } } @@ -133,26 +133,26 @@ mod test { iotest!(fn oneshot_twice() { let mut timer = Timer::new().unwrap(); - let port1 = timer.oneshot(10000); - let port = timer.oneshot(1); - port.recv(); - assert_eq!(port1.recv_opt(), None); + let rx1 = timer.oneshot(10000); + let rx = timer.oneshot(1); + rx.recv(); + assert_eq!(rx1.recv_opt(), None); }) iotest!(fn test_io_timer_oneshot_then_sleep() { let mut timer = Timer::new().unwrap(); - let port = timer.oneshot(100000000000); - timer.sleep(1); // this should invalidate the port + let rx = timer.oneshot(100000000000); + timer.sleep(1); // this should inalidate rx - assert_eq!(port.recv_opt(), None); + assert_eq!(rx.recv_opt(), None); }) iotest!(fn test_io_timer_sleep_periodic() { let mut timer = Timer::new().unwrap(); - let port = timer.periodic(1); - port.recv(); - port.recv(); - port.recv(); + let rx = timer.periodic(1); + rx.recv(); + rx.recv(); + rx.recv(); }) iotest!(fn test_io_timer_sleep_periodic_forget() { @@ -167,33 +167,33 @@ mod test { iotest!(fn oneshot() { let mut timer = Timer::new().unwrap(); - let port = timer.oneshot(1); - port.recv(); - assert!(port.recv_opt().is_none()); + let rx = timer.oneshot(1); + rx.recv(); + assert!(rx.recv_opt().is_none()); - let port = timer.oneshot(1); - port.recv(); - assert!(port.recv_opt().is_none()); + let rx = timer.oneshot(1); + rx.recv(); + assert!(rx.recv_opt().is_none()); }) iotest!(fn override() { let mut timer = Timer::new().unwrap(); - let oport = timer.oneshot(100); - let pport = timer.periodic(100); + let orx = timer.oneshot(100); + let prx = timer.periodic(100); timer.sleep(1); - assert_eq!(oport.recv_opt(), None); - assert_eq!(pport.recv_opt(), None); + assert_eq!(orx.recv_opt(), None); + assert_eq!(prx.recv_opt(), None); timer.oneshot(1).recv(); }) iotest!(fn period() { let mut timer = Timer::new().unwrap(); - let port = timer.periodic(1); - port.recv(); - port.recv(); - let port2 = timer.periodic(1); - port2.recv(); - port2.recv(); + let rx = timer.periodic(1); + rx.recv(); + rx.recv(); + let rx2 = timer.periodic(1); + rx2.recv(); + rx2.recv(); }) iotest!(fn sleep() { @@ -204,13 +204,13 @@ mod test { iotest!(fn oneshot_fail() { let mut timer = Timer::new().unwrap(); - let _port = timer.oneshot(1); + let _rx = timer.oneshot(1); fail!(); } #[should_fail]) iotest!(fn period_fail() { let mut timer = Timer::new().unwrap(); - let _port = timer.periodic(1); + let _rx = timer.periodic(1); fail!(); } #[should_fail]) @@ -222,10 +222,10 @@ mod test { iotest!(fn closing_channel_during_drop_doesnt_kill_everything() { // see issue #10375 let mut timer = Timer::new().unwrap(); - let timer_port = timer.periodic(1000); + let timer_rx = timer.periodic(1000); spawn(proc() { - timer_port.recv_opt(); + timer_rx.recv_opt(); }); // when we drop the TimerWatcher we're going to destroy the channel, @@ -235,10 +235,10 @@ mod test { iotest!(fn reset_doesnt_switch_tasks() { // similar test to the one above. let mut timer = Timer::new().unwrap(); - let timer_port = timer.periodic(1000); + let timer_rx = timer.periodic(1000); spawn(proc() { - timer_port.recv_opt(); + timer_rx.recv_opt(); }); timer.oneshot(1); @@ -247,29 +247,29 @@ mod test { iotest!(fn reset_doesnt_switch_tasks2() { // similar test to the one above. let mut timer = Timer::new().unwrap(); - let timer_port = timer.periodic(1000); + let timer_rx = timer.periodic(1000); spawn(proc() { - timer_port.recv_opt(); + timer_rx.recv_opt(); }); timer.sleep(1); }) iotest!(fn sender_goes_away_oneshot() { - let port = { + let rx = { let mut timer = Timer::new().unwrap(); timer.oneshot(1000) }; - assert_eq!(port.recv_opt(), None); + assert_eq!(rx.recv_opt(), None); }) iotest!(fn sender_goes_away_period() { - let port = { + let rx = { let mut timer = Timer::new().unwrap(); timer.periodic(1000) }; - assert_eq!(port.recv_opt(), None); + assert_eq!(rx.recv_opt(), None); }) iotest!(fn receiver_goes_away_oneshot() { diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs index f0ea90a4aed..ece9c1bfd20 100644 --- a/src/libstd/macros.rs +++ b/src/libstd/macros.rs @@ -387,17 +387,17 @@ macro_rules! vec( /// # Example /// /// ``` -/// let (p1, c1) = Chan::new(); -/// let (p2, c2) = Chan::new(); +/// let (tx1, rx1) = channel(); +/// let (tx2, rx2) = channel(); /// # fn long_running_task() {} /// # fn calculate_the_answer() -> int { 42 } /// -/// spawn(proc() { long_running_task(); c1.send(()) }); -/// spawn(proc() { c2.send(calculate_the_answer()) }); +/// spawn(proc() { long_running_task(); tx1.send(()) }); +/// spawn(proc() { tx2.send(calculate_the_answer()) }); /// /// select! ( -/// () = p1.recv() => println!("the long running task finished first"), -/// answer = p2.recv() => { +/// () = rx1.recv() => println!("the long running task finished first"), +/// answer = rx2.recv() => { /// println!("the answer was: {}", answer); /// } /// ) @@ -408,16 +408,16 @@ macro_rules! vec( #[experimental] macro_rules! select { ( - $($name:pat = $port:ident.$meth:ident() => $code:expr),+ + $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ ) => ({ use std::comm::Select; let sel = Select::new(); - $( let mut $port = sel.handle(&$port); )+ + $( let mut $rx = sel.handle(&$rx); )+ unsafe { - $( $port.add(); )+ + $( $rx.add(); )+ } let ret = sel.wait(); - $( if ret == $port.id() { let $name = $port.$meth(); $code } else )+ + $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+ { unreachable!() } }) } diff --git a/src/libstd/prelude.rs b/src/libstd/prelude.rs index b30c78e7962..a04b59ae601 100644 --- a/src/libstd/prelude.rs +++ b/src/libstd/prelude.rs @@ -61,7 +61,7 @@ pub use vec::{MutableVector, MutableTotalOrdVector}; pub use vec::{Vector, VectorVector, CloneableVector, ImmutableVector}; // Reexported runtime types -pub use comm::{Port, Chan}; +pub use comm::{channel, Sender, Receiver}; pub use task::spawn; // Reexported statics diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index edb480fe4cb..cd557f01834 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -10,7 +10,7 @@ use c_str::CString; use cast; -use comm::{Chan, Port}; +use comm::{Sender, Receiver}; use libc::c_int; use libc; use ops::Drop; @@ -183,7 +183,7 @@ pub trait IoFactory { fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError>; fn tty_open(&mut self, fd: c_int, readable: bool) -> Result<~RtioTTY, IoError>; - fn signal(&mut self, signal: Signum, channel: Chan<Signum>) + fn signal(&mut self, signal: Signum, channel: Sender<Signum>) -> Result<~RtioSignal, IoError>; } @@ -233,8 +233,8 @@ pub trait RtioUdpSocket : RtioSocket { pub trait RtioTimer { fn sleep(&mut self, msecs: u64); - fn oneshot(&mut self, msecs: u64) -> Port<()>; - fn period(&mut self, msecs: u64) -> Port<()>; + fn oneshot(&mut self, msecs: u64) -> Receiver<()>; + fn period(&mut self, msecs: u64) -> Receiver<()>; } pub trait RtioFileStream { diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 72ba98eab4f..86e69560e9d 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -17,7 +17,7 @@ use any::AnyOwnExt; use cast; use cleanup; use clone::Clone; -use comm::Chan; +use comm::Sender; use io::Writer; use iter::{Iterator, Take}; use local_data; @@ -73,7 +73,7 @@ pub enum DeathAction { /// until all its watched children exit before collecting the status. Execute(proc(TaskResult)), /// A channel to send the result of the task on when the task exits - SendMessage(Chan<TaskResult>), + SendMessage(Sender<TaskResult>), } /// Per-task state related to task death, killing, failure, etc. @@ -450,16 +450,16 @@ mod test { #[test] fn comm_stream() { - let (port, chan) = Chan::new(); - chan.send(10); - assert!(port.recv() == 10); + let (tx, rx) = channel(); + tx.send(10); + assert!(rx.recv() == 10); } #[test] fn comm_shared_chan() { - let (port, chan) = Chan::new(); - chan.send(10); - assert!(port.recv() == 10); + let (tx, rx) = channel(); + tx.send(10); + assert!(rx.recv() == 10); } #[test] diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs index 44825a1ef94..ad0434c634a 100644 --- a/src/libstd/sync/mpmc_bounded_queue.rs +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -172,24 +172,24 @@ mod tests { let nmsgs = 1000u; let mut q = Queue::with_capacity(nthreads*nmsgs); assert_eq!(None, q.pop()); - let (port, chan) = Chan::new(); + let (tx, rx) = channel(); for _ in range(0, nthreads) { let q = q.clone(); - let chan = chan.clone(); + let tx = tx.clone(); native::task::spawn(proc() { let mut q = q; for i in range(0, nmsgs) { assert!(q.push(i)); } - chan.send(()); + tx.send(()); }); } - let mut completion_ports = ~[]; + let mut completion_rxs = ~[]; for _ in range(0, nthreads) { - let (completion_port, completion_chan) = Chan::new(); - completion_ports.push(completion_port); + let (tx, rx) = channel(); + completion_rxs.push(rx); let q = q.clone(); native::task::spawn(proc() { let mut q = q; @@ -203,15 +203,15 @@ mod tests { } } } - completion_chan.send(i); + tx.send(i); }); } - for completion_port in completion_ports.mut_iter() { - assert_eq!(nmsgs, completion_port.recv()); + for rx in completion_rxs.mut_iter() { + assert_eq!(nmsgs, rx.recv()); } for _ in range(0, nthreads) { - port.recv(); + rx.recv(); } } } diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index 2dc63380cb8..9d69f2b3b08 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -176,17 +176,17 @@ mod tests { Empty => {} Inconsistent | Data(..) => fail!() } - let (port, chan) = Chan::new(); + let (tx, rx) = channel(); let q = UnsafeArc::new(q); for _ in range(0, nthreads) { - let chan = chan.clone(); + let tx = tx.clone(); let q = q.clone(); native::task::spawn(proc() { for i in range(0, nmsgs) { unsafe { (*q.get()).push(i); } } - chan.send(()); + tx.send(()); }); } @@ -197,9 +197,9 @@ mod tests { Data(_) => { i += 1 } } } - drop(chan); + drop(tx); for _ in range(0, nthreads) { - port.recv(); + rx.recv(); } } } diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index a2c61a2b135..9277587e587 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -273,7 +273,7 @@ mod test { fn stress_bound(bound: uint) { let (a, b) = UnsafeArc::new2(Queue::new(bound)); - let (port, chan) = Chan::new(); + let (tx, rx) = channel(); native::task::spawn(proc() { for _ in range(0, 100000) { loop { @@ -284,12 +284,12 @@ mod test { } } } - chan.send(()); + tx.send(()); }); for _ in range(0, 100000) { unsafe { (*a.get()).push(1); } } - port.recv(); + rx.recv(); } } } diff --git a/src/libstd/task.rs b/src/libstd/task.rs index 16ac46186df..19f41c6fa1c 100644 --- a/src/libstd/task.rs +++ b/src/libstd/task.rs @@ -11,26 +11,25 @@ /*! * Utilities for managing and scheduling tasks * - * An executing Rust program consists of a tree of tasks, each with their own - * stack, and sole ownership of their allocated heap data. Tasks communicate - * with each other using ports and channels (see std::comm for more info - * about how communication works). + * An executing Rust program consists of a collection of tasks, each with their + * own stack, and sole ownership of their allocated heap data. Tasks communicate + * with each other using channels (see `std::comm` for more info about how + * communication works). * - * Failure in one task does not propagate to any others (not to parent, not to child). - * Failure propagation is instead handled by using Chan.send() and Port.recv(), which - * will fail if the other end has hung up already. + * Failure in one task does not propagate to any others (not to parent, not to + * child). Failure propagation is instead handled by using the channel send() + * and recv() methods which will fail if the other end has hung up already. * * Task Scheduling: * - * By default, every task is created in the same scheduler as its parent, where it - * is scheduled cooperatively with all other tasks in that scheduler. Some specialized - * applications may want more control over their scheduling, in which case they can be - * spawned into a new scheduler with the specific properties required. See TaskBuilder's - * documentation bellow for more information. + * By default, every task is created with the same "flavor" as the calling task. + * This flavor refers to the scheduling mode, with two possibilities currently + * being 1:1 and M:N modes. Green (M:N) tasks are cooperatively scheduled and + * native (1:1) tasks are scheduled by the OS kernel. * * # Example * - * ``` + * ```rust * spawn(proc() { * println!("Hello, World!"); * }) @@ -38,7 +37,7 @@ */ use any::Any; -use comm::{Chan, Port}; +use comm::{Sender, Receiver, channel}; use io::Writer; use kinds::{Send, marker}; use logging::Logger; @@ -62,7 +61,7 @@ pub type TaskResult = Result<(), ~Any>; /// Task configuration options pub struct TaskOpts { /// Enable lifecycle notifications on the given channel - notify_chan: Option<Chan<TaskResult>>, + notify_chan: Option<Sender<TaskResult>>, /// A name for the task-to-be, for identification in failure messages name: Option<SendStr>, /// The size of the stack for the spawned task @@ -116,7 +115,7 @@ impl TaskBuilder { /// /// # Failure /// Fails if a future_result was already set for this task. - pub fn future_result(&mut self) -> Port<TaskResult> { + pub fn future_result(&mut self) -> Receiver<TaskResult> { // FIXME (#3725): Once linked failure and notification are // handled in the library, I can imagine implementing this by just // registering an arbitrary number of task::on_exit handlers and @@ -127,12 +126,12 @@ impl TaskBuilder { } // Construct the future and give it to the caller. - let (notify_pipe_po, notify_pipe_ch) = Chan::new(); + let (tx, rx) = channel(); // Reconfigure self to use a notify channel. - self.opts.notify_chan = Some(notify_pipe_ch); + self.opts.notify_chan = Some(tx); - notify_pipe_po + rx } /// Name the task-to-be. Currently the name is used for identification @@ -204,16 +203,16 @@ impl TaskBuilder { * Fails if a future_result was already set for this task. */ pub fn try<T:Send>(mut self, f: proc() -> T) -> Result<T, ~Any> { - let (po, ch) = Chan::new(); + let (tx, rx) = channel(); let result = self.future_result(); self.spawn(proc() { - ch.send(f()); + tx.send(f()); }); match result.recv() { - Ok(()) => Ok(po.recv()), + Ok(()) => Ok(rx.recv()), Err(cause) => Err(cause) } } @@ -340,25 +339,24 @@ fn test_send_named_task() { #[test] fn test_run_basic() { - let (po, ch) = Chan::new(); + let (tx, rx) = channel(); task().spawn(proc() { - ch.send(()); + tx.send(()); }); - po.recv(); + rx.recv(); } #[test] fn test_with_wrapper() { - let (po, ch) = Chan::new(); + let (tx, rx) = channel(); task().with_wrapper(proc(body) { - let ch = ch; let result: proc() = proc() { body(); - ch.send(()); + tx.send(()); }; result }).spawn(proc() { }); - po.recv(); + rx.recv(); } #[test] @@ -407,50 +405,49 @@ fn test_try_fail() { fn test_spawn_sched() { use clone::Clone; - let (po, ch) = Chan::new(); + let (tx, rx) = channel(); - fn f(i: int, ch: Chan<()>) { - let ch = ch.clone(); + fn f(i: int, tx: Sender<()>) { + let tx = tx.clone(); spawn(proc() { if i == 0 { - ch.send(()); + tx.send(()); } else { - f(i - 1, ch); + f(i - 1, tx); } }); } - f(10, ch); - po.recv(); + f(10, tx); + rx.recv(); } #[test] fn test_spawn_sched_childs_on_default_sched() { - let (po, ch) = Chan::new(); + let (tx, rx) = channel(); spawn(proc() { - let ch = ch; spawn(proc() { - ch.send(()); + tx.send(()); }); }); - po.recv(); + rx.recv(); } #[cfg(test)] fn avoid_copying_the_body(spawnfn: |v: proc()|) { - let (p, ch) = Chan::<uint>::new(); + let (tx, rx) = channel::<uint>(); let x = ~1; let x_in_parent = (&*x) as *int as uint; spawnfn(proc() { let x_in_child = (&*x) as *int as uint; - ch.send(x_in_child); + tx.send(x_in_child); }); - let x_in_child = p.recv(); + let x_in_child = rx.recv(); assert_eq!(x_in_parent, x_in_child); } diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index 93322977bc1..de004f0af3e 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -124,14 +124,14 @@ mod tests { for _ in range(0u, num_tasks) { let total = total.clone(); - let (port, chan) = Chan::new(); - futures.push(port); + let (tx, rx) = channel(); + futures.push(rx); task::spawn(proc() { for _ in range(0u, count) { total.with(|count| **count += 1); } - chan.send(()); + tx.send(()); }); }; |
