about summary refs log tree commit diff
path: root/src/libsync
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-03-17 14:34:25 -0700
committerAlex Crichton <alex@alexcrichton.com>2014-03-24 20:06:37 -0700
commit56cae9b3c0a49ff39f14570301db43008e810695 (patch)
tree3e85ccd4d544a76031847cddb3009bf477d9159a /src/libsync
parent6bf3fca8ff90bbeff8d5c437aa784d0dbf8f9455 (diff)
downloadrust-56cae9b3c0a49ff39f14570301db43008e810695.tar.gz
rust-56cae9b3c0a49ff39f14570301db43008e810695.zip
comm: Implement synchronous channels
This commit contains an implementation of synchronous, bounded channels for
Rust. This is an implementation of the proposal made last January [1]. These
channels are built on mutexes, and currently focus on a working implementation
rather than speed. Receivers for sync channels have select() implemented for
them, but there is currently no implementation of select() for sync senders.

Rust will continue to provide both synchronous and asynchronous channels as part
of the standard distribution, there is no intent to remove asynchronous
channels. This flavor of channels is meant to provide an alternative to
asynchronous channels because like green tasks, asynchronous channels are not
appropriate for all situations.

[1] - https://mail.mozilla.org/pipermail/rust-dev/2014-January/007924.html
Diffstat (limited to 'src/libsync')
-rw-r--r--src/libsync/comm.rs99
-rw-r--r--src/libsync/lib.rs2
2 files changed, 2 insertions, 99 deletions
diff --git a/src/libsync/comm.rs b/src/libsync/comm.rs
index aecea37cce8..628f6459bad 100644
--- a/src/libsync/comm.rs
+++ b/src/libsync/comm.rs
@@ -51,54 +51,9 @@ impl<S:Send,R:Send> DuplexStream<S, R> {
     }
 }
 
-/// An extension of `pipes::stream` that provides synchronous message sending.
-pub struct SyncSender<S> { priv duplex_stream: DuplexStream<S, ()> }
-/// An extension of `pipes::stream` that acknowledges each message received.
-pub struct SyncReceiver<R> { priv duplex_stream: DuplexStream<(), R> }
-
-impl<S: Send> SyncSender<S> {
-    pub fn send(&self, val: S) {
-        assert!(self.try_send(val), "SyncSender.send: receiving port closed");
-    }
-
-    /// Sends a message, or report if the receiver has closed the connection
-    /// before receiving.
-    pub fn try_send(&self, val: S) -> bool {
-        self.duplex_stream.try_send(val) && self.duplex_stream.recv_opt().is_some()
-    }
-}
-
-impl<R: Send> SyncReceiver<R> {
-    pub fn recv(&self) -> R {
-        self.recv_opt().expect("SyncReceiver.recv: sending channel closed")
-    }
-
-    pub fn recv_opt(&self) -> Option<R> {
-        self.duplex_stream.recv_opt().map(|val| {
-            self.duplex_stream.try_send(());
-            val
-        })
-    }
-
-    pub fn try_recv(&self) -> comm::TryRecvResult<R> {
-        match self.duplex_stream.try_recv() {
-            comm::Data(t) => { self.duplex_stream.try_send(()); comm::Data(t) }
-            state => state,
-        }
-    }
-}
-
-/// Creates a stream whose channel, upon sending a message, blocks until the
-/// message is received.
-pub fn rendezvous<T: Send>() -> (SyncReceiver<T>, SyncSender<T>) {
-    let (chan_stream, port_stream) = duplex();
-    (SyncReceiver { duplex_stream: port_stream },
-     SyncSender { duplex_stream: chan_stream })
-}
-
 #[cfg(test)]
 mod test {
-    use comm::{duplex, rendezvous};
+    use comm::{duplex};
 
 
     #[test]
@@ -111,56 +66,4 @@ mod test {
         assert!(left.recv() == 123);
         assert!(right.recv() == ~"abc");
     }
-
-    #[test]
-    pub fn basic_rendezvous_test() {
-        let (port, chan) = rendezvous();
-
-        spawn(proc() {
-            chan.send("abc");
-        });
-
-        assert!(port.recv() == "abc");
-    }
-
-    #[test]
-    fn recv_a_lot() {
-        // Rendezvous streams should be able to handle any number of messages being sent
-        let (port, chan) = rendezvous();
-        spawn(proc() {
-            for _ in range(0, 10000) { chan.send(()); }
-        });
-        for _ in range(0, 10000) { port.recv(); }
-    }
-
-    #[test]
-    fn send_and_fail_and_try_recv() {
-        let (port, chan) = rendezvous();
-        spawn(proc() {
-            chan.duplex_stream.send(()); // Can't access this field outside this module
-            fail!()
-        });
-        port.recv()
-    }
-
-    #[test]
-    fn try_send_and_recv_then_fail_before_ack() {
-        let (port, chan) = rendezvous();
-        spawn(proc() {
-            port.duplex_stream.recv();
-            fail!()
-        });
-        chan.try_send(());
-    }
-
-    #[test]
-    #[should_fail]
-    fn send_and_recv_then_fail_before_ack() {
-        let (port, chan) = rendezvous();
-        spawn(proc() {
-            port.duplex_stream.recv();
-            fail!()
-        });
-        chan.send(());
-    }
 }
diff --git a/src/libsync/lib.rs b/src/libsync/lib.rs
index d166076e96e..4df644e3b23 100644
--- a/src/libsync/lib.rs
+++ b/src/libsync/lib.rs
@@ -25,7 +25,7 @@
 #[cfg(test)]
 #[phase(syntax, link)] extern crate log;
 
-pub use comm::{DuplexStream, SyncSender, SyncReceiver, rendezvous, duplex};
+pub use comm::{DuplexStream, duplex};
 pub use task_pool::TaskPool;
 pub use future::Future;
 pub use arc::{Arc, Weak};