about summary refs log tree commit diff
diff options
context:
space:
mode:
authorjoboet <jonasboettiger@icloud.com>2023-03-14 16:42:34 +0100
committerjoboet <jonasboettiger@icloud.com>2023-03-14 16:42:34 +0100
commit34aa87292c5cd45c88a72235dad6e973a9f2b62f (patch)
tree2250ebd09125a84eae1c8e08c7d597ffc693a64b
parent4e9e465bd4cbdfe3946ea6f0ff4786f2f495a020 (diff)
downloadrust-34aa87292c5cd45c88a72235dad6e973a9f2b62f.tar.gz
rust-34aa87292c5cd45c88a72235dad6e973a9f2b62f.zip
std: leak remaining messages in bounded channel if message destructor panics
-rw-r--r--library/std/src/sync/mpmc/array.rs108
1 files changed, 42 insertions, 66 deletions
diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs
index fb893695a9a..492e21d9bdb 100644
--- a/library/std/src/sync/mpmc/array.rs
+++ b/library/std/src/sync/mpmc/array.rs
@@ -15,7 +15,7 @@ use super::utils::{Backoff, CachePadded};
 use super::waker::SyncWaker;
 
 use crate::cell::UnsafeCell;
-use crate::mem::{self, MaybeUninit};
+use crate::mem::MaybeUninit;
 use crate::ptr;
 use crate::sync::atomic::{self, AtomicUsize, Ordering};
 use crate::time::Instant;
@@ -479,6 +479,10 @@ impl<T> Channel<T> {
     ///
     /// `tail` should be the current (and therefore last) value of `tail`.
     ///
+    /// # Panicking
+    /// If a destructor panics, the remaining messages are leaked, matching the
+    /// behaviour of the unbounded channel.
+    ///
     /// # Safety
     /// This method must only be called when dropping the last receiver. The
     /// destruction of all other receivers must have been observed with acquire
@@ -486,75 +490,47 @@ impl<T> Channel<T> {
     unsafe fn discard_all_messages(&self, tail: usize) {
         debug_assert!(self.is_disconnected());
 
-        /// Use a helper struct with a custom `Drop` to ensure all messages are
-        /// dropped, even if a destructor panicks.
-        struct DiscardState<'a, T> {
-            channel: &'a Channel<T>,
-            head: usize,
-            tail: usize,
-            backoff: Backoff,
-        }
+        // Only receivers modify `head`, so since we are the last one,
+        // this value will not change and will not be observed (since
+        // no new messages can be sent after disconnection).
+        let mut head = self.head.load(Ordering::Relaxed);
+        let tail = tail & !self.mark_bit;
 
-        impl<'a, T> DiscardState<'a, T> {
-            fn discard(&mut self) {
-                loop {
-                    // Deconstruct the head.
-                    let index = self.head & (self.channel.mark_bit - 1);
-                    let lap = self.head & !(self.channel.one_lap - 1);
-
-                    // Inspect the corresponding slot.
-                    debug_assert!(index < self.channel.buffer.len());
-                    let slot = unsafe { self.channel.buffer.get_unchecked(index) };
-                    let stamp = slot.stamp.load(Ordering::Acquire);
-
-                    // If the stamp is ahead of the head by 1, we may drop the message.
-                    if self.head + 1 == stamp {
-                        self.head = if index + 1 < self.channel.cap {
-                            // Same lap, incremented index.
-                            // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
-                            self.head + 1
-                        } else {
-                            // One lap forward, index wraps around to zero.
-                            // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
-                            lap.wrapping_add(self.channel.one_lap)
-                        };
-
-                        // We updated the head, so even if this descrutor panics,
-                        // we will not attempt to destroy the slot again.
-                        unsafe {
-                            (*slot.msg.get()).assume_init_drop();
-                        }
-                    // If the tail equals the head, that means the channel is empty.
-                    } else if self.tail == self.head {
-                        return;
-                    // Otherwise, a sender is about to write into the slot, so we need
-                    // to wait for it to update the stamp.
-                    } else {
-                        self.backoff.spin_heavy();
-                    }
-                }
-            }
-        }
+        let backoff = Backoff::new();
+        loop {
+            // Deconstruct the head.
+            let index = head & (self.mark_bit - 1);
+            let lap = head & !(self.one_lap - 1);
 
-        impl<'a, T> Drop for DiscardState<'a, T> {
-            fn drop(&mut self) {
-                self.discard();
+            // Inspect the corresponding slot.
+            debug_assert!(index < self.buffer.len());
+            let slot = unsafe { self.buffer.get_unchecked(index) };
+            let stamp = slot.stamp.load(Ordering::Acquire);
+
+            // If the stamp is ahead of the head by 1, we may drop the message.
+            if head + 1 == stamp {
+                head = if index + 1 < self.cap {
+                    // Same lap, incremented index.
+                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
+                    head + 1
+                } else {
+                    // One lap forward, index wraps around to zero.
+                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
+                    lap.wrapping_add(self.one_lap)
+                };
+
+                unsafe {
+                    (*slot.msg.get()).assume_init_drop();
+                }
+            // If the tail equals the head, that means the channel is empty.
+            } else if tail == head {
+                return;
+            // Otherwise, a sender is about to write into the slot, so we need
+            // to wait for it to update the stamp.
+            } else {
+                backoff.spin_heavy();
             }
         }
-
-        let mut state = DiscardState {
-            channel: self,
-            // Only receivers modify `head`, so since we are the last one,
-            // this value will not change and will not be observed (since
-            // no new messages can be sent after disconnection).
-            head: self.head.load(Ordering::Relaxed),
-            tail: tail & !self.mark_bit,
-            backoff: Backoff::new(),
-        };
-        state.discard();
-        // This point is only reached if no destructor panics, so all messages
-        // have already been dropped.
-        mem::forget(state);
     }
 
     /// Returns `true` if the channel is disconnected.