about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2012-11-25 14:12:21 -0800
committerBrian Anderson <banderson@mozilla.com>2012-12-17 18:46:42 -0800
commitec9305802ba1bde2d0de34278395379de69f3468 (patch)
treed4869c5bb8eae9754b6429a9ff026afc59eea8dc /src/libstd
parent72f8a03cfbeb1266919d84a12b5bc417ef170e56 (diff)
downloadrust-ec9305802ba1bde2d0de34278395379de69f3468.tar.gz
rust-ec9305802ba1bde2d0de34278395379de69f3468.zip
std: Add flatpipes
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/flatpipes.rs1003
-rw-r--r--src/libstd/std.rc1
2 files changed, 1004 insertions, 0 deletions
diff --git a/src/libstd/flatpipes.rs b/src/libstd/flatpipes.rs
new file mode 100644
index 00000000000..5dac5eb9a8b
--- /dev/null
+++ b/src/libstd/flatpipes.rs
@@ -0,0 +1,1003 @@
+/*!
+
+Generic communication channels for things that can be represented as,
+or transformed to and from, byte vectors.
+
+The `FlatPort` and `FlatChan` types implement the generic channel and
+port interface for arbitrary types and transport strategies. It can
+particularly be used to send and recieve serializable types over I/O
+streams.
+
+`FlatPort` and `FlatChan` implement the same comm traits as pipe-based
+ports and channels.
+
+# Example
+
+This example sends boxed integers across tasks using serialization.
+
+~~~
+let (port, chan) = serial::pipe_stream();
+
+do task::spawn |move chan| {
+    for int::range(0, 10) |i| {
+        chan.send(@i)
+    }
+}
+
+for int::range(0, 10) |i| {
+    assert @i == port.recv()
+}
+~~~
+
+# Safety Note
+
+Flat pipes created from `io::Reader`s and `io::Writer`s share the same
+blocking properties as the underlying stream. Since some implementations
+block the scheduler thread, so will their pipes.
+
+*/
+
+// The basic send/recv interface FlatChan and PortChan will implement
+use core::pipes::GenericChan;
+use core::pipes::GenericPort;
+
+use core::sys::size_of;
+
+/**
+A FlatPort, consisting of a `BytePort` that recieves byte vectors,
+and an `Unflattener` that converts the bytes to a value.
+
+Create using the constructors in the `serial` and `pod` modules.
+*/
+pub struct FlatPort<T, U: Unflattener<T>, P: BytePort> {
+    unflattener: U,
+    byte_port: P
+}
+
+/**
+A FlatChan, consisting of a `Flattener` that converts values to
+byte vectors, and a `ByteChan` that transmits the bytes.
+
+Create using the constructors in the `serial` and `pod` modules.
+*/
+pub struct FlatChan<T, F: Flattener<T>, C: ByteChan> {
+    flattener: F,
+    byte_chan: C
+}
+
+/**
+Constructors for flat pipes that using serialization-based flattening.
+*/
+pub mod serial {
+
+    pub use DefaultSerializer = ebml::writer::Serializer;
+    pub use DefaultDeserializer = ebml::reader::Deserializer;
+
+    use core::io::{Reader, Writer};
+    use core::pipes::{Port, Chan};
+    use serialization::{Deserializable, Serializable};
+    use flatpipes::flatteners::{DeserializingUnflattener,
+                                SerializingFlattener};
+    use flatpipes::flatteners::{deserialize_buffer, serialize_value};
+    use flatpipes::bytepipes::{ReaderBytePort, WriterByteChan};
+    use flatpipes::bytepipes::{PipeBytePort, PipeByteChan};
+
+    pub type ReaderPort<T, R> = FlatPort<
+        T, DeserializingUnflattener<DefaultDeserializer, T>,
+        ReaderBytePort<R>>;
+    pub type WriterChan<T, W> = FlatChan<
+        T, SerializingFlattener<DefaultSerializer, T>, WriterByteChan<W>>;
+    pub type PipePort<T> = FlatPort<
+        T, DeserializingUnflattener<DefaultDeserializer, T>, PipeBytePort>;
+    pub type PipeChan<T> = FlatChan<
+        T, SerializingFlattener<DefaultSerializer, T>, PipeByteChan>;
+
+    /// Create a `FlatPort` from a `Reader`
+    pub fn reader_port<T: Deserializable<DefaultDeserializer>,
+                       R: Reader>(reader: R) -> ReaderPort<T, R> {
+        let unflat: DeserializingUnflattener<DefaultDeserializer, T> =
+            DeserializingUnflattener::new(
+                deserialize_buffer::<DefaultDeserializer, T>);
+        let byte_port = ReaderBytePort::new(move reader);
+        FlatPort::new(move unflat, move byte_port)
+    }
+
+    /// Create a `FlatChan` from a `Writer`
+    pub fn writer_chan<T: Serializable<DefaultSerializer>,
+                       W: Writer>(writer: W) -> WriterChan<T, W> {
+        let flat: SerializingFlattener<DefaultSerializer, T> =
+            SerializingFlattener::new(
+                serialize_value::<DefaultSerializer, T>);
+        let byte_chan = WriterByteChan::new(move writer);
+        FlatChan::new(move flat, move byte_chan)
+    }
+
+    /// Create a `FlatPort` from a `Port<~[u8]>`
+    pub fn pipe_port<T: Deserializable<DefaultDeserializer>>(
+        port: Port<~[u8]>
+    ) -> PipePort<T> {
+        let unflat: DeserializingUnflattener<DefaultDeserializer, T> =
+            DeserializingUnflattener::new(
+                deserialize_buffer::<DefaultDeserializer, T>);
+        let byte_port = PipeBytePort::new(move port);
+        FlatPort::new(move unflat, move byte_port)
+    }
+
+    /// Create a `FlatChan` from a `Chan<~[u8]>`
+    pub fn pipe_chan<T: Serializable<DefaultSerializer>>(
+        chan: Chan<~[u8]>
+    ) -> PipeChan<T> {
+        let flat: SerializingFlattener<DefaultSerializer, T> =
+            SerializingFlattener::new(
+                serialize_value::<DefaultSerializer, T>);
+        let byte_chan = PipeByteChan::new(move chan);
+        FlatChan::new(move flat, move byte_chan)
+    }
+
+    /// Create a pair of `FlatChan` and `FlatPort`, backed by pipes
+    pub fn pipe_stream<T: Serializable<DefaultSerializer>
+                          Deserializable<DefaultDeserializer>>(
+                          ) -> (PipePort<T>, PipeChan<T>) {
+        let (port, chan) = pipes::stream();
+        return (pipe_port(move port), pipe_chan(move chan));
+    }
+
+}
+
+// FIXME #4074 this doesn't correctly enforce POD bounds
+/**
+Constructors for flat pipes that send POD types using memcpy.
+
+# Safety Note
+
+This module is currently unsafe because it uses `Copy Owned` as a type
+parameter bounds meaning POD (plain old data), but `Copy Owned` and
+POD are not equivelant.
+
+*/
+pub mod pod {
+
+    use core::io::{Reader, Writer};
+    use core::pipes::{Port, Chan};
+    use flatpipes::flatteners::{PodUnflattener, PodFlattener};
+    use flatpipes::bytepipes::{ReaderBytePort, WriterByteChan};
+    use flatpipes::bytepipes::{PipeBytePort, PipeByteChan};
+
+    pub type ReaderPort<T: Copy Owned, R> =
+        FlatPort<T, PodUnflattener<T>, ReaderBytePort<R>>;
+    pub type WriterChan<T: Copy Owned, W> =
+        FlatChan<T, PodFlattener<T>, WriterByteChan<W>>;
+    pub type PipePort<T: Copy Owned> =
+        FlatPort<T, PodUnflattener<T>, PipeBytePort>;
+    pub type PipeChan<T: Copy Owned> =
+        FlatChan<T, PodFlattener<T>, PipeByteChan>;
+
+    /// Create a `FlatPort` from a `Reader`
+    pub fn reader_port<T: Copy Owned, R: Reader>(
+        reader: R
+    ) -> ReaderPort<T, R> {
+        let unflat: PodUnflattener<T> = PodUnflattener::new();
+        let byte_port = ReaderBytePort::new(move reader);
+        FlatPort::new(move unflat, move byte_port)
+    }
+
+    /// Create a `FlatChan` from a `Writer`
+    pub fn writer_chan<T: Copy Owned, W: Writer>(
+        writer: W
+    ) -> WriterChan<T, W> {
+        let flat: PodFlattener<T> = PodFlattener::new();
+        let byte_chan = WriterByteChan::new(move writer);
+        FlatChan::new(move flat, move byte_chan)
+    }
+
+    /// Create a `FlatPort` from a `Port<~[u8]>`
+    pub fn pipe_port<T: Copy Owned>(port: Port<~[u8]>) -> PipePort<T> {
+        let unflat: PodUnflattener<T> = PodUnflattener::new();
+        let byte_port = PipeBytePort::new(move port);
+        FlatPort::new(move unflat, move byte_port)
+    }
+
+    /// Create a `FlatChan` from a `Chan<~[u8]>`
+    pub fn pipe_chan<T: Copy Owned>(chan: Chan<~[u8]>) -> PipeChan<T> {
+        let flat: PodFlattener<T> = PodFlattener::new();
+        let byte_chan = PipeByteChan::new(move chan);
+        FlatChan::new(move flat, move byte_chan)
+    }
+
+    /// Create a pair of `FlatChan` and `FlatPort`, backed by pipes
+    pub fn pipe_stream<T: Copy Owned>() -> (PipePort<T>, PipeChan<T>) {
+        let (port, chan) = pipes::stream();
+        return (pipe_port(move port), pipe_chan(move chan));
+    }
+
+}
+
+/**
+Flatteners present a value as a byte vector
+*/
+pub trait Flattener<T> {
+    fn flatten(&self, val: T) -> ~[u8];
+}
+
+/**
+Unflatteners convert a byte vector to a value
+*/
+pub trait Unflattener<T> {
+    fn unflatten(&self, buf: ~[u8]) -> T;
+}
+
+/**
+BytePorts are a simple interface for receiving a specified number
+*/
+pub trait BytePort {
+    fn try_recv(&self, count: uint) -> Option<~[u8]>;
+}
+
+/**
+ByteChans are a simple interface for sending bytes
+*/
+pub trait ByteChan {
+    fn send(&self, val: ~[u8]);
+}
+
+const CONTINUE: [u8 * 4] = [0xAA, 0xBB, 0xCC, 0xDD];
+
+impl<T, U: Unflattener<T>, P: BytePort> FlatPort<T, U, P>: GenericPort<T> {
+    fn recv() -> T {
+        match self.try_recv() {
+            Some(move val) => move val,
+            None => fail ~"port is closed"
+        }
+    }
+    fn try_recv() -> Option<T> {
+        let command = match self.byte_port.try_recv(CONTINUE.len()) {
+            Some(move c) => move c,
+            None => {
+                warn!("flatpipe: broken pipe");
+                return None;
+            }
+        };
+
+        if vec::eq(command, CONTINUE) {
+            let msg_len = match self.byte_port.try_recv(size_of::<u64>()) {
+                Some(bytes) => {
+                    io::u64_from_be_bytes(bytes, 0, size_of::<u64>())
+                },
+                None => {
+                    warn!("flatpipe: broken pipe");
+                    return None;
+                }
+            };
+
+            let msg_len = msg_len as uint;
+
+            match self.byte_port.try_recv(msg_len) {
+                Some(move bytes) => {
+                    Some(self.unflattener.unflatten(move bytes))
+                }
+                None => {
+                    warn!("flatpipe: broken pipe");
+                    return None;
+                }
+            }
+        }
+        else {
+            fail ~"flatpipe: unrecognized command";
+        }
+    }
+}
+
+impl<T, F: Flattener<T>, C: ByteChan> FlatChan<T, F, C>: GenericChan<T> {
+    fn send(val: T) {
+        self.byte_chan.send(CONTINUE.to_vec());
+        let bytes = self.flattener.flatten(move val);
+        let len = bytes.len() as u64;
+        do io::u64_to_be_bytes(len, size_of::<u64>()) |len_bytes| {
+            self.byte_chan.send(len_bytes.to_vec());
+        }
+        self.byte_chan.send(move bytes);
+    }
+}
+
+impl<T, U: Unflattener<T>, P: BytePort> FlatPort<T, U, P> {
+    static fn new(u: U, p: P) -> FlatPort<T, U, P> {
+        FlatPort {
+            unflattener: move u,
+            byte_port: move p
+        }
+    }
+}
+
+impl<T, F: Flattener<T>, C: ByteChan> FlatChan<T, F, C> {
+    static fn new(f: F, c: C) -> FlatChan<T, F, C> {
+        FlatChan {
+            flattener: move f,
+            byte_chan: move c
+        }
+    }
+}
+
+
+pub mod flatteners {
+
+    use core::sys::size_of;
+
+    use serialization::{Serializer, Deserializer,
+                        Serializable, Deserializable};
+    use serialization::deserialize;
+
+    use core::io::{Writer, Reader, BytesWriter, ReaderUtil};
+    use flatpipes::util::BufReader;
+
+    // XXX: Is copy/send equivalent to pod?
+    pub struct PodUnflattener<T: Copy Owned> {
+        bogus: ()
+    }
+
+    pub struct PodFlattener<T: Copy Owned> {
+        bogus: ()
+    }
+
+    pub impl<T: Copy Owned> PodUnflattener<T>: Unflattener<T> {
+        fn unflatten(&self, buf: ~[u8]) -> T {
+            assert size_of::<T>() != 0;
+            assert size_of::<T>() == buf.len();
+            let addr_of_init: &u8 = unsafe { &*vec::raw::to_ptr(buf) };
+            let addr_of_value: &T = unsafe { cast::transmute(addr_of_init) };
+            copy *addr_of_value
+        }
+    }
+
+    pub impl<T: Copy Owned> PodFlattener<T>: Flattener<T> {
+        fn flatten(&self, val: T) -> ~[u8] {
+            assert size_of::<T>() != 0;
+            let val: *T = ptr::to_unsafe_ptr(&val);
+            let byte_value = val as *u8;
+            unsafe { vec::from_buf(byte_value, size_of::<T>()) }
+        }
+    }
+
+    pub impl<T: Copy Owned> PodUnflattener<T> {
+        static fn new() -> PodUnflattener<T> {
+            PodUnflattener {
+                bogus: ()
+            }
+        }
+    }
+
+    pub impl<T: Copy Owned> PodFlattener<T> {
+        static fn new() -> PodFlattener<T> {
+            PodFlattener {
+                bogus: ()
+            }
+        }
+    }
+
+
+    pub type DeserializeBuffer<T> = ~fn(buf: &[u8]) -> T;
+
+    pub struct DeserializingUnflattener<D: Deserializer,
+                                        T: Deserializable<D>> {
+        deserialize_buffer: DeserializeBuffer<T>
+    }
+
+    pub type SerializeValue<T> = ~fn(val: &T) -> ~[u8];
+
+    pub struct SerializingFlattener<S: Serializer, T: Serializable<S>> {
+        serialize_value: SerializeValue<T>
+    }
+
+    pub impl<D: Deserializer, T: Deserializable<D>>
+        DeserializingUnflattener<D, T>: Unflattener<T> {
+        fn unflatten(&self, buf: ~[u8]) -> T {
+            (self.deserialize_buffer)(buf)
+        }
+    }
+
+    pub impl<S: Serializer, T: Serializable<S>>
+        SerializingFlattener<S, T>: Flattener<T> {
+        fn flatten(&self, val: T) -> ~[u8] {
+            (self.serialize_value)(&val)
+        }
+    }
+
+    pub impl<D: Deserializer, T: Deserializable<D>>
+        DeserializingUnflattener<D, T> {
+
+        static fn new(deserialize_buffer: DeserializeBuffer<T>
+                     ) -> DeserializingUnflattener<D, T> {
+            DeserializingUnflattener {
+                deserialize_buffer: move deserialize_buffer
+            }
+        }
+    }
+
+    pub impl<S: Serializer, T: Serializable<S>>
+        SerializingFlattener<S, T> {
+
+        static fn new(serialize_value: SerializeValue<T>
+                     ) -> SerializingFlattener<S, T> {
+            SerializingFlattener {
+                serialize_value: move serialize_value
+            }
+        }
+    }
+
+    /*
+    Implementations of the serialization functions required by
+    SerializingFlattener
+    */
+
+    pub fn deserialize_buffer<D: Deserializer FromReader,
+                          T: Deserializable<D>>(buf: &[u8]) -> T {
+        let buf = vec::from_slice(buf);
+        let buf_reader = @BufReader::new(move buf);
+        let reader = buf_reader as @Reader;
+        let deser: D = from_reader(reader);
+        deserialize(&deser)
+    }
+
+    pub fn serialize_value<D: Serializer FromWriter,
+                       T: Serializable<D>>(val: &T) -> ~[u8] {
+        let bytes_writer = @BytesWriter();
+        let writer = bytes_writer as @Writer;
+        let ser = from_writer(writer);
+        val.serialize(&ser);
+        let bytes = bytes_writer.bytes.check_out(|bytes| move bytes);
+        return move bytes;
+    }
+
+    pub trait FromReader {
+        static fn from_reader(r: Reader) -> self;
+    }
+
+    pub trait FromWriter {
+        static fn from_writer(w: Writer) -> self;
+    }
+
+    impl json::Deserializer: FromReader {
+        static fn from_reader(r: Reader) -> json::Deserializer {
+            match json::from_reader(r) {
+                Ok(move json) => {
+                    json::Deserializer(move json)
+                }
+                Err(e) => fail fmt!("flatpipe: can't parse json: %?", e)
+            }
+        }
+    }
+
+    impl json::Serializer: FromWriter {
+        static fn from_writer(w: Writer) -> json::Serializer {
+            json::Serializer(move w)
+        }
+    }
+
+    impl ebml::reader::Deserializer: FromReader {
+        static fn from_reader(r: Reader) -> ebml::reader::Deserializer {
+            let buf = @r.read_whole_stream();
+            let doc = ebml::reader::Doc(buf);
+            ebml::reader::Deserializer(move doc)
+        }
+    }
+
+    impl ebml::writer::Serializer: FromWriter {
+        static fn from_writer(w: Writer) -> ebml::writer::Serializer {
+            ebml::writer::Serializer(move w)
+        }
+    }
+
+}
+
+pub mod bytepipes {
+
+    use core::io::{Writer, Reader, ReaderUtil};
+    use core::pipes::{Port, Chan};
+
+    pub struct ReaderBytePort<R: Reader> {
+        reader: R
+    }
+
+    pub struct WriterByteChan<W: Writer> {
+        writer: W
+    }
+
+    pub impl<R: Reader> ReaderBytePort<R>: BytePort {
+        fn try_recv(&self, count: uint) -> Option<~[u8]> {
+            let mut left = count;
+            let mut bytes = ~[];
+            while !self.reader.eof() && left > 0 {
+                assert left <= count;
+                assert left > 0;
+                let new_bytes = self.reader.read_bytes(left);
+                bytes.push_all(new_bytes);
+                assert new_bytes.len() <= left;
+                left -= new_bytes.len();
+            }
+
+            if left == 0 {
+                return Some(move bytes);
+            } else {
+                warn!("flatpipe: dropped %? broken bytes", left);
+                return None;
+            }
+        }
+    }
+
+    pub impl<W: Writer> WriterByteChan<W>: ByteChan {
+        fn send(&self, val: ~[u8]) {
+            self.writer.write(val);
+        }
+    }
+
+    pub impl<R: Reader> ReaderBytePort<R> {
+        static fn new(r: R) -> ReaderBytePort<R> {
+            ReaderBytePort {
+                reader: move r
+            }
+        }
+    }
+
+    pub impl<W: Writer> WriterByteChan<W> {
+        static fn new(w: W) -> WriterByteChan<W> {
+            WriterByteChan {
+                writer: move w
+            }
+        }
+    }
+
+    pub struct PipeBytePort {
+        port: pipes::Port<~[u8]>,
+        mut buf: ~[u8]
+    }
+
+    pub struct PipeByteChan {
+        chan: pipes::Chan<~[u8]>
+    }
+
+    pub impl PipeBytePort: BytePort {
+        fn try_recv(&self, count: uint) -> Option<~[u8]> {
+            if self.buf.len() >= count {
+                let mut bytes = core::util::replace(&mut self.buf, ~[]);
+                self.buf = bytes.slice(count, bytes.len());
+                bytes.truncate(count);
+                return Some(bytes);
+            } else if self.buf.len() > 0 {
+                let mut bytes = core::util::replace(&mut self.buf, ~[]);
+                assert count > bytes.len();
+                match self.try_recv(count - bytes.len()) {
+                    Some(move rest) => {
+                        bytes.push_all(rest);
+                        return Some(move bytes);
+                    }
+                    None => return None
+                }
+            } else if self.buf.is_empty() {
+                match self.port.try_recv() {
+                    Some(move buf) => {
+                        assert buf.is_not_empty();
+                        self.buf = move buf;
+                        return self.try_recv(count);
+                    }
+                    None => return None
+                }
+            } else {
+                core::util::unreachable()
+            }
+        }
+    }
+
+    pub impl PipeByteChan: ByteChan {
+        fn send(&self, val: ~[u8]) {
+            self.chan.send(move val)
+        }
+    }
+
+    pub impl PipeBytePort {
+        static fn new(p: Port<~[u8]>) -> PipeBytePort {
+            PipeBytePort {
+                port: move p,
+                buf: ~[]
+            }
+        }
+    }
+
+    pub impl PipeByteChan {
+        static fn new(c: Chan<~[u8]>) -> PipeByteChan {
+            PipeByteChan {
+                chan: move c
+            }
+        }
+    }
+
+}
+
+// XXX: This belongs elsewhere
+mod util {
+
+    use io::{Reader, BytesReader};
+
+    pub struct BufReader {
+        buf: ~[u8],
+        mut pos: uint
+    }
+
+    pub impl BufReader {
+        static pub fn new(v: ~[u8]) -> BufReader {
+            BufReader {
+                buf: move v,
+                pos: 0
+            }
+        }
+
+        priv fn as_bytes_reader<A>(f: &fn(&BytesReader) -> A) -> A {
+            // Recreating the BytesReader state every call since
+            // I can't get the borrowing to work correctly
+            let bytes_reader = BytesReader {
+                bytes: core::util::id::<&[u8]>(self.buf),
+                pos: self.pos
+            };
+
+            let res = f(&bytes_reader);
+
+            // XXX: This isn't correct if f fails
+            self.pos = bytes_reader.pos;
+
+            return move res;
+        }
+    }
+
+    impl BufReader: Reader {
+        fn read(bytes: &[mut u8], len: uint) -> uint {
+            self.as_bytes_reader(|r| r.read(bytes, len) )
+        }
+        fn read_byte() -> int {
+            self.as_bytes_reader(|r| r.read_byte() )
+        }
+        fn eof() -> bool {
+            self.as_bytes_reader(|r| r.eof() )
+        }
+        fn seek(offset: int, whence: io::SeekStyle) {
+            self.as_bytes_reader(|r| r.seek(offset, whence) )
+        }
+        fn tell() -> uint {
+            self.as_bytes_reader(|r| r.tell() )
+        }
+    }
+
+}
+
+#[cfg(test)]
+mod test {
+
+    // XXX: json::Deserializer doesn't work because of problems related to
+    // its interior pointers
+    //use DefaultSerializer = json::Serializer;
+    //use DefaultDeserializer = json::Deserializer;
+    use DefaultSerializer = ebml::writer::Serializer;
+    use DefaultDeserializer = ebml::reader::Deserializer;
+
+    use flatpipes::flatteners::*;
+    use flatpipes::bytepipes::*;
+
+    use core::dvec::DVec;
+    use io::BytesReader;
+    use util::BufReader;
+    use net::tcp::TcpSocketBuf;
+
+    #[test]
+    fn test_serializing_memory_stream() {
+        let writer = BytesWriter();
+        let chan = serial::writer_chan(move writer);
+
+        chan.send(10);
+
+        let bytes = chan.byte_chan.writer.bytes.get();
+
+        let reader = BufReader::new(move bytes);
+        let port = serial::reader_port(move reader);
+
+        let res: int = port.recv();
+        assert res == 10i;
+    }
+
+    #[test]
+    fn test_serializing_pipes() {
+        let (port, chan) = serial::pipe_stream();
+
+        do task::spawn |move chan| {
+            for int::range(0, 10) |i| {
+                chan.send(i)
+            }
+        }
+
+        for int::range(0, 10) |i| {
+            assert i == port.recv()
+        }
+    }
+
+    #[test]
+    fn test_serializing_boxes() {
+        let (port, chan) = serial::pipe_stream();
+
+        do task::spawn |move chan| {
+            for int::range(0, 10) |i| {
+                chan.send(@i)
+            }
+        }
+
+        for int::range(0, 10) |i| {
+            assert @i == port.recv()
+        }
+    }
+
+    #[test]
+    fn test_pod_memory_stream() {
+        let writer = BytesWriter();
+        let chan = pod::writer_chan(move writer);
+
+        chan.send(10);
+
+        let bytes = chan.byte_chan.writer.bytes.get();
+
+        let reader = BufReader::new(move bytes);
+        let port = pod::reader_port(move reader);
+
+        let res: int = port.recv();
+        assert res == 10;
+    }
+
+    #[test]
+    fn test_pod_pipes() {
+        let (port, chan) = pod::pipe_stream();
+
+        do task::spawn |move chan| {
+            for int::range(0, 10) |i| {
+                chan.send(i)
+            }
+        }
+
+        for int::range(0, 10) |i| {
+            assert i == port.recv()
+        }
+    }
+
+    // XXX: Networking doesn't work on x86
+    #[test]
+    #[cfg(target_arch = "x86_64")]
+    fn test_pod_tcp_stream() {
+        fn reader_port(buf: TcpSocketBuf
+                      ) -> pod::ReaderPort<int, TcpSocketBuf> {
+            pod::reader_port(move buf)
+        }
+        fn writer_chan(buf: TcpSocketBuf
+                      ) -> pod::WriterChan<int, TcpSocketBuf> {
+            pod::writer_chan(move buf)
+        }
+        test_some_tcp_stream(reader_port, writer_chan, 9666);
+    }
+
+    #[test]
+    #[cfg(target_arch = "x86_64")]
+    fn test_serializing_tcp_stream() {
+        fn reader_port(buf: TcpSocketBuf
+                      ) -> serial::ReaderPort<int, TcpSocketBuf> {
+            serial::reader_port(move buf)
+        }
+        fn writer_chan(buf: TcpSocketBuf
+                      ) -> serial::WriterChan<int, TcpSocketBuf> {
+            serial::writer_chan(move buf)
+        }
+        test_some_tcp_stream(reader_port, writer_chan, 9667);
+    }
+
+    type ReaderPortFactory<U: Unflattener<int>> =
+        ~fn(TcpSocketBuf) -> FlatPort<int, U, ReaderBytePort<TcpSocketBuf>>;
+    type WriterChanFactory<F: Flattener<int>> =
+        ~fn(TcpSocketBuf) -> FlatChan<int, F, WriterByteChan<TcpSocketBuf>>;
+
+    fn test_some_tcp_stream<U: Unflattener<int>, F: Flattener<int>>(
+        reader_port: ReaderPortFactory<U>,
+        writer_chan: WriterChanFactory<F>,
+        port: uint) {
+
+        use net::tcp;
+        use net::ip;
+        use cell::Cell;
+        use net::tcp::TcpSocket;
+
+        // Indicate to the client task that the server is listening
+        let (begin_connect_port, begin_connect_chan) = pipes::stream();
+        // The connection is sent from the server task to the receiver task
+        // to handle the connection
+        let (accept_port, accept_chan) = pipes::stream();
+        // The main task will wait until the test is over to proceed
+        let (finish_port, finish_chan) = pipes::stream();
+
+        let addr = ip::v4::parse_addr("127.0.0.1");
+        let iotask = uv::global_loop::get();
+
+        let begin_connect_chan = Cell(move begin_connect_chan);
+        let accept_chan = Cell(move accept_chan);
+
+        // The server task
+        do task::spawn |copy addr, move begin_connect_chan,
+                        move accept_chan| {
+            let begin_connect_chan = begin_connect_chan.take();
+            let accept_chan = accept_chan.take();
+            let listen_res = do tcp::listen(
+                copy addr, port, 128, iotask,
+                |move begin_connect_chan, _kill_ch| {
+                    // Tell the sender to initiate the connection
+                    debug!("listening");
+                    begin_connect_chan.send(())
+                }) |move accept_chan, new_conn, kill_ch| {
+
+                // Incoming connection. Send it to the receiver task to accept
+                let (res_port, res_chan) = pipes::stream();
+                accept_chan.send((move new_conn, move res_chan));
+                // Wait until the connection is accepted
+                res_port.recv();
+
+                // Stop listening
+                kill_ch.send(None)
+            };
+
+            assert listen_res.is_ok();
+        }
+
+        // Client task
+        do task::spawn |copy addr, move begin_connect_port,
+                        move writer_chan| {
+
+            // Wait for the server to start listening
+            begin_connect_port.recv();
+
+            debug!("connecting");
+            let connect_result = tcp::connect(copy addr, port, iotask);
+            assert connect_result.is_ok();
+            let sock = result::unwrap(move connect_result);
+            let socket_buf: tcp::TcpSocketBuf = tcp::socket_buf(move sock);
+
+            // TcpSocketBuf is a Writer!
+            let chan = writer_chan(move socket_buf);
+
+            for int::range(0, 10) |i| {
+                debug!("sending %?", i);
+                chan.send(i)
+            }
+        }
+
+        // Reciever task
+        do task::spawn |move accept_port, move finish_chan,
+                        move reader_port| {
+
+            // Wait for a connection
+            let (conn, res_chan) = accept_port.recv();
+
+            debug!("accepting connection");
+            let accept_result = tcp::accept(conn);
+            debug!("accepted");
+            assert accept_result.is_ok();
+            let sock = result::unwrap(move accept_result);
+            res_chan.send(());
+
+            let socket_buf: tcp::TcpSocketBuf = tcp::socket_buf(move sock);
+
+            // TcpSocketBuf is a Reader!
+            let port = reader_port(move socket_buf);
+
+            for int::range(0, 10) |i| {
+                let j = port.recv();
+                debug!("receieved %?", j);
+                assert i == j;
+            }
+
+            // The test is over!
+            finish_chan.send(());
+        }
+
+        finish_port.recv();
+    }
+
+    // Tests that the different backends behave the same when the
+    // binary streaming protocol is broken
+    mod broken_protocol {
+        type PortLoader<P: BytePort> =
+            ~fn(~[u8]) -> FlatPort<int, PodUnflattener<int>, P>;
+
+        fn reader_port_loader(bytes: ~[u8]
+                             ) -> pod::ReaderPort<int, BufReader> {
+            let reader = BufReader::new(move bytes);
+            pod::reader_port(move reader)
+        }
+
+        fn pipe_port_loader(bytes: ~[u8]
+                           ) -> pod::PipePort<int> {
+            let (port, chan) = pipes::stream();
+            if bytes.is_not_empty() {
+                chan.send(move bytes);
+            }
+            pod::pipe_port(move port)
+        }
+
+        fn test_try_recv_none1<P: BytePort>(loader: PortLoader<P>) {
+            let bytes = ~[];
+            let port = loader(move bytes);
+            let res: Option<int> = port.try_recv();
+            assert res.is_none();
+        }
+
+        #[test]
+        fn test_try_recv_none1_reader() {
+            test_try_recv_none1(reader_port_loader);
+        }
+        #[test]
+        fn test_try_recv_none1_pipe() {
+            test_try_recv_none1(pipe_port_loader);
+        }
+
+        fn test_try_recv_none2<P: BytePort>(loader: PortLoader<P>) {
+            // The control word in the protocol is interrupted
+            let bytes = ~[0];
+            let port = loader(move bytes);
+            let res: Option<int> = port.try_recv();
+            assert res.is_none();
+        }
+
+        #[test]
+        fn test_try_recv_none2_reader() {
+            test_try_recv_none2(reader_port_loader);
+        }
+        #[test]
+        fn test_try_recv_none2_pipe() {
+            test_try_recv_none2(pipe_port_loader);
+        }
+
+        fn test_try_recv_none3<P: BytePort>(loader: PortLoader<P>) {
+            const CONTINUE: [u8 * 4] = [0xAA, 0xBB, 0xCC, 0xDD];
+            // The control word is followed by garbage
+            let bytes = CONTINUE.to_vec() + ~[0];
+            let port = loader(move bytes);
+            let res: Option<int> = port.try_recv();
+            assert res.is_none();
+        }
+
+        #[test]
+        fn test_try_recv_none3_reader() {
+            test_try_recv_none3(reader_port_loader);
+        }
+        #[test]
+        fn test_try_recv_none3_pipe() {
+            test_try_recv_none3(pipe_port_loader);
+        }
+
+        fn test_try_recv_none4<P: BytePort>(+loader: PortLoader<P>) {
+            assert do task::try |move loader| {
+                const CONTINUE: [u8 * 4] = [0xAA, 0xBB, 0xCC, 0xDD];
+                // The control word is followed by a valid length,
+                // then undeserializable garbage
+                let len_bytes = do io::u64_to_be_bytes(
+                    1, sys::size_of::<u64>()) |len_bytes| {
+                    len_bytes.to_vec()
+                };
+                let bytes = CONTINUE.to_vec() + len_bytes + ~[0, 0, 0, 0];
+
+                let port = loader(move bytes);
+
+                let _res: Option<int> = port.try_recv();
+            }.is_err();
+        }
+
+        #[test]
+        #[ignore(cfg(windows))]
+        fn test_try_recv_none4_reader() {
+            test_try_recv_none4(reader_port_loader);
+        }
+        #[test]
+        #[ignore(cfg(windows))]
+        fn test_try_recv_none4_pipe() {
+            test_try_recv_none4(pipe_port_loader);
+        }
+    }
+
+}
diff --git a/src/libstd/std.rc b/src/libstd/std.rc
index f5363ca23c5..5ea90253d80 100644
--- a/src/libstd/std.rc
+++ b/src/libstd/std.rc
@@ -65,6 +65,7 @@ pub mod arc;
 pub mod comm;
 pub mod future;
 pub mod task_pool;
+pub mod flatpipes;
 
 // Collections