diff options
| author | Ben Blum <bblum@andrew.cmu.edu> | 2013-08-01 21:57:15 -0400 |
|---|---|---|
| committer | Ben Blum <bblum@andrew.cmu.edu> | 2013-08-02 17:31:44 -0400 |
| commit | be7738bfa18989438e3597847cd6a7f3bbbfac12 (patch) | |
| tree | 0076916ccef19cffc68622602477c9aa3df11e41 /src/libstd | |
| parent | f1c1f92d0c555d6e38ad1cac55926d6d9c9b090f (diff) | |
| download | rust-be7738bfa18989438e3597847cd6a7f3bbbfac12.tar.gz rust-be7738bfa18989438e3597847cd6a7f3bbbfac12.zip | |
Add SendDeferred trait and use it to fix #8214.
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/comm.rs | 30 | ||||
| -rw-r--r-- | src/libstd/rt/comm.rs | 151 |
2 files changed, 157 insertions, 24 deletions
diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs index acdf2cee841..a0731dc3494 100644 --- a/src/libstd/comm.rs +++ b/src/libstd/comm.rs @@ -19,6 +19,7 @@ use either::{Either, Left, Right}; use kinds::Send; use option::{Option, Some}; use unstable::sync::Exclusive; +pub use rt::comm::SendDeferred; use rtcomm = rt::comm; use rt; @@ -105,6 +106,21 @@ impl<T: Send> GenericSmartChan<T> for Chan<T> { } } +impl<T: Send> SendDeferred<T> for Chan<T> { + fn send_deferred(&self, x: T) { + match self.inner { + Left(ref chan) => chan.send(x), + Right(ref chan) => chan.send_deferred(x) + } + } + fn try_send_deferred(&self, x: T) -> bool { + match self.inner { + Left(ref chan) => chan.try_send(x), + Right(ref chan) => chan.try_send_deferred(x) + } + } +} + impl<T: Send> GenericPort<T> for Port<T> { fn recv(&self) -> T { match self.inner { @@ -250,6 +266,20 @@ impl<T: Send> ChanOne<T> { Right(p) => p.try_send(data) } } + pub fn send_deferred(self, data: T) { + let ChanOne { inner } = self; + match inner { + Left(p) => p.send(data), + Right(p) => p.send_deferred(data) + } + } + pub fn try_send_deferred(self, data: T) -> bool { + let ChanOne { inner } = self; + match inner { + Left(p) => p.try_send(data), + Right(p) => p.try_send_deferred(data) + } + } } pub fn recv_one<T: Send>(port: PortOne<T>) -> T { diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 00e1aaa2193..c19ac8aa337 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -25,6 +25,7 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; use clone::Clone; use rt::{context, SchedulerContext}; +use tuple::ImmutableTuple; /// A combined refcount / BlockedTask-as-uint pointer. /// @@ -86,12 +87,32 @@ impl<T> ChanOne<T> { } } + /// Send a message on the one-shot channel. If a receiver task is blocked + /// waiting for the message, will wake it up and reschedule to it. pub fn send(self, val: T) { self.try_send(val); } + /// As `send`, but also returns whether or not the receiver endpoint is still open. pub fn try_send(self, val: T) -> bool { + self.try_send_inner(val, true) + } + + /// Send a message without immediately rescheduling to a blocked receiver. + /// This can be useful in contexts where rescheduling is forbidden, or to + /// optimize for when the sender expects to still have useful work to do. + pub fn send_deferred(self, val: T) { + self.try_send_deferred(val); + } + + /// As `send_deferred` and `try_send` together. + pub fn try_send_deferred(self, val: T) -> bool { + self.try_send_inner(val, false) + } + // 'do_resched' configures whether the scheduler immediately switches to + // the receiving task, or leaves the sending task still running. + fn try_send_inner(self, val: T, do_resched: bool) -> bool { rtassert!(context() != SchedulerContext); let mut this = self; @@ -130,9 +151,16 @@ impl<T> ChanOne<T> { task_as_state => { // Port is blocked. Wake it up. let recvr = BlockedTask::cast_from_uint(task_as_state); - do recvr.wake().map_consume |woken_task| { - Scheduler::run_task(woken_task); - }; + if do_resched { + do recvr.wake().map_consume |woken_task| { + Scheduler::run_task(woken_task); + }; + } else { + let recvr = Cell::new(recvr); + do Local::borrow::<Scheduler, ()> |sched| { + sched.enqueue_blocked_task(recvr.take()); + } + } } } } @@ -152,6 +180,7 @@ impl<T> PortOne<T> { } } + /// Wait for a message on the one-shot port. Fails if the send end is closed. pub fn recv(self) -> T { match self.try_recv() { Some(val) => val, @@ -161,6 +190,7 @@ impl<T> PortOne<T> { } } + /// As `recv`, but returns `None` if the send end is closed rather than failing. pub fn try_recv(self) -> Option<T> { let mut this = self; @@ -382,6 +412,12 @@ impl<T> Drop for PortOne<T> { } } +/// Trait for non-rescheduling send operations, similar to `send_deferred` on ChanOne. +pub trait SendDeferred<T> { + fn send_deferred(&self, val: T); + fn try_send_deferred(&self, val: T) -> bool; +} + struct StreamPayload<T> { val: T, next: PortOne<StreamPayload<T>> @@ -409,6 +445,15 @@ pub fn stream<T: Send>() -> (Port<T>, Chan<T>) { return (port, chan); } +impl<T: Send> Chan<T> { + fn try_send_inner(&self, val: T, do_resched: bool) -> bool { + let (next_pone, next_cone) = oneshot(); + let cone = self.next.take(); + self.next.put_back(next_cone); + cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched) + } +} + impl<T: Send> GenericChan<T> for Chan<T> { fn send(&self, val: T) { self.try_send(val); @@ -417,10 +462,16 @@ impl<T: Send> GenericChan<T> for Chan<T> { impl<T: Send> GenericSmartChan<T> for Chan<T> { fn try_send(&self, val: T) -> bool { - let (next_pone, next_cone) = oneshot(); - let cone = self.next.take(); - self.next.put_back(next_cone); - cone.try_send(StreamPayload { val: val, next: next_pone }) + self.try_send_inner(val, true) + } +} + +impl<T: Send> SendDeferred<T> for Chan<T> { + fn send_deferred(&self, val: T) { + self.try_send_deferred(val); + } + fn try_send_deferred(&self, val: T) -> bool { + self.try_send_inner(val, false) } } @@ -495,6 +546,17 @@ impl<T> SharedChan<T> { } } +impl<T: Send> SharedChan<T> { + fn try_send_inner(&self, val: T, do_resched: bool) -> bool { + unsafe { + let (next_pone, next_cone) = oneshot(); + let cone = (*self.next.get()).swap(~next_cone, SeqCst); + cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone }, + do_resched) + } + } +} + impl<T: Send> GenericChan<T> for SharedChan<T> { fn send(&self, val: T) { self.try_send(val); @@ -503,11 +565,16 @@ impl<T: Send> GenericChan<T> for SharedChan<T> { impl<T: Send> GenericSmartChan<T> for SharedChan<T> { fn try_send(&self, val: T) -> bool { - unsafe { - let (next_pone, next_cone) = oneshot(); - let cone = (*self.next.get()).swap(~next_cone, SeqCst); - cone.unwrap().try_send(StreamPayload { val: val, next: next_pone }) - } + self.try_send_inner(val, true) + } +} + +impl<T: Send> SendDeferred<T> for SharedChan<T> { + fn send_deferred(&self, val: T) { + self.try_send_deferred(val); + } + fn try_send_deferred(&self, val: T) -> bool { + self.try_send_inner(val, false) } } @@ -584,31 +651,32 @@ pub fn megapipe<T: Send>() -> MegaPipe<T> { impl<T: Send> GenericChan<T> for MegaPipe<T> { fn send(&self, val: T) { - match *self { - (_, ref c) => c.send(val) - } + self.second_ref().send(val) } } impl<T: Send> GenericSmartChan<T> for MegaPipe<T> { fn try_send(&self, val: T) -> bool { - match *self { - (_, ref c) => c.try_send(val) - } + self.second_ref().try_send(val) } } impl<T: Send> GenericPort<T> for MegaPipe<T> { fn recv(&self) -> T { - match *self { - (ref p, _) => p.recv() - } + self.first_ref().recv() } fn try_recv(&self) -> Option<T> { - match *self { - (ref p, _) => p.try_recv() - } + self.first_ref().try_recv() + } +} + +impl<T: Send> SendDeferred<T> for MegaPipe<T> { + fn send_deferred(&self, val: T) { + self.second_ref().send_deferred(val) + } + fn try_send_deferred(&self, val: T) -> bool { + self.second_ref().try_send_deferred(val) } } @@ -1017,4 +1085,39 @@ mod test { } } } + + #[test] + fn send_deferred() { + use unstable::sync::atomically; + + // Tests no-rescheduling of send_deferred on all types of channels. + do run_in_newsched_task { + let (pone, cone) = oneshot(); + let (pstream, cstream) = stream(); + let (pshared, cshared) = stream(); + let cshared = SharedChan::new(cshared); + let mp = megapipe(); + + let pone = Cell::new(pone); + do spawntask { pone.take().recv(); } + let pstream = Cell::new(pstream); + do spawntask { pstream.take().recv(); } + let pshared = Cell::new(pshared); + do spawntask { pshared.take().recv(); } + let p_mp = Cell::new(mp.clone()); + do spawntask { p_mp.take().recv(); } + + let cs = Cell::new((cone, cstream, cshared, mp)); + unsafe { + do atomically { + let (cone, cstream, cshared, mp) = cs.take(); + cone.send_deferred(()); + cstream.send_deferred(()); + cshared.send_deferred(()); + mp.send_deferred(()); + } + } + } + } + } |
