about summary refs log tree commit diff
path: root/src/libcore
diff options
context:
space:
mode:
Diffstat (limited to 'src/libcore')
-rw-r--r--src/libcore/hashmap.rs6
-rw-r--r--src/libcore/io.rs2
-rw-r--r--src/libcore/oldcomm.rs8
-rw-r--r--src/libcore/pipes.rs294
-rw-r--r--src/libcore/private.rs6
-rw-r--r--src/libcore/private/global.rs2
-rw-r--r--src/libcore/reflect.rs2
-rw-r--r--src/libcore/task/local_data.rs2
-rw-r--r--src/libcore/task/spawn.rs2
9 files changed, 304 insertions, 20 deletions
diff --git a/src/libcore/hashmap.rs b/src/libcore/hashmap.rs
index df98e469bbc..3a51a2a212c 100644
--- a/src/libcore/hashmap.rs
+++ b/src/libcore/hashmap.rs
@@ -35,13 +35,13 @@ pub mod linear {
 
     const INITIAL_CAPACITY: uint = 32u; // 2^5
 
-    struct Bucket<K: Eq Hash, V> {
+    struct Bucket<K,V> {
         hash: uint,
         key: K,
         value: V,
     }
 
-    pub struct LinearMap<K: Eq Hash, V> {
+    pub struct LinearMap<K,V> {
         k0: u64,
         k1: u64,
         resize_at: uint,
@@ -408,7 +408,7 @@ pub mod linear {
         pure fn ne(&self, other: &LinearMap<K, V>) -> bool { !self.eq(other) }
     }
 
-    pub struct LinearSet<T: Hash IterBytes Eq> {
+    pub struct LinearSet<T> {
         priv map: LinearMap<T, ()>
     }
 
diff --git a/src/libcore/io.rs b/src/libcore/io.rs
index cf49ee0becc..6d618627ece 100644
--- a/src/libcore/io.rs
+++ b/src/libcore/io.rs
@@ -1111,7 +1111,7 @@ pub mod fsync {
 
 
     // Artifacts that need to fsync on destruction
-    pub struct Res<t: Copy> {
+    pub struct Res<t> {
         arg: Arg<t>,
     }
 
diff --git a/src/libcore/oldcomm.rs b/src/libcore/oldcomm.rs
index 89300be9ab4..dc245f5bffd 100644
--- a/src/libcore/oldcomm.rs
+++ b/src/libcore/oldcomm.rs
@@ -68,7 +68,7 @@ use vec;
  * transmitted. If a port value is copied, both copies refer to the same
  * port.  Ports may be associated with multiple `chan`s.
  */
-pub enum Port<T: Owned> {
+pub enum Port<T> {
     Port_(@PortPtr<T>)
 }
 
@@ -84,7 +84,7 @@ pub enum Port<T: Owned> {
  * data will be silently dropped.  Channels may be duplicated and
  * themselves transmitted over other channels.
  */
-pub enum Chan<T: Owned> {
+pub enum Chan<T> {
     Chan_(port_id)
 }
 
@@ -120,7 +120,7 @@ pub fn listen<T: Owned, U>(f: fn(Chan<T>) -> U) -> U {
     f(po.chan())
 }
 
-struct PortPtr<T:Owned> {
+struct PortPtr<T> {
   po: *rust_port,
   drop {
     unsafe {
@@ -238,7 +238,7 @@ fn peek_chan<T: Owned>(ch: Chan<T>) -> bool {
 }
 
 /// Receive on a raw port pointer
-fn recv_<T: Owned>(p: *rust_port) -> T {
+fn recv_<T>(p: *rust_port) -> T {
     unsafe {
         let yield = 0;
         let yieldp = ptr::addr_of(&yield);
diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs
index 33826d10c3c..5af9950ff08 100644
--- a/src/libcore/pipes.rs
+++ b/src/libcore/pipes.rs
@@ -151,7 +151,7 @@ type Buffer<T: Owned> = {
 #[cfg(stage1)]
 #[cfg(stage2)]
 #[cfg(stage3)]
-pub struct Buffer<T: Owned> {
+pub struct Buffer<T> {
     header: BufferHeader,
     data: T,
 }
@@ -212,10 +212,18 @@ impl PacketHeader {
 }
 
 #[doc(hidden)]
+#[cfg(stage0)]
 pub struct Packet<T: Owned> {
     header: PacketHeader,
     mut payload: Option<T>,
 }
+#[doc(hidden)]
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub struct Packet<T> {
+    header: PacketHeader,
+    mut payload: Option<T>,
+}
 
 #[doc(hidden)]
 pub trait HasBuffer {
@@ -256,12 +264,11 @@ fn unibuffer<T: Owned>() -> ~Buffer<Packet<T>> {
     }
     move b
 }
-
 #[doc(hidden)]
 #[cfg(stage1)]
 #[cfg(stage2)]
 #[cfg(stage3)]
-fn unibuffer<T: Owned>() -> ~Buffer<Packet<T>> {
+fn unibuffer<T>() -> ~Buffer<Packet<T>> {
     let b = ~Buffer {
         header: BufferHeader(),
         data: Packet {
@@ -277,6 +284,7 @@ fn unibuffer<T: Owned>() -> ~Buffer<Packet<T>> {
 }
 
 #[doc(hidden)]
+#[cfg(stage0)]
 pub fn packet<T: Owned>() -> *Packet<T> {
     let b = unibuffer();
     let p = ptr::addr_of(&(b.data));
@@ -284,6 +292,16 @@ pub fn packet<T: Owned>() -> *Packet<T> {
     unsafe { forget(move b) }
     p
 }
+#[doc(hidden)]
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub fn packet<T>() -> *Packet<T> {
+    let b = unibuffer();
+    let p = ptr::addr_of(&(b.data));
+    // We'll take over memory management from here.
+    unsafe { forget(move b) }
+    p
+}
 
 #[doc(hidden)]
 pub fn entangle_buffer<T: Owned, Tstart: Owned>(
@@ -387,11 +405,19 @@ fn swap_state_rel(dst: &mut State, src: State) -> State {
 }
 
 #[doc(hidden)]
+#[cfg(stage0)]
 pub unsafe fn get_buffer<T: Owned>(p: *PacketHeader) -> ~Buffer<T> {
     transmute((*p).buf_header())
 }
+#[doc(hidden)]
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub unsafe fn get_buffer<T>(p: *PacketHeader) -> ~Buffer<T> {
+    transmute((*p).buf_header())
+}
 
 // This could probably be done with SharedMutableState to avoid move_it!().
+#[cfg(stage0)]
 struct BufferResource<T: Owned> {
     buffer: ~Buffer<T>,
 
@@ -413,7 +439,31 @@ struct BufferResource<T: Owned> {
         }
     }
 }
+#[cfg(stage1)]
+#[cfg(stage2)]
+struct BufferResource<T> {
+    buffer: ~Buffer<T>,
 
+    drop {
+        unsafe {
+            let b = move_it!(self.buffer);
+            //let p = ptr::addr_of(*b);
+            //error!("drop %?", p);
+            let old_count = atomic_sub_rel(&mut b.header.ref_count, 1);
+            //let old_count = atomic_xchng_rel(b.header.ref_count, 0);
+            if old_count == 1 {
+                // The new count is 0.
+
+                // go go gadget drop glue
+            }
+            else {
+                forget(move b)
+            }
+        }
+    }
+}
+
+#[cfg(stage0)]
 fn BufferResource<T: Owned>(b: ~Buffer<T>) -> BufferResource<T> {
     //let p = ptr::addr_of(*b);
     //error!("take %?", p);
@@ -424,8 +474,21 @@ fn BufferResource<T: Owned>(b: ~Buffer<T>) -> BufferResource<T> {
         buffer: move b
     }
 }
+#[cfg(stage1)]
+#[cfg(stage2)]
+fn BufferResource<T>(b: ~Buffer<T>) -> BufferResource<T> {
+    //let p = ptr::addr_of(*b);
+    //error!("take %?", p);
+    atomic_add_acq(&mut b.header.ref_count, 1);
+
+    BufferResource {
+        // tjc: ????
+        buffer: move b
+    }
+}
 
 #[doc(hidden)]
+#[cfg(stage0)]
 pub fn send<T: Owned, Tbuffer: Owned>(p: SendPacketBuffered<T, Tbuffer>,
                                     payload: T) -> bool {
     let header = p.header();
@@ -467,6 +530,49 @@ pub fn send<T: Owned, Tbuffer: Owned>(p: SendPacketBuffered<T, Tbuffer>,
         }
     }
 }
+#[doc(hidden)]
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub fn send<T,Tbuffer>(p: SendPacketBuffered<T,Tbuffer>, payload: T) -> bool {
+    let header = p.header();
+    let p_ = p.unwrap();
+    let p = unsafe { &*p_ };
+    assert ptr::addr_of(&(p.header)) == header;
+    assert p.payload.is_none();
+    p.payload = move Some(move payload);
+    let old_state = swap_state_rel(&mut p.header.state, Full);
+    match old_state {
+        Empty => {
+            // Yay, fastpath.
+
+            // The receiver will eventually clean this up.
+            //unsafe { forget(p); }
+            return true;
+        }
+        Full => fail ~"duplicate send",
+        Blocked => {
+            debug!("waking up task for %?", p_);
+            let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
+            if !old_task.is_null() {
+                unsafe {
+                    rustrt::task_signal_event(
+                        old_task,
+                        ptr::addr_of(&(p.header)) as *libc::c_void);
+                    rustrt::rust_task_deref(old_task);
+                }
+            }
+
+            // The receiver will eventually clean this up.
+            //unsafe { forget(p); }
+            return true;
+        }
+        Terminated => {
+            // The receiver will never receive this. Rely on drop_glue
+            // to clean everything up.
+            return false;
+        }
+    }
+}
 
 /** Receives a message from a pipe.
 
@@ -812,13 +918,24 @@ pub fn select<T: Owned, Tb: Owned>(endpoints: ~[RecvPacketBuffered<T, Tb>])
 message.
 
 */
+#[cfg(stage0)]
 pub type SendPacket<T: Owned> = SendPacketBuffered<T, Packet<T>>;
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub type SendPacket<T> = SendPacketBuffered<T, Packet<T>>;
 
 #[doc(hidden)]
+#[cfg(stage0)]
 pub fn SendPacket<T: Owned>(p: *Packet<T>) -> SendPacket<T> {
     SendPacketBuffered(p)
 }
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub fn SendPacket<T>(p: *Packet<T>) -> SendPacket<T> {
+    SendPacketBuffered(p)
+}
 
+#[cfg(stage0)]
 pub struct SendPacketBuffered<T: Owned, Tbuffer: Owned> {
     mut p: Option<*Packet<T>>,
     mut buffer: Option<BufferResource<Tbuffer>>,
@@ -837,7 +954,31 @@ pub struct SendPacketBuffered<T: Owned, Tbuffer: Owned> {
         //                } else { "some" }); }
     }
 }
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub struct SendPacketBuffered<T, Tbuffer> {
+    mut p: Option<*Packet<T>>,
+    mut buffer: Option<BufferResource<Tbuffer>>,
+}
 
+impl<T:Owned,Tbuffer:Owned> SendPacketBuffered<T,Tbuffer> : ::ops::Drop {
+    fn finalize(&self) {
+        //if self.p != none {
+        //    debug!("drop send %?", option::get(self.p));
+        //}
+        if self.p != None {
+            let mut p = None;
+            p <-> self.p;
+            sender_terminate(option::unwrap(move p))
+        }
+        //unsafe { error!("send_drop: %?",
+        //                if self.buffer == none {
+        //                    "none"
+        //                } else { "some" }); }
+    }
+}
+
+#[cfg(stage0)]
 pub fn SendPacketBuffered<T: Owned, Tbuffer: Owned>(p: *Packet<T>)
     -> SendPacketBuffered<T, Tbuffer> {
         //debug!("take send %?", p);
@@ -849,8 +990,50 @@ pub fn SendPacketBuffered<T: Owned, Tbuffer: Owned>(p: *Packet<T>)
         }
     }
 }
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub fn SendPacketBuffered<T,Tbuffer>(p: *Packet<T>)
+    -> SendPacketBuffered<T, Tbuffer> {
+        //debug!("take send %?", p);
+    SendPacketBuffered {
+        p: Some(p),
+        buffer: unsafe {
+            Some(BufferResource(
+                get_buffer(ptr::addr_of(&((*p).header)))))
+        }
+    }
+}
+
+#[cfg(stage0)]
+impl<T:Owned,Tbuffer:Owned> SendPacketBuffered<T,Tbuffer> {
+    fn unwrap() -> *Packet<T> {
+        let mut p = None;
+        p <-> self.p;
+        option::unwrap(move p)
+    }
+
+    pure fn header() -> *PacketHeader {
+        match self.p {
+          Some(packet) => unsafe {
+            let packet = &*packet;
+            let header = ptr::addr_of(&(packet.header));
+            //forget(packet);
+            header
+          },
+          None => fail ~"packet already consumed"
+        }
+    }
 
-impl<T: Owned, Tbuffer: Owned> SendPacketBuffered<T, Tbuffer> {
+    fn reuse_buffer() -> BufferResource<Tbuffer> {
+        //error!("send reuse_buffer");
+        let mut tmp = None;
+        tmp <-> self.buffer;
+        option::unwrap(move tmp)
+    }
+}
+#[cfg(stage1)]
+#[cfg(stage2)]
+impl<T,Tbuffer> SendPacketBuffered<T,Tbuffer> {
     fn unwrap() -> *Packet<T> {
         let mut p = None;
         p <-> self.p;
@@ -879,13 +1062,25 @@ impl<T: Owned, Tbuffer: Owned> SendPacketBuffered<T, Tbuffer> {
 
 /// Represents the receive end of a pipe. It can receive exactly one
 /// message.
+#[cfg(stage0)]
 pub type RecvPacket<T: Owned> = RecvPacketBuffered<T, Packet<T>>;
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub type RecvPacket<T> = RecvPacketBuffered<T, Packet<T>>;
 
 #[doc(hidden)]
+#[cfg(stage0)]
 pub fn RecvPacket<T: Owned>(p: *Packet<T>) -> RecvPacket<T> {
     RecvPacketBuffered(p)
 }
+#[doc(hidden)]
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub fn RecvPacket<T>(p: *Packet<T>) -> RecvPacket<T> {
+    RecvPacketBuffered(p)
+}
 
+#[cfg(stage0)]
 pub struct RecvPacketBuffered<T: Owned, Tbuffer: Owned> {
     mut p: Option<*Packet<T>>,
     mut buffer: Option<BufferResource<Tbuffer>>,
@@ -904,6 +1099,29 @@ pub struct RecvPacketBuffered<T: Owned, Tbuffer: Owned> {
         //                } else { "some" }); }
     }
 }
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub struct RecvPacketBuffered<T, Tbuffer> {
+    mut p: Option<*Packet<T>>,
+    mut buffer: Option<BufferResource<Tbuffer>>,
+}
+
+impl<T:Owned, Tbuffer:Owned> RecvPacketBuffered<T,Tbuffer> : ::ops::Drop {
+    fn finalize(&self) {
+        //if self.p != none {
+        //    debug!("drop recv %?", option::get(self.p));
+        //}
+        if self.p != None {
+            let mut p = None;
+            p <-> self.p;
+            receiver_terminate(option::unwrap(move p))
+        }
+        //unsafe { error!("recv_drop: %?",
+        //                if self.buffer == none {
+        //                    "none"
+        //                } else { "some" }); }
+    }
+}
 
 impl<T: Owned, Tbuffer: Owned> RecvPacketBuffered<T, Tbuffer> {
     fn unwrap() -> *Packet<T> {
@@ -934,6 +1152,7 @@ impl<T: Owned, Tbuffer: Owned> RecvPacketBuffered<T, Tbuffer> : Selectable {
     }
 }
 
+#[cfg(stage0)]
 pub fn RecvPacketBuffered<T: Owned, Tbuffer: Owned>(p: *Packet<T>)
     -> RecvPacketBuffered<T, Tbuffer> {
     //debug!("take recv %?", p);
@@ -945,12 +1164,33 @@ pub fn RecvPacketBuffered<T: Owned, Tbuffer: Owned>(p: *Packet<T>)
         }
     }
 }
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub fn RecvPacketBuffered<T,Tbuffer>(p: *Packet<T>)
+    -> RecvPacketBuffered<T,Tbuffer> {
+    //debug!("take recv %?", p);
+    RecvPacketBuffered {
+        p: Some(p),
+        buffer: unsafe {
+            Some(BufferResource(
+                get_buffer(ptr::addr_of(&((*p).header)))))
+        }
+    }
+}
 
 #[doc(hidden)]
+#[cfg(stage0)]
 pub fn entangle<T: Owned>() -> (SendPacket<T>, RecvPacket<T>) {
     let p = packet();
     (SendPacket(p), RecvPacket(p))
 }
+#[doc(hidden)]
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub fn entangle<T>() -> (SendPacket<T>, RecvPacket<T>) {
+    let p = packet();
+    (SendPacket(p), RecvPacket(p))
+}
 
 /** Spawn a task to provide a service.
 
@@ -1042,24 +1282,50 @@ pub trait Peekable<T> {
 }
 
 #[doc(hidden)]
+#[cfg(stage0)]
 struct Chan_<T:Owned> {
-    mut endp: Option<streamp::client::Open<T>>,
+    mut endp: Option<streamp::client::Open<T>>
+}
+#[doc(hidden)]
+#[cfg(stage1)]
+#[cfg(stage2)]
+struct Chan_<T> {
+    mut endp: Option<streamp::client::Open<T>>
 }
 
 /// An endpoint that can send many messages.
+#[cfg(stage0)]
 pub enum Chan<T:Owned> {
     Chan_(Chan_<T>)
 }
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub enum Chan<T> {
+    Chan_(Chan_<T>)
+}
 
 #[doc(hidden)]
+#[cfg(stage0)]
 struct Port_<T:Owned> {
     mut endp: Option<streamp::server::Open<T>>,
 }
+#[doc(hidden)]
+#[cfg(stage1)]
+#[cfg(stage2)]
+struct Port_<T> {
+    mut endp: Option<streamp::server::Open<T>>,
+}
 
 /// An endpoint that can receive many messages.
+#[cfg(stage0)]
 pub enum Port<T:Owned> {
     Port_(Port_<T>)
 }
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub enum Port<T> {
+    Port_(Port_<T>)
+}
 
 /** Creates a `(chan, port)` pair.
 
@@ -1145,9 +1411,15 @@ impl<T: Owned> Port<T>: Selectable {
 }
 
 /// Treat many ports as one.
+#[cfg(stage0)]
 pub struct PortSet<T: Owned> {
     mut ports: ~[pipes::Port<T>],
 }
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub struct PortSet<T> {
+    mut ports: ~[pipes::Port<T>],
+}
 
 pub fn PortSet<T: Owned>() -> PortSet<T>{
     PortSet {
@@ -1210,7 +1482,11 @@ impl<T: Owned> PortSet<T> : Peekable<T> {
 }
 
 /// A channel that can be shared between many senders.
+#[cfg(stage0)]
 pub type SharedChan<T: Owned> = private::Exclusive<Chan<T>>;
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub type SharedChan<T> = private::Exclusive<Chan<T>>;
 
 impl<T: Owned> SharedChan<T>: GenericChan<T> {
     fn send(x: T) {
@@ -1278,9 +1554,17 @@ proto! oneshot (
 )
 
 /// The send end of a oneshot pipe.
+#[cfg(stage0)]
 pub type ChanOne<T: Owned> = oneshot::client::Oneshot<T>;
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub type ChanOne<T> = oneshot::client::Oneshot<T>;
 /// The receive end of a oneshot pipe.
+#[cfg(stage0)]
 pub type PortOne<T: Owned> = oneshot::server::Oneshot<T>;
+#[cfg(stage1)]
+#[cfg(stage2)]
+pub type PortOne<T> = oneshot::server::Oneshot<T>;
 
 /// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
 pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
diff --git a/src/libcore/private.rs b/src/libcore/private.rs
index 332c763f151..b6ac711d764 100644
--- a/src/libcore/private.rs
+++ b/src/libcore/private.rs
@@ -238,7 +238,7 @@ pub unsafe fn unwrap_shared_mutable_state<T: Owned>(rc: SharedMutableState<T>)
  * Data races between tasks can result in crashes and, with sufficient
  * cleverness, arbitrary type coercion.
  */
-pub type SharedMutableState<T: Owned> = ArcDestruct<T>;
+pub type SharedMutableState<T> = ArcDestruct<T>;
 
 pub unsafe fn shared_mutable_state<T: Owned>(data: T) ->
         SharedMutableState<T> {
@@ -341,11 +341,11 @@ impl LittleLock {
     }
 }
 
-struct ExData<T: Owned> { lock: LittleLock, mut failed: bool, mut data: T, }
+struct ExData<T> { lock: LittleLock, mut failed: bool, mut data: T, }
 /**
  * An arc over mutable data that is protected by a lock. For library use only.
  */
-pub struct Exclusive<T: Owned> { x: SharedMutableState<ExData<T>> }
+pub struct Exclusive<T> { x: SharedMutableState<ExData<T>> }
 
 pub fn exclusive<T:Owned >(user_data: T) -> Exclusive<T> {
     let data = ExData {
diff --git a/src/libcore/private/global.rs b/src/libcore/private/global.rs
index 69319abc009..ee20fb665be 100644
--- a/src/libcore/private/global.rs
+++ b/src/libcore/private/global.rs
@@ -41,7 +41,7 @@ use sys::Closure;
 use task::spawn;
 use uint;
 
-pub type GlobalDataKey<T: Owned> = &fn(v: T);
+pub type GlobalDataKey<T> = &fn(v: T);
 
 pub unsafe fn global_data_clone_create<T: Owned Clone>(
     key: GlobalDataKey<T>, create: &fn() -> ~T) -> T {
diff --git a/src/libcore/reflect.rs b/src/libcore/reflect.rs
index 55eb53bc026..81a36e1ae13 100644
--- a/src/libcore/reflect.rs
+++ b/src/libcore/reflect.rs
@@ -41,7 +41,7 @@ pub fn align(size: uint, align: uint) -> uint {
 }
 
 /// Adaptor to wrap around visitors implementing MovePtr.
-pub struct MovePtrAdaptor<V: TyVisitor MovePtr> {
+pub struct MovePtrAdaptor<V> {
     inner: V
 }
 pub fn MovePtrAdaptor<V: TyVisitor MovePtr>(v: V) -> MovePtrAdaptor<V> {
diff --git a/src/libcore/task/local_data.rs b/src/libcore/task/local_data.rs
index 05a4e35b249..42765ef139f 100644
--- a/src/libcore/task/local_data.rs
+++ b/src/libcore/task/local_data.rs
@@ -45,7 +45,7 @@ use task;
  *
  * These two cases aside, the interface is safe.
  */
-pub type LocalDataKey<T: Durable> = &fn(v: @T);
+pub type LocalDataKey<T> = &fn(v: @T);
 
 /**
  * Remove a task-local data value from the table, returning the
diff --git a/src/libcore/task/spawn.rs b/src/libcore/task/spawn.rs
index 0a2f6634214..3db6fa00f16 100644
--- a/src/libcore/task/spawn.rs
+++ b/src/libcore/task/spawn.rs
@@ -77,7 +77,7 @@ use cast;
 use container::Map;
 use oldcomm;
 use option;
-use pipes::{Chan, GenericChan, GenericPort, Port};
+use pipes::{Chan, GenericChan, GenericPort, Port, stream};
 use pipes;
 use prelude::*;
 use private;