diff options
| author | Tim Kuehn <tkuehn@cmu.edu> | 2013-08-31 22:02:22 -0400 |
|---|---|---|
| committer | Tim Kuehn <tkuehn@cmu.edu> | 2013-09-11 22:03:54 -0400 |
| commit | a83599548895c35d3f9b87f7f2e012b09af404c0 (patch) | |
| tree | 05591a08b935b2789bbf220a6b62538d000ae0d2 | |
| parent | 49eb7bd2716f59e0cd47c427a4c60f49e8ce6e50 (diff) | |
| download | rust-a83599548895c35d3f9b87f7f2e012b09af404c0.tar.gz rust-a83599548895c35d3f9b87f7f2e012b09af404c0.zip | |
Rendezvous stream for synchronous channel messaging
| -rw-r--r-- | src/libextra/comm.rs | 104 |
1 files changed, 102 insertions, 2 deletions
diff --git a/src/libextra/comm.rs b/src/libextra/comm.rs index 776e25cac89..dc6f4964b31 100644 --- a/src/libextra/comm.rs +++ b/src/libextra/comm.rs @@ -1,4 +1,4 @@ -// Copyright 2012 The Rust Project Developers. See the COPYRIGHT +// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // @@ -90,9 +90,55 @@ pub fn DuplexStream<T:Send,U:Send>() }) } +/// An extension of `pipes::stream` that provides synchronous message sending. +pub struct SyncChan<T> { priv duplex_stream: DuplexStream<T, ()> } +/// An extension of `pipes::stream` that acknowledges each message received. +pub struct SyncPort<T> { priv duplex_stream: DuplexStream<(), T> } + +impl<T: Send> GenericChan<T> for SyncChan<T> { + fn send(&self, val: T) { + assert!(self.try_send(val), "SyncChan.send: receiving port closed"); + } +} + +impl<T: Send> GenericSmartChan<T> for SyncChan<T> { + /// Sends a message, or report if the receiver has closed the connection before receiving. + fn try_send(&self, val: T) -> bool { + self.duplex_stream.try_send(val) && self.duplex_stream.try_recv().is_some() + } +} + +impl<T: Send> GenericPort<T> for SyncPort<T> { + fn recv(&self) -> T { + self.try_recv().expect("SyncPort.recv: sending channel closed") + } + + fn try_recv(&self) -> Option<T> { + do self.duplex_stream.try_recv().map_move |val| { + self.duplex_stream.try_send(()); + val + } + } +} + +impl<T: Send> Peekable<T> for SyncPort<T> { + fn peek(&self) -> bool { + self.duplex_stream.peek() + } +} + +/// Creates a stream whose channel, upon sending a message, blocks until the message is received. +pub fn rendezvous<T: Send>() -> (SyncPort<T>, SyncChan<T>) { + let (chan_stream, port_stream) = DuplexStream(); + (SyncPort { duplex_stream: port_stream }, SyncChan { duplex_stream: chan_stream }) +} + #[cfg(test)] mod test { - use comm::DuplexStream; + use comm::{DuplexStream, rendezvous}; + use std::rt::test::run_in_newsched_task; + use std::task::spawn_unlinked; + #[test] pub fn DuplexStream1() { @@ -104,4 +150,58 @@ mod test { assert!(left.recv() == 123); assert!(right.recv() == ~"abc"); } + + #[test] + pub fn basic_rendezvous_test() { + let (port, chan) = rendezvous(); + + do spawn { + chan.send("abc"); + } + + assert!(port.recv() == "abc"); + } + + #[test] + fn recv_a_lot() { + // Rendezvous streams should be able to handle any number of messages being sent + do run_in_newsched_task { + let (port, chan) = rendezvous(); + do spawn { + do 1000000.times { chan.send(()) } + } + do 1000000.times { port.recv() } + } + } + + #[test] + fn send_and_fail_and_try_recv() { + let (port, chan) = rendezvous(); + do spawn_unlinked { + chan.duplex_stream.send(()); // Can't access this field outside this module + fail!() + } + port.recv() + } + + #[test] + fn try_send_and_recv_then_fail_before_ack() { + let (port, chan) = rendezvous(); + do spawn_unlinked { + port.duplex_stream.recv(); + fail!() + } + chan.try_send(()); + } + + #[test] + #[should_fail] + fn send_and_recv_then_fail_before_ack() { + let (port, chan) = rendezvous(); + do spawn_unlinked { + port.duplex_stream.recv(); + fail!() + } + chan.send(()); + } } |
