diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-03-17 14:34:25 -0700 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-03-24 20:06:37 -0700 |
| commit | 56cae9b3c0a49ff39f14570301db43008e810695 (patch) | |
| tree | 3e85ccd4d544a76031847cddb3009bf477d9159a /src/libsync | |
| parent | 6bf3fca8ff90bbeff8d5c437aa784d0dbf8f9455 (diff) | |
| download | rust-56cae9b3c0a49ff39f14570301db43008e810695.tar.gz rust-56cae9b3c0a49ff39f14570301db43008e810695.zip | |
comm: Implement synchronous channels
This commit contains an implementation of synchronous, bounded channels for Rust. This is an implementation of the proposal made last January [1]. These channels are built on mutexes, and currently focus on a working implementation rather than speed. Receivers for sync channels have select() implemented for them, but there is currently no implementation of select() for sync senders. Rust will continue to provide both synchronous and asynchronous channels as part of the standard distribution, there is no intent to remove asynchronous channels. This flavor of channels is meant to provide an alternative to asynchronous channels because like green tasks, asynchronous channels are not appropriate for all situations. [1] - https://mail.mozilla.org/pipermail/rust-dev/2014-January/007924.html
Diffstat (limited to 'src/libsync')
| -rw-r--r-- | src/libsync/comm.rs | 99 | ||||
| -rw-r--r-- | src/libsync/lib.rs | 2 |
2 files changed, 2 insertions, 99 deletions
diff --git a/src/libsync/comm.rs b/src/libsync/comm.rs index aecea37cce8..628f6459bad 100644 --- a/src/libsync/comm.rs +++ b/src/libsync/comm.rs @@ -51,54 +51,9 @@ impl<S:Send,R:Send> DuplexStream<S, R> { } } -/// An extension of `pipes::stream` that provides synchronous message sending. -pub struct SyncSender<S> { priv duplex_stream: DuplexStream<S, ()> } -/// An extension of `pipes::stream` that acknowledges each message received. -pub struct SyncReceiver<R> { priv duplex_stream: DuplexStream<(), R> } - -impl<S: Send> SyncSender<S> { - pub fn send(&self, val: S) { - assert!(self.try_send(val), "SyncSender.send: receiving port closed"); - } - - /// Sends a message, or report if the receiver has closed the connection - /// before receiving. - pub fn try_send(&self, val: S) -> bool { - self.duplex_stream.try_send(val) && self.duplex_stream.recv_opt().is_some() - } -} - -impl<R: Send> SyncReceiver<R> { - pub fn recv(&self) -> R { - self.recv_opt().expect("SyncReceiver.recv: sending channel closed") - } - - pub fn recv_opt(&self) -> Option<R> { - self.duplex_stream.recv_opt().map(|val| { - self.duplex_stream.try_send(()); - val - }) - } - - pub fn try_recv(&self) -> comm::TryRecvResult<R> { - match self.duplex_stream.try_recv() { - comm::Data(t) => { self.duplex_stream.try_send(()); comm::Data(t) } - state => state, - } - } -} - -/// Creates a stream whose channel, upon sending a message, blocks until the -/// message is received. -pub fn rendezvous<T: Send>() -> (SyncReceiver<T>, SyncSender<T>) { - let (chan_stream, port_stream) = duplex(); - (SyncReceiver { duplex_stream: port_stream }, - SyncSender { duplex_stream: chan_stream }) -} - #[cfg(test)] mod test { - use comm::{duplex, rendezvous}; + use comm::{duplex}; #[test] @@ -111,56 +66,4 @@ mod test { assert!(left.recv() == 123); assert!(right.recv() == ~"abc"); } - - #[test] - pub fn basic_rendezvous_test() { - let (port, chan) = rendezvous(); - - spawn(proc() { - chan.send("abc"); - }); - - assert!(port.recv() == "abc"); - } - - #[test] - fn recv_a_lot() { - // Rendezvous streams should be able to handle any number of messages being sent - let (port, chan) = rendezvous(); - spawn(proc() { - for _ in range(0, 10000) { chan.send(()); } - }); - for _ in range(0, 10000) { port.recv(); } - } - - #[test] - fn send_and_fail_and_try_recv() { - let (port, chan) = rendezvous(); - spawn(proc() { - chan.duplex_stream.send(()); // Can't access this field outside this module - fail!() - }); - port.recv() - } - - #[test] - fn try_send_and_recv_then_fail_before_ack() { - let (port, chan) = rendezvous(); - spawn(proc() { - port.duplex_stream.recv(); - fail!() - }); - chan.try_send(()); - } - - #[test] - #[should_fail] - fn send_and_recv_then_fail_before_ack() { - let (port, chan) = rendezvous(); - spawn(proc() { - port.duplex_stream.recv(); - fail!() - }); - chan.send(()); - } } diff --git a/src/libsync/lib.rs b/src/libsync/lib.rs index d166076e96e..4df644e3b23 100644 --- a/src/libsync/lib.rs +++ b/src/libsync/lib.rs @@ -25,7 +25,7 @@ #[cfg(test)] #[phase(syntax, link)] extern crate log; -pub use comm::{DuplexStream, SyncSender, SyncReceiver, rendezvous, duplex}; +pub use comm::{DuplexStream, duplex}; pub use task_pool::TaskPool; pub use future::Future; pub use arc::{Arc, Weak}; |
