diff options
| author | joboet <jonasboettiger@icloud.com> | 2023-03-14 16:42:34 +0100 |
|---|---|---|
| committer | joboet <jonasboettiger@icloud.com> | 2023-03-14 16:42:34 +0100 |
| commit | 34aa87292c5cd45c88a72235dad6e973a9f2b62f (patch) | |
| tree | 2250ebd09125a84eae1c8e08c7d597ffc693a64b | |
| parent | 4e9e465bd4cbdfe3946ea6f0ff4786f2f495a020 (diff) | |
| download | rust-34aa87292c5cd45c88a72235dad6e973a9f2b62f.tar.gz rust-34aa87292c5cd45c88a72235dad6e973a9f2b62f.zip | |
std: leak remaining messages in bounded channel if message destructor panics
| -rw-r--r-- | library/std/src/sync/mpmc/array.rs | 108 |
1 files changed, 42 insertions, 66 deletions
diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs index fb893695a9a..492e21d9bdb 100644 --- a/library/std/src/sync/mpmc/array.rs +++ b/library/std/src/sync/mpmc/array.rs @@ -15,7 +15,7 @@ use super::utils::{Backoff, CachePadded}; use super::waker::SyncWaker; use crate::cell::UnsafeCell; -use crate::mem::{self, MaybeUninit}; +use crate::mem::MaybeUninit; use crate::ptr; use crate::sync::atomic::{self, AtomicUsize, Ordering}; use crate::time::Instant; @@ -479,6 +479,10 @@ impl<T> Channel<T> { /// /// `tail` should be the current (and therefore last) value of `tail`. /// + /// # Panicking + /// If a destructor panics, the remaining messages are leaked, matching the + /// behaviour of the unbounded channel. + /// /// # Safety /// This method must only be called when dropping the last receiver. The /// destruction of all other receivers must have been observed with acquire @@ -486,75 +490,47 @@ impl<T> Channel<T> { unsafe fn discard_all_messages(&self, tail: usize) { debug_assert!(self.is_disconnected()); - /// Use a helper struct with a custom `Drop` to ensure all messages are - /// dropped, even if a destructor panicks. - struct DiscardState<'a, T> { - channel: &'a Channel<T>, - head: usize, - tail: usize, - backoff: Backoff, - } + // Only receivers modify `head`, so since we are the last one, + // this value will not change and will not be observed (since + // no new messages can be sent after disconnection). + let mut head = self.head.load(Ordering::Relaxed); + let tail = tail & !self.mark_bit; - impl<'a, T> DiscardState<'a, T> { - fn discard(&mut self) { - loop { - // Deconstruct the head. - let index = self.head & (self.channel.mark_bit - 1); - let lap = self.head & !(self.channel.one_lap - 1); - - // Inspect the corresponding slot. - debug_assert!(index < self.channel.buffer.len()); - let slot = unsafe { self.channel.buffer.get_unchecked(index) }; - let stamp = slot.stamp.load(Ordering::Acquire); - - // If the stamp is ahead of the head by 1, we may drop the message. - if self.head + 1 == stamp { - self.head = if index + 1 < self.channel.cap { - // Same lap, incremented index. - // Set to `{ lap: lap, mark: 0, index: index + 1 }`. - self.head + 1 - } else { - // One lap forward, index wraps around to zero. - // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. - lap.wrapping_add(self.channel.one_lap) - }; - - // We updated the head, so even if this descrutor panics, - // we will not attempt to destroy the slot again. - unsafe { - (*slot.msg.get()).assume_init_drop(); - } - // If the tail equals the head, that means the channel is empty. - } else if self.tail == self.head { - return; - // Otherwise, a sender is about to write into the slot, so we need - // to wait for it to update the stamp. - } else { - self.backoff.spin_heavy(); - } - } - } - } + let backoff = Backoff::new(); + loop { + // Deconstruct the head. + let index = head & (self.mark_bit - 1); + let lap = head & !(self.one_lap - 1); - impl<'a, T> Drop for DiscardState<'a, T> { - fn drop(&mut self) { - self.discard(); + // Inspect the corresponding slot. + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the stamp is ahead of the head by 1, we may drop the message. + if head + 1 == stamp { + head = if index + 1 < self.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + + unsafe { + (*slot.msg.get()).assume_init_drop(); + } + // If the tail equals the head, that means the channel is empty. + } else if tail == head { + return; + // Otherwise, a sender is about to write into the slot, so we need + // to wait for it to update the stamp. + } else { + backoff.spin_heavy(); } } - - let mut state = DiscardState { - channel: self, - // Only receivers modify `head`, so since we are the last one, - // this value will not change and will not be observed (since - // no new messages can be sent after disconnection). - head: self.head.load(Ordering::Relaxed), - tail: tail & !self.mark_bit, - backoff: Backoff::new(), - }; - state.discard(); - // This point is only reached if no destructor panics, so all messages - // have already been dropped. - mem::forget(state); } /// Returns `true` if the channel is disconnected. |
