diff options
| author | Steven Fackler <sfackler@gmail.com> | 2013-10-15 18:54:35 -0700 |
|---|---|---|
| committer | Steven Fackler <sfackler@gmail.com> | 2013-10-15 18:54:35 -0700 |
| commit | 194302493ca269eb33e2718672c2f12380a19497 (patch) | |
| tree | fae5a93061cf4717f19e51ecb35f00cbf2187bfe | |
| parent | 1252ad409258c2d3a2595f8a77dc7b84491cca77 (diff) | |
| download | rust-194302493ca269eb33e2718672c2f12380a19497.tar.gz rust-194302493ca269eb33e2718672c2f12380a19497.zip | |
Remove extra::flatpipes
Closes #9884
| -rw-r--r-- | src/libextra/extra.rs | 1 | ||||
| -rw-r--r-- | src/libextra/flatpipes.rs | 979 |
2 files changed, 0 insertions, 980 deletions
diff --git a/src/libextra/extra.rs b/src/libextra/extra.rs index 1305535dc50..3cea76b8edd 100644 --- a/src/libextra/extra.rs +++ b/src/libextra/extra.rs @@ -54,7 +54,6 @@ pub mod arc; pub mod comm; pub mod future; pub mod task_pool; -pub mod flatpipes; // Collections diff --git a/src/libextra/flatpipes.rs b/src/libextra/flatpipes.rs deleted file mode 100644 index 1fd81626188..00000000000 --- a/src/libextra/flatpipes.rs +++ /dev/null @@ -1,979 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -/*! - -Generic communication channels for things that can be represented as, -or transformed to and from, byte vectors. - -The `FlatPort` and `FlatChan` types implement the generic channel and -port interface for arbitrary types and transport strategies. It can -particularly be used to send and receive serializable types over I/O -streams. - -`FlatPort` and `FlatChan` implement the same comm traits as pipe-based -ports and channels. - -# Example - -This example sends boxed integers across tasks using serialization. - -```rust -let (port, chan) = serial::pipe_stream(); - -do task::spawn || { - for i in range(0, 10) { - chan.send(@i) - } -} - -for i in range(0, 10) { - assert @i == port.recv() -} - ``` - -# Safety Note - -Flat pipes created from `io::Reader`s and `io::Writer`s share the same -blocking properties as the underlying stream. Since some implementations -block the scheduler thread, so will their pipes. - -*/ - -#[allow(missing_doc)]; - - -// The basic send/recv interface FlatChan and PortChan will implement -use std::io; -use std::comm::GenericChan; -use std::comm::GenericPort; -use std::sys::size_of; - -/** -A FlatPort, consisting of a `BytePort` that receives byte vectors, -and an `Unflattener` that converts the bytes to a value. - -Create using the constructors in the `serial` and `pod` modules. -*/ -pub struct FlatPort<T, U, P> { - unflattener: U, - byte_port: P -} - -/** -A FlatChan, consisting of a `Flattener` that converts values to -byte vectors, and a `ByteChan` that transmits the bytes. - -Create using the constructors in the `serial` and `pod` modules. -*/ -pub struct FlatChan<T, F, C> { - flattener: F, - byte_chan: C -} - -/** -Constructors for flat pipes that using serialization-based flattening. -*/ -pub mod serial { - pub use DefaultEncoder = ebml::writer::Encoder; - pub use DefaultDecoder = ebml::reader::Decoder; - - use serialize::{Decodable, Encodable}; - use flatpipes::flatteners::{DeserializingUnflattener, - SerializingFlattener}; - use flatpipes::flatteners::{deserialize_buffer, serialize_value}; - use flatpipes::bytepipes::{ReaderBytePort, WriterByteChan}; - use flatpipes::bytepipes::{PipeBytePort, PipeByteChan}; - use flatpipes::{FlatPort, FlatChan}; - - use std::io::{Reader, Writer}; - use std::comm::{Port, Chan}; - use std::comm; - - pub type ReaderPort<T, R> = FlatPort< - T, DeserializingUnflattener<DefaultDecoder, T>, - ReaderBytePort<R>>; - pub type WriterChan<T, W> = FlatChan< - T, SerializingFlattener<DefaultEncoder, T>, WriterByteChan<W>>; - pub type PipePort<T> = FlatPort< - T, DeserializingUnflattener<DefaultDecoder, T>, PipeBytePort>; - pub type PipeChan<T> = FlatChan< - T, SerializingFlattener<DefaultEncoder, T>, PipeByteChan>; - - /// Create a `FlatPort` from a `Reader` - pub fn reader_port<T: Decodable<DefaultDecoder>, - R: Reader>(reader: R) -> ReaderPort<T, R> { - let unflat: DeserializingUnflattener<DefaultDecoder, T> = - DeserializingUnflattener::new( - deserialize_buffer::<DefaultDecoder, T>); - let byte_port = ReaderBytePort::new(reader); - FlatPort::new(unflat, byte_port) - } - - /// Create a `FlatChan` from a `Writer` - pub fn writer_chan<T: Encodable<DefaultEncoder>, - W: Writer>(writer: W) -> WriterChan<T, W> { - let flat: SerializingFlattener<DefaultEncoder, T> = - SerializingFlattener::new( - serialize_value::<DefaultEncoder, T>); - let byte_chan = WriterByteChan::new(writer); - FlatChan::new(flat, byte_chan) - } - - /// Create a `FlatPort` from a `Port<~[u8]>` - pub fn pipe_port<T:Decodable<DefaultDecoder>>( - port: Port<~[u8]> - ) -> PipePort<T> { - let unflat: DeserializingUnflattener<DefaultDecoder, T> = - DeserializingUnflattener::new( - deserialize_buffer::<DefaultDecoder, T>); - let byte_port = PipeBytePort::new(port); - FlatPort::new(unflat, byte_port) - } - - /// Create a `FlatChan` from a `Chan<~[u8]>` - pub fn pipe_chan<T:Encodable<DefaultEncoder>>( - chan: Chan<~[u8]> - ) -> PipeChan<T> { - let flat: SerializingFlattener<DefaultEncoder, T> = - SerializingFlattener::new( - serialize_value::<DefaultEncoder, T>); - let byte_chan = PipeByteChan::new(chan); - FlatChan::new(flat, byte_chan) - } - - /// Create a pair of `FlatChan` and `FlatPort`, backed by pipes - pub fn pipe_stream<T: Encodable<DefaultEncoder> + - Decodable<DefaultDecoder>>( - ) -> (PipePort<T>, PipeChan<T>) { - let (port, chan) = comm::stream(); - return (pipe_port(port), pipe_chan(chan)); - } -} - -// FIXME #4074 this doesn't correctly enforce POD bounds -/** -Constructors for flat pipes that send POD types using memcpy. - -# Safety Note - -This module is currently unsafe because it uses `Clone + Send` as a type -parameter bounds meaning POD (plain old data), but `Clone + Send` and -POD are not equivalent. - -*/ -pub mod pod { - - use flatpipes::flatteners::{PodUnflattener, PodFlattener}; - use flatpipes::bytepipes::{ReaderBytePort, WriterByteChan}; - use flatpipes::bytepipes::{PipeBytePort, PipeByteChan}; - use flatpipes::{FlatPort, FlatChan}; - - use std::io::{Reader, Writer}; - use std::comm::{Port, Chan}; - use std::comm; - - pub type ReaderPort<T, R> = - FlatPort<T, PodUnflattener<T>, ReaderBytePort<R>>; - pub type WriterChan<T, W> = - FlatChan<T, PodFlattener<T>, WriterByteChan<W>>; - pub type PipePort<T> = FlatPort<T, PodUnflattener<T>, PipeBytePort>; - pub type PipeChan<T> = FlatChan<T, PodFlattener<T>, PipeByteChan>; - - /// Create a `FlatPort` from a `Reader` - pub fn reader_port<T:Clone + Send,R:Reader>( - reader: R - ) -> ReaderPort<T, R> { - let unflat: PodUnflattener<T> = PodUnflattener::new(); - let byte_port = ReaderBytePort::new(reader); - FlatPort::new(unflat, byte_port) - } - - /// Create a `FlatChan` from a `Writer` - pub fn writer_chan<T:Clone + Send,W:Writer>( - writer: W - ) -> WriterChan<T, W> { - let flat: PodFlattener<T> = PodFlattener::new(); - let byte_chan = WriterByteChan::new(writer); - FlatChan::new(flat, byte_chan) - } - - /// Create a `FlatPort` from a `Port<~[u8]>` - pub fn pipe_port<T:Clone + Send>(port: Port<~[u8]>) -> PipePort<T> { - let unflat: PodUnflattener<T> = PodUnflattener::new(); - let byte_port = PipeBytePort::new(port); - FlatPort::new(unflat, byte_port) - } - - /// Create a `FlatChan` from a `Chan<~[u8]>` - pub fn pipe_chan<T:Clone + Send>(chan: Chan<~[u8]>) -> PipeChan<T> { - let flat: PodFlattener<T> = PodFlattener::new(); - let byte_chan = PipeByteChan::new(chan); - FlatChan::new(flat, byte_chan) - } - - /// Create a pair of `FlatChan` and `FlatPort`, backed by pipes - pub fn pipe_stream<T:Clone + Send>() -> (PipePort<T>, PipeChan<T>) { - let (port, chan) = comm::stream(); - return (pipe_port(port), pipe_chan(chan)); - } - -} - -/** -Flatteners present a value as a byte vector -*/ -pub trait Flattener<T> { - fn flatten(&self, val: T) -> ~[u8]; -} - -/** -Unflatteners convert a byte vector to a value -*/ -pub trait Unflattener<T> { - fn unflatten(&self, buf: ~[u8]) -> T; -} - -/** -BytePorts are a simple interface for receiving a specified number -*/ -pub trait BytePort { - fn try_recv(&self, count: uint) -> Option<~[u8]>; -} - -/** -ByteChans are a simple interface for sending bytes -*/ -pub trait ByteChan { - fn send(&self, val: ~[u8]); -} - -static CONTINUE: [u8, ..4] = [0xAA, 0xBB, 0xCC, 0xDD]; - -impl<T,U:Unflattener<T>,P:BytePort> GenericPort<T> for FlatPort<T, U, P> { - fn recv(&self) -> T { - match self.try_recv() { - Some(val) => val, - None => fail2!("port is closed") - } - } - fn try_recv(&self) -> Option<T> { - let command = match self.byte_port.try_recv(CONTINUE.len()) { - Some(c) => c, - None => { - warn2!("flatpipe: broken pipe"); - return None; - } - }; - - if CONTINUE.as_slice() == command { - let msg_len = match self.byte_port.try_recv(size_of::<u64>()) { - Some(bytes) => { - io::u64_from_be_bytes(bytes, 0, size_of::<u64>()) - }, - None => { - warn2!("flatpipe: broken pipe"); - return None; - } - }; - - let msg_len = msg_len as uint; - - match self.byte_port.try_recv(msg_len) { - Some(bytes) => { - Some(self.unflattener.unflatten(bytes)) - } - None => { - warn2!("flatpipe: broken pipe"); - return None; - } - } - } - else { - fail2!("flatpipe: unrecognized command"); - } - } -} - -impl<T,F:Flattener<T>,C:ByteChan> GenericChan<T> for FlatChan<T, F, C> { - fn send(&self, val: T) { - self.byte_chan.send(CONTINUE.to_owned()); - let bytes = self.flattener.flatten(val); - let len = bytes.len() as u64; - do io::u64_to_be_bytes(len, size_of::<u64>()) |len_bytes| { - self.byte_chan.send(len_bytes.to_owned()); - } - self.byte_chan.send(bytes); - } -} - -impl<T,U:Unflattener<T>,P:BytePort> FlatPort<T, U, P> { - pub fn new(u: U, p: P) -> FlatPort<T, U, P> { - FlatPort { - unflattener: u, - byte_port: p - } - } -} - -impl<T,F:Flattener<T>,C:ByteChan> FlatChan<T, F, C> { - pub fn new(f: F, c: C) -> FlatChan<T, F, C> { - FlatChan { - flattener: f, - byte_chan: c - } - } -} - - -pub mod flatteners { - - use ebml; - use flatpipes::{Flattener, Unflattener}; - use io_util::BufReader; - use json; - use serialize::{Encoder, Decoder, Encodable, Decodable}; - - use std::cast; - use std::io::{Writer, Reader, ReaderUtil}; - use std::io; - use std::ptr; - use std::sys::size_of; - use std::vec; - - // FIXME #4074: Clone + Send != POD - pub struct PodUnflattener<T> { - bogus: () - } - - pub struct PodFlattener<T> { - bogus: () - } - - impl<T:Clone + Send> Unflattener<T> for PodUnflattener<T> { - fn unflatten(&self, buf: ~[u8]) -> T { - assert!(size_of::<T>() != 0); - assert_eq!(size_of::<T>(), buf.len()); - let addr_of_init: &u8 = unsafe { &*vec::raw::to_ptr(buf) }; - let addr_of_value: &T = unsafe { cast::transmute(addr_of_init) }; - (*addr_of_value).clone() - } - } - - impl<T:Clone + Send> Flattener<T> for PodFlattener<T> { - fn flatten(&self, val: T) -> ~[u8] { - assert!(size_of::<T>() != 0); - let val: *T = ptr::to_unsafe_ptr(&val); - let byte_value = val as *u8; - unsafe { vec::from_buf(byte_value, size_of::<T>()) } - } - } - - impl<T:Clone + Send> PodUnflattener<T> { - pub fn new() -> PodUnflattener<T> { - PodUnflattener { - bogus: () - } - } - } - - impl<T:Clone + Send> PodFlattener<T> { - pub fn new() -> PodFlattener<T> { - PodFlattener { - bogus: () - } - } - } - - - pub type DeserializeBuffer<T> = ~fn(buf: &[u8]) -> T; - - pub struct DeserializingUnflattener<D, T> { - deserialize_buffer: DeserializeBuffer<T> - } - - pub type SerializeValue<T> = ~fn(val: &T) -> ~[u8]; - - pub struct SerializingFlattener<S, T> { - serialize_value: SerializeValue<T> - } - - impl<D:Decoder,T:Decodable<D>> Unflattener<T> - for DeserializingUnflattener<D, T> { - fn unflatten(&self, buf: ~[u8]) -> T { - (self.deserialize_buffer)(buf) - } - } - - impl<S:Encoder,T:Encodable<S>> Flattener<T> - for SerializingFlattener<S, T> { - fn flatten(&self, val: T) -> ~[u8] { - (self.serialize_value)(&val) - } - } - - impl<D:Decoder,T:Decodable<D>> DeserializingUnflattener<D, T> { - pub fn new(deserialize_buffer: DeserializeBuffer<T>) - -> DeserializingUnflattener<D, T> { - DeserializingUnflattener { - deserialize_buffer: deserialize_buffer - } - } - } - - impl<S:Encoder,T:Encodable<S>> SerializingFlattener<S, T> { - pub fn new(serialize_value: SerializeValue<T>) - -> SerializingFlattener<S, T> { - SerializingFlattener { - serialize_value: serialize_value - } - } - } - - /* - Implementations of the serialization functions required by - SerializingFlattener - */ - - pub fn deserialize_buffer<D: Decoder + FromReader, - T: Decodable<D>>( - buf: &[u8]) - -> T { - let buf = buf.to_owned(); - let buf_reader = @BufReader::new(buf); - let reader = buf_reader as @Reader; - let mut deser: D = FromReader::from_reader(reader); - Decodable::decode(&mut deser) - } - - pub fn serialize_value<D: Encoder + FromWriter, - T: Encodable<D>>( - val: &T) - -> ~[u8] { - do io::with_bytes_writer |writer| { - let mut ser = FromWriter::from_writer(writer); - val.encode(&mut ser); - } - } - - pub trait FromReader { - fn from_reader(r: @Reader) -> Self; - } - - pub trait FromWriter { - fn from_writer(w: @Writer) -> Self; - } - - impl FromReader for json::Decoder { - fn from_reader(r: @Reader) -> json::Decoder { - match json::from_reader(r) { - Ok(json) => { - json::Decoder(json) - } - Err(e) => fail2!("flatpipe: can't parse json: {:?}", e) - } - } - } - - impl FromWriter for json::Encoder { - fn from_writer(w: @Writer) -> json::Encoder { - json::Encoder(w) - } - } - - impl FromReader for ebml::reader::Decoder { - fn from_reader(r: @Reader) -> ebml::reader::Decoder { - let buf = @r.read_whole_stream(); - let doc = ebml::reader::Doc(buf); - ebml::reader::Decoder(doc) - } - } - - impl FromWriter for ebml::writer::Encoder { - fn from_writer(w: @Writer) -> ebml::writer::Encoder { - ebml::writer::Encoder(w) - } - } - -} - -pub mod bytepipes { - - use flatpipes::{ByteChan, BytePort}; - - use std::comm::{Port, Chan}; - use std::comm; - use std::io::{Writer, Reader, ReaderUtil}; - - pub struct ReaderBytePort<R> { - reader: R - } - - pub struct WriterByteChan<W> { - writer: W - } - - impl<R:Reader> BytePort for ReaderBytePort<R> { - fn try_recv(&self, count: uint) -> Option<~[u8]> { - let mut left = count; - let mut bytes = ~[]; - while !self.reader.eof() && left > 0 { - assert!(left <= count); - assert!(left > 0); - let new_bytes = self.reader.read_bytes(left); - bytes.push_all(new_bytes); - assert!(new_bytes.len() <= left); - left -= new_bytes.len(); - } - - if left == 0 { - return Some(bytes); - } else { - warn2!("flatpipe: dropped {} broken bytes", left); - return None; - } - } - } - - impl<W:Writer> ByteChan for WriterByteChan<W> { - fn send(&self, val: ~[u8]) { - self.writer.write(val); - } - } - - impl<R:Reader> ReaderBytePort<R> { - pub fn new(r: R) -> ReaderBytePort<R> { - ReaderBytePort { - reader: r - } - } - } - - impl<W:Writer> WriterByteChan<W> { - pub fn new(w: W) -> WriterByteChan<W> { - WriterByteChan { - writer: w - } - } - } - - // FIXME #6850: Remove `@mut` when this module is ported to the new I/O traits, - // which use `&mut self` properly. (For example, util::comm::GenericPort's try_recv - // method doesn't use `&mut self`, so the `try_recv` method in the impl of `BytePort` - // for `PipeBytePort` can't have `&mut self` either.) - pub struct PipeBytePort { - port: comm::Port<~[u8]>, - buf: @mut ~[u8] - } - - pub struct PipeByteChan { - chan: comm::Chan<~[u8]> - } - - impl BytePort for PipeBytePort { - fn try_recv(&self, count: uint) -> Option<~[u8]> { - if self.buf.len() >= count { - let mut bytes = ::std::util::replace(&mut *self.buf, ~[]); - *self.buf = bytes.slice(count, bytes.len()).to_owned(); - bytes.truncate(count); - return Some(bytes); - } else if !self.buf.is_empty() { - let mut bytes = ::std::util::replace(&mut *self.buf, ~[]); - assert!(count > bytes.len()); - match self.try_recv(count - bytes.len()) { - Some(rest) => { - bytes.push_all(rest); - return Some(bytes); - } - None => return None - } - } else /* empty */ { - match self.port.try_recv() { - Some(buf) => { - assert!(!buf.is_empty()); - *self.buf = buf; - return self.try_recv(count); - } - None => return None - } - } - } - } - - impl ByteChan for PipeByteChan { - fn send(&self, val: ~[u8]) { - self.chan.send(val) - } - } - - impl PipeBytePort { - pub fn new(p: Port<~[u8]>) -> PipeBytePort { - PipeBytePort { - port: p, - buf: @mut ~[] - } - } - } - - impl PipeByteChan { - pub fn new(c: Chan<~[u8]>) -> PipeByteChan { - PipeByteChan { - chan: c - } - } - } - -} - -#[cfg(test)] -mod test { - - use flatpipes::BytePort; - use flatpipes::pod; - use flatpipes::serial; - use io_util::BufReader; - - use std::io::BytesWriter; - use std::task; - - #[test] - #[ignore(reason = "ebml failure")] - fn test_serializing_memory_stream() { - let writer = BytesWriter::new(); - let chan = serial::writer_chan(writer); - - chan.send(10); - - let bytes = (*chan.byte_chan.writer.bytes).clone(); - - let reader = BufReader::new(bytes); - let port = serial::reader_port(reader); - - let res: int = port.recv(); - assert_eq!(res, 10i); - } - - #[test] - #[ignore(reason = "FIXME #6211 failing on linux snapshot machine")] - fn test_serializing_pipes() { - let (port, chan) = serial::pipe_stream(); - - do task::spawn || { - for i in range(0, 10) { - chan.send(i) - } - } - - for i in range(0, 10) { - assert!(i == port.recv()) - } - } - - #[test] - #[ignore(reason = "ebml failure")] - fn test_serializing_boxes() { - let (port, chan) = serial::pipe_stream(); - - do task::spawn || { - for i in range(0, 10) { - chan.send(@i) - } - } - - for i in range(0, 10) { - assert!(@i == port.recv()) - } - } - - #[test] - fn test_pod_memory_stream() { - let writer = BytesWriter::new(); - let chan = pod::writer_chan(writer); - - chan.send(10); - - let bytes = (*chan.byte_chan.writer.bytes).clone(); - - let reader = BufReader::new(bytes); - let port = pod::reader_port(reader); - - let res: int = port.recv(); - assert_eq!(res, 10); - } - - #[test] - fn test_pod_pipes() { - let (port, chan) = pod::pipe_stream(); - - do task::spawn || { - for i in range(0, 10) { - chan.send(i) - } - } - - for i in range(0, 10) { - assert!(i == port.recv()) - } - } - - // FIXME #2064: Networking doesn't work on x86 - // XXX Broken until networking support is added back - /* - use flatpipes::{Flattener, Unflattener, FlatChan, FlatPort}; - use flatpipes::bytepipes::*; - - #[test] - #[cfg(target_arch = "x86_64")] - fn test_pod_tcp_stream() { - fn reader_port(buf: TcpSocketBuf - ) -> pod::ReaderPort<int, TcpSocketBuf> { - pod::reader_port(buf) - } - fn writer_chan(buf: TcpSocketBuf - ) -> pod::WriterChan<int, TcpSocketBuf> { - pod::writer_chan(buf) - } - test_some_tcp_stream(reader_port, writer_chan, 9666); - } - - #[test] - #[cfg(target_arch = "x86_64")] - fn test_serializing_tcp_stream() { - // XXX Broken until networking support is added back - fn reader_port(buf: TcpSocketBuf - ) -> serial::ReaderPort<int, TcpSocketBuf> { - serial::reader_port(buf) - } - fn writer_chan(buf: TcpSocketBuf - ) -> serial::WriterChan<int, TcpSocketBuf> { - serial::writer_chan(buf) - } - test_some_tcp_stream(reader_port, writer_chan, 9667); - } - - type ReaderPortFactory<U> = - ~fn(TcpSocketBuf) -> FlatPort<int, U, ReaderBytePort<TcpSocketBuf>>; - type WriterChanFactory<F> = - ~fn(TcpSocketBuf) -> FlatChan<int, F, WriterByteChan<TcpSocketBuf>>; - - fn test_some_tcp_stream<U:Unflattener<int>,F:Flattener<int>>( - reader_port: ReaderPortFactory<U>, - writer_chan: WriterChanFactory<F>, - port: uint) { - - use std::cell::Cell; - use std::comm; - use std::result; - use net::ip; - use net::tcp; - use uv; - - // Indicate to the client task that the server is listening - let (begin_connect_port, begin_connect_chan) = comm::stream(); - // The connection is sent from the server task to the receiver task - // to handle the connection - let (accept_port, accept_chan) = comm::stream(); - // The main task will wait until the test is over to proceed - let (finish_port, finish_chan) = comm::stream(); - - let addr0 = ip::v4::parse_addr("127.0.0.1"); - - let begin_connect_chan = Cell::new(begin_connect_chan); - let accept_chan = Cell::new(accept_chan); - - // The server task - let addr = addr0.clone(); - do task::spawn || { - let iotask = &uv::global_loop::get(); - let begin_connect_chan = begin_connect_chan.take(); - let accept_chan = accept_chan.take(); - let listen_res = do tcp::listen( - addr.clone(), port, 128, iotask, |_kill_ch| { - // Tell the sender to initiate the connection - debug2!("listening"); - begin_connect_chan.send(()) - }) |new_conn, kill_ch| { - - // Incoming connection. Send it to the receiver task to accept - let (res_port, res_chan) = comm::stream(); - accept_chan.send((new_conn, res_chan)); - // Wait until the connection is accepted - res_port.recv(); - - // Stop listening - kill_ch.send(None) - }; - - assert!(listen_res.is_ok()); - } - - // Client task - let addr = addr0.clone(); - do task::spawn || { - // Wait for the server to start listening - begin_connect_port.recv(); - - debug2!("connecting"); - let iotask = &uv::global_loop::get(); - let connect_result = tcp::connect(addr.clone(), port, iotask); - assert!(connect_result.is_ok()); - let sock = result::unwrap(connect_result); - let socket_buf: tcp::TcpSocketBuf = tcp::socket_buf(sock); - - // TcpSocketBuf is a Writer! - let chan = writer_chan(socket_buf); - - for i in range(0, 10) { - debug2!("sending {}", i); - chan.send(i) - } - } - - // Receiver task - do task::spawn || { - // Wait for a connection - let (conn, res_chan) = accept_port.recv(); - - debug2!("accepting connection"); - let accept_result = tcp::accept(conn); - debug2!("accepted"); - assert!(accept_result.is_ok()); - let sock = result::unwrap(accept_result); - res_chan.send(()); - - let socket_buf: tcp::TcpSocketBuf = tcp::socket_buf(sock); - - // TcpSocketBuf is a Reader! - let port = reader_port(socket_buf); - - for i in range(0, 10) { - let j = port.recv(); - debug2!("received {:?}", j); - assert_eq!(i, j); - } - - // The test is over! - finish_chan.send(()); - } - - finish_port.recv(); - }*/ - - // Tests that the different backends behave the same when the - // binary streaming protocol is broken - mod broken_protocol { - - use flatpipes::{BytePort, FlatPort}; - use flatpipes::flatteners::PodUnflattener; - use flatpipes::pod; - use io_util::BufReader; - - use std::comm; - use std::io; - use std::sys; - use std::task; - - type PortLoader<P> = - ~fn(~[u8]) -> FlatPort<int, PodUnflattener<int>, P>; - - fn reader_port_loader(bytes: ~[u8] - ) -> pod::ReaderPort<int, BufReader> { - let reader = BufReader::new(bytes); - pod::reader_port(reader) - } - - fn pipe_port_loader(bytes: ~[u8] - ) -> pod::PipePort<int> { - let (port, chan) = comm::stream(); - if !bytes.is_empty() { - chan.send(bytes); - } - pod::pipe_port(port) - } - - fn test_try_recv_none1<P:BytePort>(loader: PortLoader<P>) { - let bytes = ~[]; - let port = loader(bytes); - let res: Option<int> = port.try_recv(); - assert!(res.is_none()); - } - - #[test] - fn test_try_recv_none1_reader() { - test_try_recv_none1(reader_port_loader); - } - #[test] - fn test_try_recv_none1_pipe() { - test_try_recv_none1(pipe_port_loader); - } - - fn test_try_recv_none2<P:BytePort>(loader: PortLoader<P>) { - // The control word in the protocol is interrupted - let bytes = ~[0]; - let port = loader(bytes); - let res: Option<int> = port.try_recv(); - assert!(res.is_none()); - } - - #[test] - fn test_try_recv_none2_reader() { - test_try_recv_none2(reader_port_loader); - } - #[test] - fn test_try_recv_none2_pipe() { - test_try_recv_none2(pipe_port_loader); - } - - fn test_try_recv_none3<P:BytePort>(loader: PortLoader<P>) { - static CONTINUE: [u8, ..4] = [0xAA, 0xBB, 0xCC, 0xDD]; - // The control word is followed by garbage - let bytes = CONTINUE.to_owned() + &[0u8]; - let port = loader(bytes); - let res: Option<int> = port.try_recv(); - assert!(res.is_none()); - } - - #[test] - fn test_try_recv_none3_reader() { - test_try_recv_none3(reader_port_loader); - } - #[test] - fn test_try_recv_none3_pipe() { - test_try_recv_none3(pipe_port_loader); - } - - fn test_try_recv_none4<P:BytePort>(loader: PortLoader<P>) { - assert!(do task::try || { - static CONTINUE: [u8, ..4] = [0xAA, 0xBB, 0xCC, 0xDD]; - // The control word is followed by a valid length, - // then undeserializable garbage - let len_bytes = do io::u64_to_be_bytes( - 1, sys::size_of::<u64>()) |len_bytes| { - len_bytes.to_owned() - }; - let bytes = CONTINUE.to_owned() + len_bytes + &[0u8, 0, 0, 0]; - - let port = loader(bytes); - - let _res: Option<int> = port.try_recv(); - }.is_err()); - } - - #[test] - fn test_try_recv_none4_reader() { - test_try_recv_none4(reader_port_loader); - } - #[test] - fn test_try_recv_none4_pipe() { - test_try_recv_none4(pipe_port_loader); - } - } - -} |
