about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-05-16 23:12:22 -0700
committerBrian Anderson <banderson@mozilla.com>2013-05-17 17:54:27 -0700
commit26becc308e4b9a0f5be1c7c2895c7761b778e01f (patch)
tree73fb276dad954cec037e43a72c2e75b8d7652645
parentf5987b03b8d65a2b885519b7b9a0ea33cda33bc5 (diff)
downloadrust-26becc308e4b9a0f5be1c7c2895c7761b778e01f.tar.gz
rust-26becc308e4b9a0f5be1c7c2895c7761b778e01f.zip
core: Wire up oneshot pipes to newsched
-rw-r--r--src/libcore/comm.rs350
-rw-r--r--src/libcore/rt/local_services.rs22
-rw-r--r--src/libstd/future.rs10
-rw-r--r--src/libstd/workcache.rs7
4 files changed, 253 insertions, 136 deletions
diff --git a/src/libcore/comm.rs b/src/libcore/comm.rs
index 34c60202b3f..da3ae0e6c5d 100644
--- a/src/libcore/comm.rs
+++ b/src/libcore/comm.rs
@@ -22,6 +22,8 @@ use vec;
 use vec::OwnedVector;
 use util::replace;
 use unstable::sync::{Exclusive, exclusive};
+use rtcomm = rt::comm;
+use rt;
 
 use pipes::{recv, try_recv, wait_many, peek, PacketHeader};
 
@@ -335,180 +337,280 @@ impl<T: Owned> ::clone::Clone for SharedChan<T> {
     }
 }
 
-/*proto! oneshot (
-    Oneshot:send<T:Owned> {
-        send(T) -> !
+pub struct PortOne<T> {
+    inner: Either<pipesy::PortOne<T>, rtcomm::PortOne<T>>
+}
+
+pub struct ChanOne<T> {
+    inner: Either<pipesy::ChanOne<T>, rtcomm::ChanOne<T>>
+}
+
+pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
+    let (port, chan) = match rt::context() {
+        rt::OldTaskContext => match pipesy::oneshot() {
+            (p, c) => (Left(p), Left(c)),
+        },
+        _ => match rtcomm::oneshot() {
+            (p, c) => (Right(p), Right(c))
+        }
+    };
+    let port = PortOne {
+        inner: port
+    };
+    let chan = ChanOne {
+        inner: chan
+    };
+    return (port, chan);
+}
+
+impl<T: Owned> PortOne<T> {
+    pub fn recv(self) -> T {
+        let PortOne { inner } = self;
+        match inner {
+            Left(p) => p.recv(),
+            Right(p) => p.recv()
+        }
     }
-)*/
 
-#[allow(non_camel_case_types)]
-pub mod oneshot {
-    priv use core::kinds::Owned;
-    use ptr::to_mut_unsafe_ptr;
+    pub fn try_recv(self) -> Option<T> {
+        let PortOne { inner } = self;
+        match inner {
+            Left(p) => p.try_recv(),
+            Right(p) => p.try_recv()
+        }
+    }
+}
 
-    pub fn init<T: Owned>() -> (client::Oneshot<T>, server::Oneshot<T>) {
-        pub use core::pipes::HasBuffer;
+impl<T: Owned> ChanOne<T> {
+    pub fn send(self, data: T) {
+        let ChanOne { inner } = self;
+        match inner {
+            Left(p) => p.send(data),
+            Right(p) => p.send(data)
+        }
+    }
 
-        let buffer = ~::core::pipes::Buffer {
-            header: ::core::pipes::BufferHeader(),
-            data: __Buffer {
-                Oneshot: ::core::pipes::mk_packet::<Oneshot<T>>()
-            },
-        };
-        do ::core::pipes::entangle_buffer(buffer) |buffer, data| {
-            data.Oneshot.set_buffer(buffer);
-            to_mut_unsafe_ptr(&mut data.Oneshot)
+    pub fn try_send(self, data: T) -> bool {
+        let ChanOne { inner } = self;
+        match inner {
+            Left(p) => p.try_send(data),
+            Right(p) => p.try_send(data)
         }
     }
-    #[allow(non_camel_case_types)]
-    pub enum Oneshot<T> { pub send(T), }
-    #[allow(non_camel_case_types)]
-    pub struct __Buffer<T> {
-        Oneshot: ::core::pipes::Packet<Oneshot<T>>,
+}
+
+pub fn recv_one<T: Owned>(port: PortOne<T>) -> T {
+    let PortOne { inner } = port;
+    match inner {
+        Left(p) => pipesy::recv_one(p),
+        Right(p) => p.recv()
     }
+}
 
-    #[allow(non_camel_case_types)]
-    pub mod client {
+pub fn try_recv_one<T: Owned>(port: PortOne<T>) -> Option<T> {
+    let PortOne { inner } = port;
+    match inner {
+        Left(p) => pipesy::try_recv_one(p),
+        Right(p) => p.try_recv()
+    }
+}
+
+pub fn send_one<T: Owned>(chan: ChanOne<T>, data: T) {
+    let ChanOne { inner } = chan;
+    match inner {
+        Left(c) => pipesy::send_one(c, data),
+        Right(c) => c.send(data)
+    }
+}
+
+pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T) -> bool {
+    let ChanOne { inner } = chan;
+    match inner {
+        Left(c) => pipesy::try_send_one(c, data),
+        Right(c) => c.try_send(data)
+    }
+}
 
+mod pipesy {
+
+    use kinds::Owned;
+    use option::{Option, Some, None};
+    use pipes::{recv, try_recv};
+
+    /*proto! oneshot (
+        Oneshot:send<T:Owned> {
+            send(T) -> !
+        }
+    )*/
+
+    #[allow(non_camel_case_types)]
+    pub mod oneshot {
         priv use core::kinds::Owned;
+        use ptr::to_mut_unsafe_ptr;
 
-        #[allow(non_camel_case_types)]
-        pub fn try_send<T: Owned>(pipe: Oneshot<T>, x_0: T) ->
-            ::core::option::Option<()> {
-            {
-                use super::send;
-                let message = send(x_0);
-                if ::core::pipes::send(pipe, message) {
-                    ::core::pipes::rt::make_some(())
-                } else { ::core::pipes::rt::make_none() }
+        pub fn init<T: Owned>() -> (client::Oneshot<T>, server::Oneshot<T>) {
+            pub use core::pipes::HasBuffer;
+
+            let buffer = ~::core::pipes::Buffer {
+                header: ::core::pipes::BufferHeader(),
+                data: __Buffer {
+                    Oneshot: ::core::pipes::mk_packet::<Oneshot<T>>()
+                },
+            };
+            do ::core::pipes::entangle_buffer(buffer) |buffer, data| {
+                data.Oneshot.set_buffer(buffer);
+                to_mut_unsafe_ptr(&mut data.Oneshot)
             }
         }
+        #[allow(non_camel_case_types)]
+        pub enum Oneshot<T> { pub send(T), }
+        #[allow(non_camel_case_types)]
+        pub struct __Buffer<T> {
+            Oneshot: ::core::pipes::Packet<Oneshot<T>>,
+        }
 
         #[allow(non_camel_case_types)]
-        pub fn send<T: Owned>(pipe: Oneshot<T>, x_0: T) {
-            {
-                use super::send;
-                let message = send(x_0);
-                ::core::pipes::send(pipe, message);
+        pub mod client {
+
+            priv use core::kinds::Owned;
+
+            #[allow(non_camel_case_types)]
+            pub fn try_send<T: Owned>(pipe: Oneshot<T>, x_0: T) ->
+                ::core::option::Option<()> {
+                {
+                    use super::send;
+                    let message = send(x_0);
+                    if ::core::pipes::send(pipe, message) {
+                        ::core::pipes::rt::make_some(())
+                    } else { ::core::pipes::rt::make_none() }
+                }
             }
+
+            #[allow(non_camel_case_types)]
+            pub fn send<T: Owned>(pipe: Oneshot<T>, x_0: T) {
+                {
+                    use super::send;
+                    let message = send(x_0);
+                    ::core::pipes::send(pipe, message);
+                }
+            }
+
+            #[allow(non_camel_case_types)]
+            pub type Oneshot<T> =
+                ::core::pipes::SendPacketBuffered<super::Oneshot<T>,
+            super::__Buffer<T>>;
         }
 
         #[allow(non_camel_case_types)]
-        pub type Oneshot<T> =
-            ::core::pipes::SendPacketBuffered<super::Oneshot<T>,
-                                              super::__Buffer<T>>;
+        pub mod server {
+            #[allow(non_camel_case_types)]
+            pub type Oneshot<T> =
+                ::core::pipes::RecvPacketBuffered<super::Oneshot<T>,
+            super::__Buffer<T>>;
+        }
     }
 
-    #[allow(non_camel_case_types)]
-    pub mod server {
-        #[allow(non_camel_case_types)]
-        pub type Oneshot<T> =
-            ::core::pipes::RecvPacketBuffered<super::Oneshot<T>,
-                                              super::__Buffer<T>>;
+    /// The send end of a oneshot pipe.
+    pub struct ChanOne<T> {
+        contents: oneshot::client::Oneshot<T>
     }
-}
-
-/// The send end of a oneshot pipe.
-pub struct ChanOne<T> {
-    contents: oneshot::client::Oneshot<T>
-}
 
-impl<T> ChanOne<T> {
-    pub fn new(contents: oneshot::client::Oneshot<T>) -> ChanOne<T> {
-        ChanOne {
-            contents: contents
+    impl<T> ChanOne<T> {
+        pub fn new(contents: oneshot::client::Oneshot<T>) -> ChanOne<T> {
+            ChanOne {
+                contents: contents
+            }
         }
     }
-}
 
-/// The receive end of a oneshot pipe.
-pub struct PortOne<T> {
-    contents: oneshot::server::Oneshot<T>
-}
+    /// The receive end of a oneshot pipe.
+    pub struct PortOne<T> {
+        contents: oneshot::server::Oneshot<T>
+    }
 
-impl<T> PortOne<T> {
-    pub fn new(contents: oneshot::server::Oneshot<T>) -> PortOne<T> {
-        PortOne {
-            contents: contents
+    impl<T> PortOne<T> {
+        pub fn new(contents: oneshot::server::Oneshot<T>) -> PortOne<T> {
+            PortOne {
+                contents: contents
+            }
         }
     }
-}
 
-/// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
-pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
-    let (chan, port) = oneshot::init();
-    (PortOne::new(port), ChanOne::new(chan))
-}
+    /// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
+    pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
+        let (chan, port) = oneshot::init();
+        (PortOne::new(port), ChanOne::new(chan))
+    }
 
-pub impl<T: Owned> PortOne<T> {
-    fn recv(self) -> T { recv_one(self) }
-    fn try_recv(self) -> Option<T> { try_recv_one(self) }
-    fn unwrap(self) -> oneshot::server::Oneshot<T> {
-        match self {
-            PortOne { contents: s } => s
+    pub impl<T: Owned> PortOne<T> {
+        fn recv(self) -> T { recv_one(self) }
+        fn try_recv(self) -> Option<T> { try_recv_one(self) }
+        fn unwrap(self) -> oneshot::server::Oneshot<T> {
+            match self {
+                PortOne { contents: s } => s
+            }
         }
     }
-}
 
-pub impl<T: Owned> ChanOne<T> {
-    fn send(self, data: T) { send_one(self, data) }
-    fn try_send(self, data: T) -> bool { try_send_one(self, data) }
-    fn unwrap(self) -> oneshot::client::Oneshot<T> {
-        match self {
-            ChanOne { contents: s } => s
+    pub impl<T: Owned> ChanOne<T> {
+        fn send(self, data: T) { send_one(self, data) }
+        fn try_send(self, data: T) -> bool { try_send_one(self, data) }
+        fn unwrap(self) -> oneshot::client::Oneshot<T> {
+            match self {
+                ChanOne { contents: s } => s
+            }
         }
     }
-}
 
-/**
- * Receive a message from a oneshot pipe, failing if the connection was
- * closed.
- */
-pub fn recv_one<T: Owned>(port: PortOne<T>) -> T {
-    match port {
-        PortOne { contents: port } => {
-            let oneshot::send(message) = recv(port);
-            message
+    /**
+    * Receive a message from a oneshot pipe, failing if the connection was
+    * closed.
+    */
+    pub fn recv_one<T: Owned>(port: PortOne<T>) -> T {
+        match port {
+            PortOne { contents: port } => {
+                let oneshot::send(message) = recv(port);
+                message
+            }
         }
     }
-}
 
-/// Receive a message from a oneshot pipe unless the connection was closed.
-pub fn try_recv_one<T: Owned> (port: PortOne<T>) -> Option<T> {
-    match port {
-        PortOne { contents: port } => {
-            let message = try_recv(port);
+    /// Receive a message from a oneshot pipe unless the connection was closed.
+    pub fn try_recv_one<T: Owned> (port: PortOne<T>) -> Option<T> {
+        match port {
+            PortOne { contents: port } => {
+                let message = try_recv(port);
 
-            if message.is_none() {
-                None
-            } else {
-                let oneshot::send(message) = message.unwrap();
-                Some(message)
+                if message.is_none() {
+                    None
+                } else {
+                    let oneshot::send(message) = message.unwrap();
+                    Some(message)
+                }
             }
         }
     }
-}
 
-/// Send a message on a oneshot pipe, failing if the connection was closed.
-pub fn send_one<T: Owned>(chan: ChanOne<T>, data: T) {
-    match chan {
-        ChanOne { contents: chan } => oneshot::client::send(chan, data),
+    /// Send a message on a oneshot pipe, failing if the connection was closed.
+    pub fn send_one<T: Owned>(chan: ChanOne<T>, data: T) {
+        match chan {
+            ChanOne { contents: chan } => oneshot::client::send(chan, data),
+        }
     }
-}
 
-/**
- * Send a message on a oneshot pipe, or return false if the connection was
- * closed.
- */
-pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T) -> bool {
-    match chan {
-        ChanOne { contents: chan } => {
-            oneshot::client::try_send(chan, data).is_some()
+    /**
+    * Send a message on a oneshot pipe, or return false if the connection was
+    * closed.
+    */
+    pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T) -> bool {
+        match chan {
+            ChanOne { contents: chan } => {
+                oneshot::client::try_send(chan, data).is_some()
+            }
         }
     }
-}
-
 
+}
 
 /// Returns the index of an endpoint that is ready to receive.
 pub fn selecti<T: Selectable>(endpoints: &mut [T]) -> uint {
diff --git a/src/libcore/rt/local_services.rs b/src/libcore/rt/local_services.rs
index 98bfc2fa168..35c703bb350 100644
--- a/src/libcore/rt/local_services.rs
+++ b/src/libcore/rt/local_services.rs
@@ -244,5 +244,27 @@ mod test {
             info!("here i am. logging in a newsched task");
         }
     }
+
+    #[test]
+    fn comm_oneshot() {
+        use comm::*;
+
+        do run_in_newsched_task {
+            let (port, chan) = oneshot();
+            send_one(chan, 10);
+            assert!(recv_one(port) == 10);
+        }
+    }
+
+    #[test]
+    fn comm_stream() {
+        use comm::*;
+
+        do run_in_newsched_task() {
+            let (port, chan) = oneshot();
+            chan.send(10);
+            assert!(port.recv() == 10);
+        }
+    }
 }
 
diff --git a/src/libstd/future.rs b/src/libstd/future.rs
index be33c0f4663..b8ae03c0f2b 100644
--- a/src/libstd/future.rs
+++ b/src/libstd/future.rs
@@ -23,8 +23,7 @@
 
 use core::cast;
 use core::cell::Cell;
-use core::comm::{PortOne, oneshot, send_one};
-use core::pipes::recv;
+use core::comm::{PortOne, oneshot, send_one, recv_one};
 use core::task;
 use core::util::replace;
 
@@ -105,11 +104,8 @@ pub fn from_port<A:Owned>(port: PortOne<A>) -> Future<A> {
      */
 
     let port = Cell(port);
-    do from_fn || {
-        let port = port.take().unwrap();
-        match recv(port) {
-            oneshot::send(data) => data
-        }
+    do from_fn {
+        recv_one(port.take())
     }
 }
 
diff --git a/src/libstd/workcache.rs b/src/libstd/workcache.rs
index f173df60df8..3889650d012 100644
--- a/src/libstd/workcache.rs
+++ b/src/libstd/workcache.rs
@@ -15,11 +15,10 @@ use sort;
 
 use core::cell::Cell;
 use core::cmp;
-use core::comm::{PortOne, oneshot, send_one};
+use core::comm::{PortOne, oneshot, send_one, recv_one};
 use core::either::{Either, Left, Right};
 use core::hashmap::HashMap;
 use core::io;
-use core::pipes::recv;
 use core::run;
 use core::to_bytes;
 use core::util::replace;
@@ -389,9 +388,7 @@ fn unwrap<T:Owned +
         None => fail!(),
         Some(Left(v)) => v,
         Some(Right(port)) => {
-            let (exe, v) = match recv(port.unwrap()) {
-                oneshot::send(data) => data
-            };
+            let (exe, v) = recv_one(port);
 
             let s = json_encode(&v);