about summary refs log tree commit diff
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2013-12-09 13:51:32 -0800
committerbors <bors@rust-lang.org>2013-12-09 13:51:32 -0800
commitf817ed3e6f39e6e0b1a8ec968ea0ad3c1bc36ba5 (patch)
tree9db29911745ef1cd31b7fcf36b6ff042d752c616
parentb485e2b65d075cd128357e81d347ed7f13a361c9 (diff)
parent7168d715a546a4ca67d36a05eabddb00fbe4aa7e (diff)
downloadrust-f817ed3e6f39e6e0b1a8ec968ea0ad3c1bc36ba5.tar.gz
rust-f817ed3e6f39e6e0b1a8ec968ea0ad3c1bc36ba5.zip
auto merge of #10823 : rapha/rust/master, r=alexcrichton
Hi, first pull request here so let me know if I've missed any of the procedure.
-rw-r--r--src/libstd/io/comm_adapters.rs183
1 files changed, 172 insertions, 11 deletions
diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs
index f9cf847621e..a53146f0091 100644
--- a/src/libstd/io/comm_adapters.rs
+++ b/src/libstd/io/comm_adapters.rs
@@ -8,30 +8,107 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use option::Option;
-use comm::{GenericPort, GenericChan};
+use prelude::*;
+
+use comm::{GenericPort, GenericChan, GenericSmartChan};
+use cmp;
+use io;
+use option::{None, Option, Some};
 use super::{Reader, Writer};
+use vec::{bytes, CopyableVector, MutableVector, ImmutableVector};
 
-pub struct PortReader<P>;
+/// Allows reading from a port.
+///
+/// # Example
+///
+/// ```
+/// let reader = PortReader::new(port);
+///
+/// let mut buf = ~[0u8, ..100];
+/// match reader.read(buf) {
+///     Some(nread) => println!("Read {} bytes", nread),
+///     None => println!("At the end of the stream!")
+/// }
+/// ```
+pub struct PortReader<P> {
+    priv buf: Option<~[u8]>,  // A buffer of bytes received but not consumed.
+    priv pos: uint,           // How many of the buffered bytes have already be consumed.
+    priv port: P,             // The port to pull data from.
+    priv closed: bool,        // Whether the pipe this port connects to has been closed.
+}
 
 impl<P: GenericPort<~[u8]>> PortReader<P> {
-    pub fn new(_port: P) -> PortReader<P> { fail!() }
+    pub fn new(port: P) -> PortReader<P> {
+        PortReader {
+            buf: None,
+            pos: 0,
+            port: port,
+            closed: false,
+        }
+    }
 }
 
 impl<P: GenericPort<~[u8]>> Reader for PortReader<P> {
-    fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
+    fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
+        let mut num_read = 0;
+        loop {
+            match self.buf {
+                Some(ref prev) => {
+                    let dst = buf.mut_slice_from(num_read);
+                    let src = prev.slice_from(self.pos);
+                    let count = cmp::min(dst.len(), src.len());
+                    bytes::copy_memory(dst, src, count);
+                    num_read += count;
+                    self.pos += count;
+                },
+                None => (),
+            };
+            if num_read == buf.len() || self.closed {
+                break;
+            }
+            self.pos = 0;
+            self.buf = self.port.try_recv();
+            self.closed = self.buf.is_none();
+        }
+        if self.closed && num_read == 0 {
+            io::io_error::cond.raise(io::standard_error(io::EndOfFile));
+            None
+        } else {
+            Some(num_read)
+        }
+    }
 
-    fn eof(&mut self) -> bool { fail!() }
+    fn eof(&mut self) -> bool { self.closed }
 }
 
-pub struct ChanWriter<C>;
+/// Allows writing to a chan.
+///
+/// # Example
+///
+/// ```
+/// let writer = ChanWriter::new(chan);
+/// writer.write("hello, world".as_bytes());
+/// ```
+pub struct ChanWriter<C> {
+    chan: C,
+}
 
-impl<C: GenericChan<~[u8]>> ChanWriter<C> {
-    pub fn new(_chan: C) -> ChanWriter<C> { fail!() }
+impl<C: GenericSmartChan<~[u8]>> ChanWriter<C> {
+    pub fn new(chan: C) -> ChanWriter<C> {
+        ChanWriter { chan: chan }
+    }
 }
 
-impl<C: GenericChan<~[u8]>> Writer for ChanWriter<C> {
-    fn write(&mut self, _buf: &[u8]) { fail!() }
+impl<C: GenericSmartChan<~[u8]>> Writer for ChanWriter<C> {
+    fn write(&mut self, buf: &[u8]) {
+        if !self.chan.try_send(buf.to_owned()) {
+            io::io_error::cond.raise(io::IoError {
+                kind: io::BrokenPipe,
+                desc: "Pipe closed",
+                detail: None
+            });
+        }
+    }
 }
 
 pub struct ReaderPort<R>;
@@ -55,3 +132,87 @@ impl<W: Writer> WriterChan<W> {
 impl<W: Writer> GenericChan<~[u8]> for WriterChan<W> {
     fn send(&self, _x: ~[u8]) { fail!() }
 }
+
+
+#[cfg(test)]
+mod test {
+    use prelude::*;
+    use super::*;
+    use io;
+    use comm;
+    use task;
+
+    #[test]
+    fn test_port_reader() {
+        let (port, chan) = comm::stream();
+        do task::spawn {
+          chan.send(~[1u8, 2u8]);
+          chan.send(~[]);
+          chan.send(~[3u8, 4u8]);
+          chan.send(~[5u8, 6u8]);
+          chan.send(~[7u8, 8u8]);
+        }
+
+        let mut reader = PortReader::new(port);
+        let mut buf = ~[0u8, ..3];
+
+        assert_eq!(false, reader.eof());
+
+        assert_eq!(Some(0), reader.read(~[]));
+        assert_eq!(false, reader.eof());
+
+        assert_eq!(Some(3), reader.read(buf));
+        assert_eq!(false, reader.eof());
+        assert_eq!(~[1,2,3], buf);
+
+        assert_eq!(Some(3), reader.read(buf));
+        assert_eq!(false, reader.eof());
+        assert_eq!(~[4,5,6], buf);
+
+        assert_eq!(Some(2), reader.read(buf));
+        assert_eq!(~[7,8,6], buf);
+        assert_eq!(true, reader.eof());
+
+        let mut err = None;
+        let result = io::io_error::cond.trap(|io::standard_error(k, _, _)| {
+            err = Some(k)
+        }).inside(|| {
+            reader.read(buf)
+        });
+        assert_eq!(Some(io::EndOfFile), err);
+        assert_eq!(None, result);
+        assert_eq!(true, reader.eof());
+        assert_eq!(~[7,8,6], buf);
+
+        // Ensure it continues to fail in the same way.
+        err = None;
+        let result = io::io_error::cond.trap(|io::standard_error(k, _, _)| {
+            err = Some(k)
+        }).inside(|| {
+            reader.read(buf)
+        });
+        assert_eq!(Some(io::EndOfFile), err);
+        assert_eq!(None, result);
+        assert_eq!(true, reader.eof());
+        assert_eq!(~[7,8,6], buf);
+    }
+
+    #[test]
+    fn test_chan_writer() {
+        let (port, chan) = comm::stream();
+        let mut writer = ChanWriter::new(chan);
+        writer.write_be_u32(42);
+
+        let wanted = ~[0u8, 0u8, 0u8, 42u8];
+        let got = do task::try { port.recv() }.unwrap();
+        assert_eq!(wanted, got);
+
+        let mut err = None;
+        io::io_error::cond.trap(|io::IoError { kind, .. } | {
+            err = Some(kind)
+        }).inside(|| {
+            writer.write_u8(1)
+        });
+        assert_eq!(Some(io::BrokenPipe), err);
+    }
+}