/// Oneshot channels/ports /// /// This is the initial flavor of channels/ports used for comm module. This is /// an optimization for the one-use case of a channel. The major optimization of /// this type is to have one and exactly one allocation when the chan/port pair /// is created. /// /// Another possible optimization would be to not use an Arc box because /// in theory we know when the shared packet can be deallocated (no real need /// for the atomic reference counting), but I was having trouble how to destroy /// the data early in a drop of a Port. /// /// # Implementation /// /// Oneshots are implemented around one atomic usize variable. This variable /// indicates both the state of the port/chan but also contains any threads /// blocked on the port. All atomic operations happen on this one word. /// /// In order to upgrade a oneshot channel, an upgrade is considered a disconnect /// on behalf of the channel side of things (it can be mentally thought of as /// consuming the port). This upgrade is then also stored in the shared packet. /// The one caveat to consider is that when a port sees a disconnected channel /// it must check for data because there is no "data plus upgrade" state. pub use self::Failure::*; pub use self::UpgradeResult::*; pub use self::SelectionResult::*; use self::MyUpgrade::*; use crate::sync::mpsc::Receiver; use crate::sync::mpsc::blocking::{self, SignalToken}; use crate::cell::UnsafeCell; use crate::ptr; use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::time::Instant; // Various states you can find a port in. const EMPTY: usize = 0; // initial state: no data, no blocked receiver const DATA: usize = 1; // data ready for receiver to take const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded // Any other value represents a pointer to a SignalToken value. The // protocol ensures that when the state moves *to* a pointer, // ownership of the token is given to the packet, and when the state // moves *from* a pointer, ownership of the token is transferred to // whoever changed the state. pub struct Packet { // Internal state of the chan/port pair (stores the blocked thread as well) state: AtomicUsize, // One-shot data slot location data: UnsafeCell>, // when used for the second time, a oneshot channel must be upgraded, and // this contains the slot for the upgrade upgrade: UnsafeCell>, } pub enum Failure { Empty, Disconnected, Upgraded(Receiver), } pub enum UpgradeResult { UpSuccess, UpDisconnected, UpWoke(SignalToken), } pub enum SelectionResult { SelCanceled, SelUpgraded(SignalToken, Receiver), SelSuccess, } enum MyUpgrade { NothingSent, SendUsed, GoUp(Receiver), } impl Packet { pub fn new() -> Packet { Packet { data: UnsafeCell::new(None), upgrade: UnsafeCell::new(NothingSent), state: AtomicUsize::new(EMPTY), } } pub fn send(&self, t: T) -> Result<(), T> { unsafe { // Sanity check match *self.upgrade.get() { NothingSent => {} _ => panic!("sending on a oneshot that's already sent on "), } assert!((*self.data.get()).is_none()); ptr::write(self.data.get(), Some(t)); ptr::write(self.upgrade.get(), SendUsed); match self.state.swap(DATA, Ordering::SeqCst) { // Sent the data, no one was waiting EMPTY => Ok(()), // Couldn't send the data, the port hung up first. Return the data // back up the stack. DISCONNECTED => { self.state.swap(DISCONNECTED, Ordering::SeqCst); ptr::write(self.upgrade.get(), NothingSent); Err((&mut *self.data.get()).take().unwrap()) } // Not possible, these are one-use channels DATA => unreachable!(), // There is a thread waiting on the other end. We leave the 'DATA' // state inside so it'll pick it up on the other end. ptr => { SignalToken::cast_from_usize(ptr).signal(); Ok(()) } } } } // Just tests whether this channel has been sent on or not, this is only // safe to use from the sender. pub fn sent(&self) -> bool { unsafe { match *self.upgrade.get() { NothingSent => false, _ => true, } } } pub fn recv(&self, deadline: Option) -> Result> { // Attempt to not block the thread (it's a little expensive). If it looks // like we're not empty, then immediately go through to `try_recv`. if self.state.load(Ordering::SeqCst) == EMPTY { let (wait_token, signal_token) = blocking::tokens(); let ptr = unsafe { signal_token.cast_to_usize() }; // race with senders to enter the blocking state if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY { if let Some(deadline) = deadline { let timed_out = !wait_token.wait_max_until(deadline); // Try to reset the state if timed_out { self.abort_selection().map_err(Upgraded)?; } } else { wait_token.wait(); debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY); } } else { // drop the signal token, since we never blocked drop(unsafe { SignalToken::cast_from_usize(ptr) }); } } self.try_recv() } pub fn try_recv(&self) -> Result> { unsafe { match self.state.load(Ordering::SeqCst) { EMPTY => Err(Empty), // We saw some data on the channel, but the channel can be used // again to send us an upgrade. As a result, we need to re-insert // into the channel that there's no data available (otherwise we'll // just see DATA next time). This is done as a cmpxchg because if // the state changes under our feet we'd rather just see that state // change. DATA => { self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst); match (&mut *self.data.get()).take() { Some(data) => Ok(data), None => unreachable!(), } } // There's no guarantee that we receive before an upgrade happens, // and an upgrade flags the channel as disconnected, so when we see // this we first need to check if there's data available and *then* // we go through and process the upgrade. DISCONNECTED => { match (&mut *self.data.get()).take() { Some(data) => Ok(data), None => { match ptr::replace(self.upgrade.get(), SendUsed) { SendUsed | NothingSent => Err(Disconnected), GoUp(upgrade) => Err(Upgraded(upgrade)) } } } } // We are the sole receiver; there cannot be a blocking // receiver already. _ => unreachable!() } } } // Returns whether the upgrade was completed. If the upgrade wasn't // completed, then the port couldn't get sent to the other half (it will // never receive it). pub fn upgrade(&self, up: Receiver) -> UpgradeResult { unsafe { let prev = match *self.upgrade.get() { NothingSent => NothingSent, SendUsed => SendUsed, _ => panic!("upgrading again"), }; ptr::write(self.upgrade.get(), GoUp(up)); match self.state.swap(DISCONNECTED, Ordering::SeqCst) { // If the channel is empty or has data on it, then we're good to go. // Senders will check the data before the upgrade (in case we // plastered over the DATA state). DATA | EMPTY => UpSuccess, // If the other end is already disconnected, then we failed the // upgrade. Be sure to trash the port we were given. DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected } // If someone's waiting, we gotta wake them up ptr => UpWoke(SignalToken::cast_from_usize(ptr)) } } } pub fn drop_chan(&self) { match self.state.swap(DISCONNECTED, Ordering::SeqCst) { DATA | DISCONNECTED | EMPTY => {} // If someone's waiting, we gotta wake them up ptr => unsafe { SignalToken::cast_from_usize(ptr).signal(); } } } pub fn drop_port(&self) { match self.state.swap(DISCONNECTED, Ordering::SeqCst) { // An empty channel has nothing to do, and a remotely disconnected // channel also has nothing to do b/c we're about to run the drop // glue DISCONNECTED | EMPTY => {} // There's data on the channel, so make sure we destroy it promptly. // This is why not using an arc is a little difficult (need the box // to stay valid while we take the data). DATA => unsafe { (&mut *self.data.get()).take().unwrap(); }, // We're the only ones that can block on this port _ => unreachable!() } } //////////////////////////////////////////////////////////////////////////// // select implementation //////////////////////////////////////////////////////////////////////////// // If Ok, the value is whether this port has data, if Err, then the upgraded // port needs to be checked instead of this one. pub fn can_recv(&self) -> Result> { unsafe { match self.state.load(Ordering::SeqCst) { EMPTY => Ok(false), // Welp, we tried DATA => Ok(true), // we have some un-acquired data DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data DISCONNECTED => { match ptr::replace(self.upgrade.get(), SendUsed) { // The other end sent us an upgrade, so we need to // propagate upwards whether the upgrade can receive // data GoUp(upgrade) => Err(upgrade), // If the other end disconnected without sending an // upgrade, then we have data to receive (the channel is // disconnected). up => { ptr::write(self.upgrade.get(), up); Ok(true) } } } _ => unreachable!(), // we're the "one blocker" } } } // Attempts to start selection on this port. This can either succeed, fail // because there is data, or fail because there is an upgrade pending. pub fn start_selection(&self, token: SignalToken) -> SelectionResult { unsafe { let ptr = token.cast_to_usize(); match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) { EMPTY => SelSuccess, DATA => { drop(SignalToken::cast_from_usize(ptr)); SelCanceled } DISCONNECTED if (*self.data.get()).is_some() => { drop(SignalToken::cast_from_usize(ptr)); SelCanceled } DISCONNECTED => { match ptr::replace(self.upgrade.get(), SendUsed) { // The other end sent us an upgrade, so we need to // propagate upwards whether the upgrade can receive // data GoUp(upgrade) => { SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade) } // If the other end disconnected without sending an // upgrade, then we have data to receive (the channel is // disconnected). up => { ptr::write(self.upgrade.get(), up); drop(SignalToken::cast_from_usize(ptr)); SelCanceled } } } _ => unreachable!(), // we're the "one blocker" } } } // Remove a previous selecting thread from this port. This ensures that the // blocked thread will no longer be visible to any other threads. // // The return value indicates whether there's data on this port. pub fn abort_selection(&self) -> Result> { let state = match self.state.load(Ordering::SeqCst) { // Each of these states means that no further activity will happen // with regard to abortion selection s @ EMPTY | s @ DATA | s @ DISCONNECTED => s, // If we've got a blocked thread, then use an atomic to gain ownership // of it (may fail) ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst) }; // Now that we've got ownership of our state, figure out what to do // about it. match state { EMPTY => unreachable!(), // our thread used for select was stolen DATA => Ok(true), // If the other end has hung up, then we have complete ownership // of the port. First, check if there was data waiting for us. This // is possible if the other end sent something and then hung up. // // We then need to check to see if there was an upgrade requested, // and if so, the upgraded port needs to have its selection aborted. DISCONNECTED => unsafe { if (*self.data.get()).is_some() { Ok(true) } else { match ptr::replace(self.upgrade.get(), SendUsed) { GoUp(port) => Err(port), _ => Ok(true), } } }, // We woke ourselves up from select. ptr => unsafe { drop(SignalToken::cast_from_usize(ptr)); Ok(false) } } } } impl Drop for Packet { fn drop(&mut self) { assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED); } }