diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-05-16 23:12:22 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-05-17 17:54:27 -0700 |
| commit | 26becc308e4b9a0f5be1c7c2895c7761b778e01f (patch) | |
| tree | 73fb276dad954cec037e43a72c2e75b8d7652645 | |
| parent | f5987b03b8d65a2b885519b7b9a0ea33cda33bc5 (diff) | |
| download | rust-26becc308e4b9a0f5be1c7c2895c7761b778e01f.tar.gz rust-26becc308e4b9a0f5be1c7c2895c7761b778e01f.zip | |
core: Wire up oneshot pipes to newsched
| -rw-r--r-- | src/libcore/comm.rs | 350 | ||||
| -rw-r--r-- | src/libcore/rt/local_services.rs | 22 | ||||
| -rw-r--r-- | src/libstd/future.rs | 10 | ||||
| -rw-r--r-- | src/libstd/workcache.rs | 7 |
4 files changed, 253 insertions, 136 deletions
diff --git a/src/libcore/comm.rs b/src/libcore/comm.rs index 34c60202b3f..da3ae0e6c5d 100644 --- a/src/libcore/comm.rs +++ b/src/libcore/comm.rs @@ -22,6 +22,8 @@ use vec; use vec::OwnedVector; use util::replace; use unstable::sync::{Exclusive, exclusive}; +use rtcomm = rt::comm; +use rt; use pipes::{recv, try_recv, wait_many, peek, PacketHeader}; @@ -335,180 +337,280 @@ impl<T: Owned> ::clone::Clone for SharedChan<T> { } } -/*proto! oneshot ( - Oneshot:send<T:Owned> { - send(T) -> ! +pub struct PortOne<T> { + inner: Either<pipesy::PortOne<T>, rtcomm::PortOne<T>> +} + +pub struct ChanOne<T> { + inner: Either<pipesy::ChanOne<T>, rtcomm::ChanOne<T>> +} + +pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) { + let (port, chan) = match rt::context() { + rt::OldTaskContext => match pipesy::oneshot() { + (p, c) => (Left(p), Left(c)), + }, + _ => match rtcomm::oneshot() { + (p, c) => (Right(p), Right(c)) + } + }; + let port = PortOne { + inner: port + }; + let chan = ChanOne { + inner: chan + }; + return (port, chan); +} + +impl<T: Owned> PortOne<T> { + pub fn recv(self) -> T { + let PortOne { inner } = self; + match inner { + Left(p) => p.recv(), + Right(p) => p.recv() + } } -)*/ -#[allow(non_camel_case_types)] -pub mod oneshot { - priv use core::kinds::Owned; - use ptr::to_mut_unsafe_ptr; + pub fn try_recv(self) -> Option<T> { + let PortOne { inner } = self; + match inner { + Left(p) => p.try_recv(), + Right(p) => p.try_recv() + } + } +} - pub fn init<T: Owned>() -> (client::Oneshot<T>, server::Oneshot<T>) { - pub use core::pipes::HasBuffer; +impl<T: Owned> ChanOne<T> { + pub fn send(self, data: T) { + let ChanOne { inner } = self; + match inner { + Left(p) => p.send(data), + Right(p) => p.send(data) + } + } - let buffer = ~::core::pipes::Buffer { - header: ::core::pipes::BufferHeader(), - data: __Buffer { - Oneshot: ::core::pipes::mk_packet::<Oneshot<T>>() - }, - }; - do ::core::pipes::entangle_buffer(buffer) |buffer, data| { - data.Oneshot.set_buffer(buffer); - to_mut_unsafe_ptr(&mut data.Oneshot) + pub fn try_send(self, data: T) -> bool { + let ChanOne { inner } = self; + match inner { + Left(p) => p.try_send(data), + Right(p) => p.try_send(data) } } - #[allow(non_camel_case_types)] - pub enum Oneshot<T> { pub send(T), } - #[allow(non_camel_case_types)] - pub struct __Buffer<T> { - Oneshot: ::core::pipes::Packet<Oneshot<T>>, +} + +pub fn recv_one<T: Owned>(port: PortOne<T>) -> T { + let PortOne { inner } = port; + match inner { + Left(p) => pipesy::recv_one(p), + Right(p) => p.recv() } +} - #[allow(non_camel_case_types)] - pub mod client { +pub fn try_recv_one<T: Owned>(port: PortOne<T>) -> Option<T> { + let PortOne { inner } = port; + match inner { + Left(p) => pipesy::try_recv_one(p), + Right(p) => p.try_recv() + } +} + +pub fn send_one<T: Owned>(chan: ChanOne<T>, data: T) { + let ChanOne { inner } = chan; + match inner { + Left(c) => pipesy::send_one(c, data), + Right(c) => c.send(data) + } +} + +pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T) -> bool { + let ChanOne { inner } = chan; + match inner { + Left(c) => pipesy::try_send_one(c, data), + Right(c) => c.try_send(data) + } +} +mod pipesy { + + use kinds::Owned; + use option::{Option, Some, None}; + use pipes::{recv, try_recv}; + + /*proto! oneshot ( + Oneshot:send<T:Owned> { + send(T) -> ! + } + )*/ + + #[allow(non_camel_case_types)] + pub mod oneshot { priv use core::kinds::Owned; + use ptr::to_mut_unsafe_ptr; - #[allow(non_camel_case_types)] - pub fn try_send<T: Owned>(pipe: Oneshot<T>, x_0: T) -> - ::core::option::Option<()> { - { - use super::send; - let message = send(x_0); - if ::core::pipes::send(pipe, message) { - ::core::pipes::rt::make_some(()) - } else { ::core::pipes::rt::make_none() } + pub fn init<T: Owned>() -> (client::Oneshot<T>, server::Oneshot<T>) { + pub use core::pipes::HasBuffer; + + let buffer = ~::core::pipes::Buffer { + header: ::core::pipes::BufferHeader(), + data: __Buffer { + Oneshot: ::core::pipes::mk_packet::<Oneshot<T>>() + }, + }; + do ::core::pipes::entangle_buffer(buffer) |buffer, data| { + data.Oneshot.set_buffer(buffer); + to_mut_unsafe_ptr(&mut data.Oneshot) } } + #[allow(non_camel_case_types)] + pub enum Oneshot<T> { pub send(T), } + #[allow(non_camel_case_types)] + pub struct __Buffer<T> { + Oneshot: ::core::pipes::Packet<Oneshot<T>>, + } #[allow(non_camel_case_types)] - pub fn send<T: Owned>(pipe: Oneshot<T>, x_0: T) { - { - use super::send; - let message = send(x_0); - ::core::pipes::send(pipe, message); + pub mod client { + + priv use core::kinds::Owned; + + #[allow(non_camel_case_types)] + pub fn try_send<T: Owned>(pipe: Oneshot<T>, x_0: T) -> + ::core::option::Option<()> { + { + use super::send; + let message = send(x_0); + if ::core::pipes::send(pipe, message) { + ::core::pipes::rt::make_some(()) + } else { ::core::pipes::rt::make_none() } + } } + + #[allow(non_camel_case_types)] + pub fn send<T: Owned>(pipe: Oneshot<T>, x_0: T) { + { + use super::send; + let message = send(x_0); + ::core::pipes::send(pipe, message); + } + } + + #[allow(non_camel_case_types)] + pub type Oneshot<T> = + ::core::pipes::SendPacketBuffered<super::Oneshot<T>, + super::__Buffer<T>>; } #[allow(non_camel_case_types)] - pub type Oneshot<T> = - ::core::pipes::SendPacketBuffered<super::Oneshot<T>, - super::__Buffer<T>>; + pub mod server { + #[allow(non_camel_case_types)] + pub type Oneshot<T> = + ::core::pipes::RecvPacketBuffered<super::Oneshot<T>, + super::__Buffer<T>>; + } } - #[allow(non_camel_case_types)] - pub mod server { - #[allow(non_camel_case_types)] - pub type Oneshot<T> = - ::core::pipes::RecvPacketBuffered<super::Oneshot<T>, - super::__Buffer<T>>; + /// The send end of a oneshot pipe. + pub struct ChanOne<T> { + contents: oneshot::client::Oneshot<T> } -} - -/// The send end of a oneshot pipe. -pub struct ChanOne<T> { - contents: oneshot::client::Oneshot<T> -} -impl<T> ChanOne<T> { - pub fn new(contents: oneshot::client::Oneshot<T>) -> ChanOne<T> { - ChanOne { - contents: contents + impl<T> ChanOne<T> { + pub fn new(contents: oneshot::client::Oneshot<T>) -> ChanOne<T> { + ChanOne { + contents: contents + } } } -} -/// The receive end of a oneshot pipe. -pub struct PortOne<T> { - contents: oneshot::server::Oneshot<T> -} + /// The receive end of a oneshot pipe. + pub struct PortOne<T> { + contents: oneshot::server::Oneshot<T> + } -impl<T> PortOne<T> { - pub fn new(contents: oneshot::server::Oneshot<T>) -> PortOne<T> { - PortOne { - contents: contents + impl<T> PortOne<T> { + pub fn new(contents: oneshot::server::Oneshot<T>) -> PortOne<T> { + PortOne { + contents: contents + } } } -} -/// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair. -pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) { - let (chan, port) = oneshot::init(); - (PortOne::new(port), ChanOne::new(chan)) -} + /// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair. + pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) { + let (chan, port) = oneshot::init(); + (PortOne::new(port), ChanOne::new(chan)) + } -pub impl<T: Owned> PortOne<T> { - fn recv(self) -> T { recv_one(self) } - fn try_recv(self) -> Option<T> { try_recv_one(self) } - fn unwrap(self) -> oneshot::server::Oneshot<T> { - match self { - PortOne { contents: s } => s + pub impl<T: Owned> PortOne<T> { + fn recv(self) -> T { recv_one(self) } + fn try_recv(self) -> Option<T> { try_recv_one(self) } + fn unwrap(self) -> oneshot::server::Oneshot<T> { + match self { + PortOne { contents: s } => s + } } } -} -pub impl<T: Owned> ChanOne<T> { - fn send(self, data: T) { send_one(self, data) } - fn try_send(self, data: T) -> bool { try_send_one(self, data) } - fn unwrap(self) -> oneshot::client::Oneshot<T> { - match self { - ChanOne { contents: s } => s + pub impl<T: Owned> ChanOne<T> { + fn send(self, data: T) { send_one(self, data) } + fn try_send(self, data: T) -> bool { try_send_one(self, data) } + fn unwrap(self) -> oneshot::client::Oneshot<T> { + match self { + ChanOne { contents: s } => s + } } } -} -/** - * Receive a message from a oneshot pipe, failing if the connection was - * closed. - */ -pub fn recv_one<T: Owned>(port: PortOne<T>) -> T { - match port { - PortOne { contents: port } => { - let oneshot::send(message) = recv(port); - message + /** + * Receive a message from a oneshot pipe, failing if the connection was + * closed. + */ + pub fn recv_one<T: Owned>(port: PortOne<T>) -> T { + match port { + PortOne { contents: port } => { + let oneshot::send(message) = recv(port); + message + } } } -} -/// Receive a message from a oneshot pipe unless the connection was closed. -pub fn try_recv_one<T: Owned> (port: PortOne<T>) -> Option<T> { - match port { - PortOne { contents: port } => { - let message = try_recv(port); + /// Receive a message from a oneshot pipe unless the connection was closed. + pub fn try_recv_one<T: Owned> (port: PortOne<T>) -> Option<T> { + match port { + PortOne { contents: port } => { + let message = try_recv(port); - if message.is_none() { - None - } else { - let oneshot::send(message) = message.unwrap(); - Some(message) + if message.is_none() { + None + } else { + let oneshot::send(message) = message.unwrap(); + Some(message) + } } } } -} -/// Send a message on a oneshot pipe, failing if the connection was closed. -pub fn send_one<T: Owned>(chan: ChanOne<T>, data: T) { - match chan { - ChanOne { contents: chan } => oneshot::client::send(chan, data), + /// Send a message on a oneshot pipe, failing if the connection was closed. + pub fn send_one<T: Owned>(chan: ChanOne<T>, data: T) { + match chan { + ChanOne { contents: chan } => oneshot::client::send(chan, data), + } } -} -/** - * Send a message on a oneshot pipe, or return false if the connection was - * closed. - */ -pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T) -> bool { - match chan { - ChanOne { contents: chan } => { - oneshot::client::try_send(chan, data).is_some() + /** + * Send a message on a oneshot pipe, or return false if the connection was + * closed. + */ + pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T) -> bool { + match chan { + ChanOne { contents: chan } => { + oneshot::client::try_send(chan, data).is_some() + } } } -} - +} /// Returns the index of an endpoint that is ready to receive. pub fn selecti<T: Selectable>(endpoints: &mut [T]) -> uint { diff --git a/src/libcore/rt/local_services.rs b/src/libcore/rt/local_services.rs index 98bfc2fa168..35c703bb350 100644 --- a/src/libcore/rt/local_services.rs +++ b/src/libcore/rt/local_services.rs @@ -244,5 +244,27 @@ mod test { info!("here i am. logging in a newsched task"); } } + + #[test] + fn comm_oneshot() { + use comm::*; + + do run_in_newsched_task { + let (port, chan) = oneshot(); + send_one(chan, 10); + assert!(recv_one(port) == 10); + } + } + + #[test] + fn comm_stream() { + use comm::*; + + do run_in_newsched_task() { + let (port, chan) = oneshot(); + chan.send(10); + assert!(port.recv() == 10); + } + } } diff --git a/src/libstd/future.rs b/src/libstd/future.rs index be33c0f4663..b8ae03c0f2b 100644 --- a/src/libstd/future.rs +++ b/src/libstd/future.rs @@ -23,8 +23,7 @@ use core::cast; use core::cell::Cell; -use core::comm::{PortOne, oneshot, send_one}; -use core::pipes::recv; +use core::comm::{PortOne, oneshot, send_one, recv_one}; use core::task; use core::util::replace; @@ -105,11 +104,8 @@ pub fn from_port<A:Owned>(port: PortOne<A>) -> Future<A> { */ let port = Cell(port); - do from_fn || { - let port = port.take().unwrap(); - match recv(port) { - oneshot::send(data) => data - } + do from_fn { + recv_one(port.take()) } } diff --git a/src/libstd/workcache.rs b/src/libstd/workcache.rs index f173df60df8..3889650d012 100644 --- a/src/libstd/workcache.rs +++ b/src/libstd/workcache.rs @@ -15,11 +15,10 @@ use sort; use core::cell::Cell; use core::cmp; -use core::comm::{PortOne, oneshot, send_one}; +use core::comm::{PortOne, oneshot, send_one, recv_one}; use core::either::{Either, Left, Right}; use core::hashmap::HashMap; use core::io; -use core::pipes::recv; use core::run; use core::to_bytes; use core::util::replace; @@ -389,9 +388,7 @@ fn unwrap<T:Owned + None => fail!(), Some(Left(v)) => v, Some(Right(port)) => { - let (exe, v) = match recv(port.unwrap()) { - oneshot::send(data) => data - }; + let (exe, v) = recv_one(port); let s = json_encode(&v); |
