// Copyright 2013 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. use clone::Clone; use cmp; use sync::mpsc::{Sender, Receiver}; use old_io; use option::Option::{None, Some}; use result::Result::{Ok, Err}; use slice::bytes; use super::{Buffer, Reader, Writer, IoResult}; use vec::Vec; /// Allows reading from a rx. /// /// # Examples /// /// ``` /// # #![feature(old_io)] /// use std::sync::mpsc::channel; /// use std::old_io::*; /// /// let (tx, rx) = channel(); /// # drop(tx); /// let mut reader = ChanReader::new(rx); /// /// let mut buf = [0; 100]; /// match reader.read(&mut buf) { /// Ok(nread) => println!("Read {} bytes", nread), /// Err(e) => println!("read error: {}", e), /// } /// ``` pub struct ChanReader { buf: Vec, // A buffer of bytes received but not consumed. pos: uint, // How many of the buffered bytes have already be consumed. rx: Receiver>, // The Receiver to pull data from. closed: bool, // Whether the channel this Receiver connects to has been closed. } impl ChanReader { /// Wraps a `Port` in a `ChanReader` structure pub fn new(rx: Receiver>) -> ChanReader { ChanReader { buf: Vec::new(), pos: 0, rx: rx, closed: false, } } } impl Buffer for ChanReader { fn fill_buf<'a>(&'a mut self) -> IoResult<&'a [u8]> { if self.pos >= self.buf.len() { self.pos = 0; match self.rx.recv() { Ok(bytes) => { self.buf = bytes; }, Err(..) => { self.closed = true; self.buf = Vec::new(); } } } if self.closed { Err(old_io::standard_error(old_io::EndOfFile)) } else { Ok(&self.buf[self.pos..]) } } fn consume(&mut self, amt: uint) { self.pos += amt; assert!(self.pos <= self.buf.len()); } } impl Reader for ChanReader { fn read(&mut self, buf: &mut [u8]) -> IoResult { let mut num_read = 0; loop { let count = match self.fill_buf().ok() { Some(src) => { let dst = &mut buf[num_read..]; let count = cmp::min(src.len(), dst.len()); bytes::copy_memory(dst, &src[..count]); count }, None => 0, }; self.consume(count); num_read += count; if num_read == buf.len() || self.closed { break; } } if self.closed && num_read == 0 { Err(old_io::standard_error(old_io::EndOfFile)) } else { Ok(num_read) } } } /// Allows writing to a tx. /// /// # Examples /// /// ``` /// # #![feature(old_io, io)] /// # #![allow(unused_must_use)] /// use std::sync::mpsc::channel; /// use std::old_io::*; /// /// let (tx, rx) = channel(); /// # drop(rx); /// let mut writer = ChanWriter::new(tx); /// writer.write("hello, world".as_bytes()); /// ``` pub struct ChanWriter { tx: Sender>, } impl ChanWriter { /// Wraps a channel in a `ChanWriter` structure pub fn new(tx: Sender>) -> ChanWriter { ChanWriter { tx: tx } } } #[stable(feature = "rust1", since = "1.0.0")] impl Clone for ChanWriter { fn clone(&self) -> ChanWriter { ChanWriter { tx: self.tx.clone() } } } impl Writer for ChanWriter { fn write_all(&mut self, buf: &[u8]) -> IoResult<()> { self.tx.send(buf.to_vec()).map_err(|_| { old_io::IoError { kind: old_io::BrokenPipe, desc: "Pipe closed", detail: None } }) } } #[cfg(test)] mod test { use prelude::v1::*; use sync::mpsc::channel; use super::*; use old_io::{self, Reader, Writer, Buffer}; use thread; #[test] fn test_rx_reader() { let (tx, rx) = channel(); thread::spawn(move|| { tx.send(vec![1, 2]).unwrap(); tx.send(vec![]).unwrap(); tx.send(vec![3, 4]).unwrap(); tx.send(vec![5, 6]).unwrap(); tx.send(vec![7, 8]).unwrap(); }); let mut reader = ChanReader::new(rx); let mut buf = [0; 3]; assert_eq!(Ok(0), reader.read(&mut [])); assert_eq!(Ok(3), reader.read(&mut buf)); let a: &[u8] = &[1,2,3]; assert_eq!(a, buf); assert_eq!(Ok(3), reader.read(&mut buf)); let a: &[u8] = &[4,5,6]; assert_eq!(a, buf); assert_eq!(Ok(2), reader.read(&mut buf)); let a: &[u8] = &[7,8,6]; assert_eq!(a, buf); match reader.read(&mut buf) { Ok(..) => panic!(), Err(e) => assert_eq!(e.kind, old_io::EndOfFile), } assert_eq!(a, buf); // Ensure it continues to panic in the same way. match reader.read(&mut buf) { Ok(..) => panic!(), Err(e) => assert_eq!(e.kind, old_io::EndOfFile), } assert_eq!(a, buf); } #[test] fn test_rx_buffer() { let (tx, rx) = channel(); thread::spawn(move|| { tx.send(b"he".to_vec()).unwrap(); tx.send(b"llo wo".to_vec()).unwrap(); tx.send(b"".to_vec()).unwrap(); tx.send(b"rld\nhow ".to_vec()).unwrap(); tx.send(b"are you?".to_vec()).unwrap(); tx.send(b"".to_vec()).unwrap(); }); let mut reader = ChanReader::new(rx); assert_eq!(Ok("hello world\n".to_string()), reader.read_line()); assert_eq!(Ok("how are you?".to_string()), reader.read_line()); match reader.read_line() { Ok(..) => panic!(), Err(e) => assert_eq!(e.kind, old_io::EndOfFile), } } #[test] fn test_chan_writer() { let (tx, rx) = channel(); let mut writer = ChanWriter::new(tx); writer.write_be_u32(42).unwrap(); let wanted = vec![0, 0, 0, 42]; let got = thread::scoped(move|| { rx.recv().unwrap() }).join(); assert_eq!(wanted, got); match writer.write_u8(1) { Ok(..) => panic!(), Err(e) => assert_eq!(e.kind, old_io::BrokenPipe), } } }