about summary refs log tree commit diff
path: root/library/std/src/sync
diff options
context:
space:
mode:
Diffstat (limited to 'library/std/src/sync')
-rw-r--r--library/std/src/sync/mod.rs1
-rw-r--r--library/std/src/sync/mpmc/array.rs513
-rw-r--r--library/std/src/sync/mpmc/context.rs155
-rw-r--r--library/std/src/sync/mpmc/counter.rs137
-rw-r--r--library/std/src/sync/mpmc/error.rs46
-rw-r--r--library/std/src/sync/mpmc/list.rs638
-rw-r--r--library/std/src/sync/mpmc/mod.rs430
-rw-r--r--library/std/src/sync/mpmc/select.rs71
-rw-r--r--library/std/src/sync/mpmc/utils.rs144
-rw-r--r--library/std/src/sync/mpmc/waker.rs204
-rw-r--r--library/std/src/sync/mpmc/zero.rs318
-rw-r--r--library/std/src/sync/mpsc/blocking.rs82
-rw-r--r--library/std/src/sync/mpsc/cache_aligned.rs25
-rw-r--r--library/std/src/sync/mpsc/mod.rs458
-rw-r--r--library/std/src/sync/mpsc/mpsc_queue.rs124
-rw-r--r--library/std/src/sync/mpsc/mpsc_queue/tests.rs47
-rw-r--r--library/std/src/sync/mpsc/oneshot.rs315
-rw-r--r--library/std/src/sync/mpsc/shared.rs501
-rw-r--r--library/std/src/sync/mpsc/spsc_queue.rs244
-rw-r--r--library/std/src/sync/mpsc/spsc_queue/tests.rs102
-rw-r--r--library/std/src/sync/mpsc/stream.rs457
-rw-r--r--library/std/src/sync/mpsc/sync.rs495
-rw-r--r--library/std/src/sync/mpsc/tests.rs14
23 files changed, 2695 insertions, 2826 deletions
diff --git a/library/std/src/sync/mod.rs b/library/std/src/sync/mod.rs
index 7b507a169b3..4fee8d3e92f 100644
--- a/library/std/src/sync/mod.rs
+++ b/library/std/src/sync/mod.rs
@@ -182,6 +182,7 @@ pub mod mpsc;
 mod barrier;
 mod condvar;
 mod lazy_lock;
+mod mpmc;
 mod mutex;
 mod once;
 mod once_lock;
diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs
new file mode 100644
index 00000000000..4db7b4990b9
--- /dev/null
+++ b/library/std/src/sync/mpmc/array.rs
@@ -0,0 +1,513 @@
+//! Bounded channel based on a preallocated array.
+//!
+//! This flavor has a fixed, positive capacity.
+//!
+//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
+//!
+//! Source:
+//!   - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
+//!   - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
+
+use super::context::Context;
+use super::error::*;
+use super::select::{Operation, Selected, Token};
+use super::utils::{Backoff, CachePadded};
+use super::waker::SyncWaker;
+
+use crate::cell::UnsafeCell;
+use crate::mem::MaybeUninit;
+use crate::ptr;
+use crate::sync::atomic::{self, AtomicUsize, Ordering};
+use crate::time::Instant;
+
+/// A slot in a channel.
+struct Slot<T> {
+    /// The current stamp.
+    stamp: AtomicUsize,
+
+    /// The message in this slot.
+    msg: UnsafeCell<MaybeUninit<T>>,
+}
+
+/// The token type for the array flavor.
+#[derive(Debug)]
+pub(crate) struct ArrayToken {
+    /// Slot to read from or write to.
+    slot: *const u8,
+
+    /// Stamp to store into the slot after reading or writing.
+    stamp: usize,
+}
+
+impl Default for ArrayToken {
+    #[inline]
+    fn default() -> Self {
+        ArrayToken { slot: ptr::null(), stamp: 0 }
+    }
+}
+
+/// Bounded channel based on a preallocated array.
+pub(crate) struct Channel<T> {
+    /// The head of the channel.
+    ///
+    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
+    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
+    /// represent the lap. The mark bit in the head is always zero.
+    ///
+    /// Messages are popped from the head of the channel.
+    head: CachePadded<AtomicUsize>,
+
+    /// The tail of the channel.
+    ///
+    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
+    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
+    /// represent the lap. The mark bit indicates that the channel is disconnected.
+    ///
+    /// Messages are pushed into the tail of the channel.
+    tail: CachePadded<AtomicUsize>,
+
+    /// The buffer holding slots.
+    buffer: Box<[Slot<T>]>,
+
+    /// The channel capacity.
+    cap: usize,
+
+    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
+    one_lap: usize,
+
+    /// If this bit is set in the tail, that means the channel is disconnected.
+    mark_bit: usize,
+
+    /// Senders waiting while the channel is full.
+    senders: SyncWaker,
+
+    /// Receivers waiting while the channel is empty and not disconnected.
+    receivers: SyncWaker,
+}
+
+impl<T> Channel<T> {
+    /// Creates a bounded channel of capacity `cap`.
+    pub(crate) fn with_capacity(cap: usize) -> Self {
+        assert!(cap > 0, "capacity must be positive");
+
+        // Compute constants `mark_bit` and `one_lap`.
+        let mark_bit = (cap + 1).next_power_of_two();
+        let one_lap = mark_bit * 2;
+
+        // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
+        let head = 0;
+        // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
+        let tail = 0;
+
+        // Allocate a buffer of `cap` slots initialized
+        // with stamps.
+        let buffer: Box<[Slot<T>]> = (0..cap)
+            .map(|i| {
+                // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
+                Slot { stamp: AtomicUsize::new(i), msg: UnsafeCell::new(MaybeUninit::uninit()) }
+            })
+            .collect();
+
+        Channel {
+            buffer,
+            cap,
+            one_lap,
+            mark_bit,
+            head: CachePadded::new(AtomicUsize::new(head)),
+            tail: CachePadded::new(AtomicUsize::new(tail)),
+            senders: SyncWaker::new(),
+            receivers: SyncWaker::new(),
+        }
+    }
+
+    /// Attempts to reserve a slot for sending a message.
+    fn start_send(&self, token: &mut Token) -> bool {
+        let backoff = Backoff::new();
+        let mut tail = self.tail.load(Ordering::Relaxed);
+
+        loop {
+            // Check if the channel is disconnected.
+            if tail & self.mark_bit != 0 {
+                token.array.slot = ptr::null();
+                token.array.stamp = 0;
+                return true;
+            }
+
+            // Deconstruct the tail.
+            let index = tail & (self.mark_bit - 1);
+            let lap = tail & !(self.one_lap - 1);
+
+            // Inspect the corresponding slot.
+            debug_assert!(index < self.buffer.len());
+            let slot = unsafe { self.buffer.get_unchecked(index) };
+            let stamp = slot.stamp.load(Ordering::Acquire);
+
+            // If the tail and the stamp match, we may attempt to push.
+            if tail == stamp {
+                let new_tail = if index + 1 < self.cap {
+                    // Same lap, incremented index.
+                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
+                    tail + 1
+                } else {
+                    // One lap forward, index wraps around to zero.
+                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
+                    lap.wrapping_add(self.one_lap)
+                };
+
+                // Try moving the tail.
+                match self.tail.compare_exchange_weak(
+                    tail,
+                    new_tail,
+                    Ordering::SeqCst,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => {
+                        // Prepare the token for the follow-up call to `write`.
+                        token.array.slot = slot as *const Slot<T> as *const u8;
+                        token.array.stamp = tail + 1;
+                        return true;
+                    }
+                    Err(_) => {
+                        backoff.spin();
+                        tail = self.tail.load(Ordering::Relaxed);
+                    }
+                }
+            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
+                atomic::fence(Ordering::SeqCst);
+                let head = self.head.load(Ordering::Relaxed);
+
+                // If the head lags one lap behind the tail as well...
+                if head.wrapping_add(self.one_lap) == tail {
+                    // ...then the channel is full.
+                    return false;
+                }
+
+                backoff.spin();
+                tail = self.tail.load(Ordering::Relaxed);
+            } else {
+                // Snooze because we need to wait for the stamp to get updated.
+                backoff.snooze();
+                tail = self.tail.load(Ordering::Relaxed);
+            }
+        }
+    }
+
+    /// Writes a message into the channel.
+    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+        // If there is no slot, the channel is disconnected.
+        if token.array.slot.is_null() {
+            return Err(msg);
+        }
+
+        let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
+
+        // Write the message into the slot and update the stamp.
+        slot.msg.get().write(MaybeUninit::new(msg));
+        slot.stamp.store(token.array.stamp, Ordering::Release);
+
+        // Wake a sleeping receiver.
+        self.receivers.notify();
+        Ok(())
+    }
+
+    /// Attempts to reserve a slot for receiving a message.
+    fn start_recv(&self, token: &mut Token) -> bool {
+        let backoff = Backoff::new();
+        let mut head = self.head.load(Ordering::Relaxed);
+
+        loop {
+            // Deconstruct the head.
+            let index = head & (self.mark_bit - 1);
+            let lap = head & !(self.one_lap - 1);
+
+            // Inspect the corresponding slot.
+            debug_assert!(index < self.buffer.len());
+            let slot = unsafe { self.buffer.get_unchecked(index) };
+            let stamp = slot.stamp.load(Ordering::Acquire);
+
+            // If the the stamp is ahead of the head by 1, we may attempt to pop.
+            if head + 1 == stamp {
+                let new = if index + 1 < self.cap {
+                    // Same lap, incremented index.
+                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
+                    head + 1
+                } else {
+                    // One lap forward, index wraps around to zero.
+                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
+                    lap.wrapping_add(self.one_lap)
+                };
+
+                // Try moving the head.
+                match self.head.compare_exchange_weak(
+                    head,
+                    new,
+                    Ordering::SeqCst,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => {
+                        // Prepare the token for the follow-up call to `read`.
+                        token.array.slot = slot as *const Slot<T> as *const u8;
+                        token.array.stamp = head.wrapping_add(self.one_lap);
+                        return true;
+                    }
+                    Err(_) => {
+                        backoff.spin();
+                        head = self.head.load(Ordering::Relaxed);
+                    }
+                }
+            } else if stamp == head {
+                atomic::fence(Ordering::SeqCst);
+                let tail = self.tail.load(Ordering::Relaxed);
+
+                // If the tail equals the head, that means the channel is empty.
+                if (tail & !self.mark_bit) == head {
+                    // If the channel is disconnected...
+                    if tail & self.mark_bit != 0 {
+                        // ...then receive an error.
+                        token.array.slot = ptr::null();
+                        token.array.stamp = 0;
+                        return true;
+                    } else {
+                        // Otherwise, the receive operation is not ready.
+                        return false;
+                    }
+                }
+
+                backoff.spin();
+                head = self.head.load(Ordering::Relaxed);
+            } else {
+                // Snooze because we need to wait for the stamp to get updated.
+                backoff.snooze();
+                head = self.head.load(Ordering::Relaxed);
+            }
+        }
+    }
+
+    /// Reads a message from the channel.
+    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+        if token.array.slot.is_null() {
+            // The channel is disconnected.
+            return Err(());
+        }
+
+        let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
+
+        // Read the message from the slot and update the stamp.
+        let msg = slot.msg.get().read().assume_init();
+        slot.stamp.store(token.array.stamp, Ordering::Release);
+
+        // Wake a sleeping sender.
+        self.senders.notify();
+        Ok(msg)
+    }
+
+    /// Attempts to send a message into the channel.
+    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+        let token = &mut Token::default();
+        if self.start_send(token) {
+            unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
+        } else {
+            Err(TrySendError::Full(msg))
+        }
+    }
+
+    /// Sends a message into the channel.
+    pub(crate) fn send(
+        &self,
+        msg: T,
+        deadline: Option<Instant>,
+    ) -> Result<(), SendTimeoutError<T>> {
+        let token = &mut Token::default();
+        loop {
+            // Try sending a message several times.
+            let backoff = Backoff::new();
+            loop {
+                if self.start_send(token) {
+                    let res = unsafe { self.write(token, msg) };
+                    return res.map_err(SendTimeoutError::Disconnected);
+                }
+
+                if backoff.is_completed() {
+                    break;
+                } else {
+                    backoff.spin();
+                }
+            }
+
+            if let Some(d) = deadline {
+                if Instant::now() >= d {
+                    return Err(SendTimeoutError::Timeout(msg));
+                }
+            }
+
+            Context::with(|cx| {
+                // Prepare for blocking until a receiver wakes us up.
+                let oper = Operation::hook(token);
+                self.senders.register(oper, cx);
+
+                // Has the channel become ready just now?
+                if !self.is_full() || self.is_disconnected() {
+                    let _ = cx.try_select(Selected::Aborted);
+                }
+
+                // Block the current thread.
+                let sel = cx.wait_until(deadline);
+
+                match sel {
+                    Selected::Waiting => unreachable!(),
+                    Selected::Aborted | Selected::Disconnected => {
+                        self.senders.unregister(oper).unwrap();
+                    }
+                    Selected::Operation(_) => {}
+                }
+            });
+        }
+    }
+
+    /// Attempts to receive a message without blocking.
+    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+        let token = &mut Token::default();
+
+        if self.start_recv(token) {
+            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+        } else {
+            Err(TryRecvError::Empty)
+        }
+    }
+
+    /// Receives a message from the channel.
+    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+        let token = &mut Token::default();
+        loop {
+            if self.start_recv(token) {
+                let res = unsafe { self.read(token) };
+                return res.map_err(|_| RecvTimeoutError::Disconnected);
+            }
+
+            if let Some(d) = deadline {
+                if Instant::now() >= d {
+                    return Err(RecvTimeoutError::Timeout);
+                }
+            }
+
+            Context::with(|cx| {
+                // Prepare for blocking until a sender wakes us up.
+                let oper = Operation::hook(token);
+                self.receivers.register(oper, cx);
+
+                // Has the channel become ready just now?
+                if !self.is_empty() || self.is_disconnected() {
+                    let _ = cx.try_select(Selected::Aborted);
+                }
+
+                // Block the current thread.
+                let sel = cx.wait_until(deadline);
+
+                match sel {
+                    Selected::Waiting => unreachable!(),
+                    Selected::Aborted | Selected::Disconnected => {
+                        self.receivers.unregister(oper).unwrap();
+                        // If the channel was disconnected, we still have to check for remaining
+                        // messages.
+                    }
+                    Selected::Operation(_) => {}
+                }
+            });
+        }
+    }
+
+    /// Returns the current number of messages inside the channel.
+    pub(crate) fn len(&self) -> usize {
+        loop {
+            // Load the tail, then load the head.
+            let tail = self.tail.load(Ordering::SeqCst);
+            let head = self.head.load(Ordering::SeqCst);
+
+            // If the tail didn't change, we've got consistent values to work with.
+            if self.tail.load(Ordering::SeqCst) == tail {
+                let hix = head & (self.mark_bit - 1);
+                let tix = tail & (self.mark_bit - 1);
+
+                return if hix < tix {
+                    tix - hix
+                } else if hix > tix {
+                    self.cap - hix + tix
+                } else if (tail & !self.mark_bit) == head {
+                    0
+                } else {
+                    self.cap
+                };
+            }
+        }
+    }
+
+    /// Returns the capacity of the channel.
+    #[allow(clippy::unnecessary_wraps)] // This is intentional.
+    pub(crate) fn capacity(&self) -> Option<usize> {
+        Some(self.cap)
+    }
+
+    /// Disconnects the channel and wakes up all blocked senders and receivers.
+    ///
+    /// Returns `true` if this call disconnected the channel.
+    pub(crate) fn disconnect(&self) -> bool {
+        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
+
+        if tail & self.mark_bit == 0 {
+            self.senders.disconnect();
+            self.receivers.disconnect();
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Returns `true` if the channel is disconnected.
+    pub(crate) fn is_disconnected(&self) -> bool {
+        self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
+    }
+
+    /// Returns `true` if the channel is empty.
+    pub(crate) fn is_empty(&self) -> bool {
+        let head = self.head.load(Ordering::SeqCst);
+        let tail = self.tail.load(Ordering::SeqCst);
+
+        // Is the tail equal to the head?
+        //
+        // Note: If the head changes just before we load the tail, that means there was a moment
+        // when the channel was not empty, so it is safe to just return `false`.
+        (tail & !self.mark_bit) == head
+    }
+
+    /// Returns `true` if the channel is full.
+    pub(crate) fn is_full(&self) -> bool {
+        let tail = self.tail.load(Ordering::SeqCst);
+        let head = self.head.load(Ordering::SeqCst);
+
+        // Is the head lagging one lap behind tail?
+        //
+        // Note: If the tail changes just before we load the head, that means there was a moment
+        // when the channel was not full, so it is safe to just return `false`.
+        head.wrapping_add(self.one_lap) == tail & !self.mark_bit
+    }
+}
+
+impl<T> Drop for Channel<T> {
+    fn drop(&mut self) {
+        // Get the index of the head.
+        let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
+
+        // Loop over all slots that hold a message and drop them.
+        for i in 0..self.len() {
+            // Compute the index of the next slot holding a message.
+            let index = if hix + i < self.cap { hix + i } else { hix + i - self.cap };
+
+            unsafe {
+                debug_assert!(index < self.buffer.len());
+                let slot = self.buffer.get_unchecked_mut(index);
+                let msg = &mut *slot.msg.get();
+                msg.as_mut_ptr().drop_in_place();
+            }
+        }
+    }
+}
diff --git a/library/std/src/sync/mpmc/context.rs b/library/std/src/sync/mpmc/context.rs
new file mode 100644
index 00000000000..bbfc6ce00ff
--- /dev/null
+++ b/library/std/src/sync/mpmc/context.rs
@@ -0,0 +1,155 @@
+//! Thread-local channel context.
+
+use super::select::Selected;
+use super::waker::current_thread_id;
+
+use crate::cell::Cell;
+use crate::ptr;
+use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
+use crate::sync::Arc;
+use crate::thread::{self, Thread};
+use crate::time::Instant;
+
+/// Thread-local context.
+#[derive(Debug, Clone)]
+pub struct Context {
+    inner: Arc<Inner>,
+}
+
+/// Inner representation of `Context`.
+#[derive(Debug)]
+struct Inner {
+    /// Selected operation.
+    select: AtomicUsize,
+
+    /// A slot into which another thread may store a pointer to its `Packet`.
+    packet: AtomicPtr<()>,
+
+    /// Thread handle.
+    thread: Thread,
+
+    /// Thread id.
+    thread_id: usize,
+}
+
+impl Context {
+    /// Creates a new context for the duration of the closure.
+    #[inline]
+    pub fn with<F, R>(f: F) -> R
+    where
+        F: FnOnce(&Context) -> R,
+    {
+        thread_local! {
+            /// Cached thread-local context.
+            static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
+        }
+
+        let mut f = Some(f);
+        let mut f = |cx: &Context| -> R {
+            let f = f.take().unwrap();
+            f(cx)
+        };
+
+        CONTEXT
+            .try_with(|cell| match cell.take() {
+                None => f(&Context::new()),
+                Some(cx) => {
+                    cx.reset();
+                    let res = f(&cx);
+                    cell.set(Some(cx));
+                    res
+                }
+            })
+            .unwrap_or_else(|_| f(&Context::new()))
+    }
+
+    /// Creates a new `Context`.
+    #[cold]
+    fn new() -> Context {
+        Context {
+            inner: Arc::new(Inner {
+                select: AtomicUsize::new(Selected::Waiting.into()),
+                packet: AtomicPtr::new(ptr::null_mut()),
+                thread: thread::current(),
+                thread_id: current_thread_id(),
+            }),
+        }
+    }
+
+    /// Resets `select` and `packet`.
+    #[inline]
+    fn reset(&self) {
+        self.inner.select.store(Selected::Waiting.into(), Ordering::Release);
+        self.inner.packet.store(ptr::null_mut(), Ordering::Release);
+    }
+
+    /// Attempts to select an operation.
+    ///
+    /// On failure, the previously selected operation is returned.
+    #[inline]
+    pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
+        self.inner
+            .select
+            .compare_exchange(
+                Selected::Waiting.into(),
+                select.into(),
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            )
+            .map(|_| ())
+            .map_err(|e| e.into())
+    }
+
+    /// Stores a packet.
+    ///
+    /// This method must be called after `try_select` succeeds and there is a packet to provide.
+    #[inline]
+    pub fn store_packet(&self, packet: *mut ()) {
+        if !packet.is_null() {
+            self.inner.packet.store(packet, Ordering::Release);
+        }
+    }
+
+    /// Waits until an operation is selected and returns it.
+    ///
+    /// If the deadline is reached, `Selected::Aborted` will be selected.
+    #[inline]
+    pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
+        loop {
+            // Check whether an operation has been selected.
+            let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
+            if sel != Selected::Waiting {
+                return sel;
+            }
+
+            // If there's a deadline, park the current thread until the deadline is reached.
+            if let Some(end) = deadline {
+                let now = Instant::now();
+
+                if now < end {
+                    thread::park_timeout(end - now);
+                } else {
+                    // The deadline has been reached. Try aborting select.
+                    return match self.try_select(Selected::Aborted) {
+                        Ok(()) => Selected::Aborted,
+                        Err(s) => s,
+                    };
+                }
+            } else {
+                thread::park();
+            }
+        }
+    }
+
+    /// Unparks the thread this context belongs to.
+    #[inline]
+    pub fn unpark(&self) {
+        self.inner.thread.unpark();
+    }
+
+    /// Returns the id of the thread this context belongs to.
+    #[inline]
+    pub fn thread_id(&self) -> usize {
+        self.inner.thread_id
+    }
+}
diff --git a/library/std/src/sync/mpmc/counter.rs b/library/std/src/sync/mpmc/counter.rs
new file mode 100644
index 00000000000..a5a6bdc67f1
--- /dev/null
+++ b/library/std/src/sync/mpmc/counter.rs
@@ -0,0 +1,137 @@
+use crate::ops;
+use crate::process;
+use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+
+/// Reference counter internals.
+struct Counter<C> {
+    /// The number of senders associated with the channel.
+    senders: AtomicUsize,
+
+    /// The number of receivers associated with the channel.
+    receivers: AtomicUsize,
+
+    /// Set to `true` if the last sender or the last receiver reference deallocates the channel.
+    destroy: AtomicBool,
+
+    /// The internal channel.
+    chan: C,
+}
+
+/// Wraps a channel into the reference counter.
+pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
+    let counter = Box::into_raw(Box::new(Counter {
+        senders: AtomicUsize::new(1),
+        receivers: AtomicUsize::new(1),
+        destroy: AtomicBool::new(false),
+        chan,
+    }));
+    let s = Sender { counter };
+    let r = Receiver { counter };
+    (s, r)
+}
+
+/// The sending side.
+pub(crate) struct Sender<C> {
+    counter: *mut Counter<C>,
+}
+
+impl<C> Sender<C> {
+    /// Returns the internal `Counter`.
+    fn counter(&self) -> &Counter<C> {
+        unsafe { &*self.counter }
+    }
+
+    /// Acquires another sender reference.
+    pub(crate) fn acquire(&self) -> Sender<C> {
+        let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
+
+        // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
+        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
+        // just abort when the count becomes very large.
+        if count > isize::MAX as usize {
+            process::abort();
+        }
+
+        Sender { counter: self.counter }
+    }
+
+    /// Releases the sender reference.
+    ///
+    /// Function `disconnect` will be called if this is the last sender reference.
+    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
+        if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
+            disconnect(&self.counter().chan);
+
+            if self.counter().destroy.swap(true, Ordering::AcqRel) {
+                drop(Box::from_raw(self.counter));
+            }
+        }
+    }
+}
+
+impl<C> ops::Deref for Sender<C> {
+    type Target = C;
+
+    fn deref(&self) -> &C {
+        &self.counter().chan
+    }
+}
+
+impl<C> PartialEq for Sender<C> {
+    fn eq(&self, other: &Sender<C>) -> bool {
+        self.counter == other.counter
+    }
+}
+
+/// The receiving side.
+pub(crate) struct Receiver<C> {
+    counter: *mut Counter<C>,
+}
+
+impl<C> Receiver<C> {
+    /// Returns the internal `Counter`.
+    fn counter(&self) -> &Counter<C> {
+        unsafe { &*self.counter }
+    }
+
+    /// Acquires another receiver reference.
+    pub(crate) fn acquire(&self) -> Receiver<C> {
+        let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
+
+        // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
+        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
+        // just abort when the count becomes very large.
+        if count > isize::MAX as usize {
+            process::abort();
+        }
+
+        Receiver { counter: self.counter }
+    }
+
+    /// Releases the receiver reference.
+    ///
+    /// Function `disconnect` will be called if this is the last receiver reference.
+    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
+        if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
+            disconnect(&self.counter().chan);
+
+            if self.counter().destroy.swap(true, Ordering::AcqRel) {
+                drop(Box::from_raw(self.counter));
+            }
+        }
+    }
+}
+
+impl<C> ops::Deref for Receiver<C> {
+    type Target = C;
+
+    fn deref(&self) -> &C {
+        &self.counter().chan
+    }
+}
+
+impl<C> PartialEq for Receiver<C> {
+    fn eq(&self, other: &Receiver<C>) -> bool {
+        self.counter == other.counter
+    }
+}
diff --git a/library/std/src/sync/mpmc/error.rs b/library/std/src/sync/mpmc/error.rs
new file mode 100644
index 00000000000..1b8a1f38797
--- /dev/null
+++ b/library/std/src/sync/mpmc/error.rs
@@ -0,0 +1,46 @@
+use crate::error;
+use crate::fmt;
+
+pub use crate::sync::mpsc::{RecvError, RecvTimeoutError, SendError, TryRecvError, TrySendError};
+
+/// An error returned from the [`send_timeout`] method.
+///
+/// The error contains the message being sent so it can be recovered.
+///
+/// [`send_timeout`]: super::Sender::send_timeout
+#[derive(PartialEq, Eq, Clone, Copy)]
+pub enum SendTimeoutError<T> {
+    /// The message could not be sent because the channel is full and the operation timed out.
+    ///
+    /// If this is a zero-capacity channel, then the error indicates that there was no receiver
+    /// available to receive the message and the operation timed out.
+    Timeout(T),
+
+    /// The message could not be sent because the channel is disconnected.
+    Disconnected(T),
+}
+
+impl<T> fmt::Debug for SendTimeoutError<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        "SendTimeoutError(..)".fmt(f)
+    }
+}
+
+impl<T> fmt::Display for SendTimeoutError<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match *self {
+            SendTimeoutError::Timeout(..) => "timed out waiting on send operation".fmt(f),
+            SendTimeoutError::Disconnected(..) => "sending on a disconnected channel".fmt(f),
+        }
+    }
+}
+
+impl<T: Send> error::Error for SendTimeoutError<T> {}
+
+impl<T> From<SendError<T>> for SendTimeoutError<T> {
+    fn from(err: SendError<T>) -> SendTimeoutError<T> {
+        match err {
+            SendError(e) => SendTimeoutError::Disconnected(e),
+        }
+    }
+}
diff --git a/library/std/src/sync/mpmc/list.rs b/library/std/src/sync/mpmc/list.rs
new file mode 100644
index 00000000000..2d5b2fb3b23
--- /dev/null
+++ b/library/std/src/sync/mpmc/list.rs
@@ -0,0 +1,638 @@
+//! Unbounded channel implemented as a linked list.
+
+use super::context::Context;
+use super::error::*;
+use super::select::{Operation, Selected, Token};
+use super::utils::{Backoff, CachePadded};
+use super::waker::SyncWaker;
+
+use crate::cell::UnsafeCell;
+use crate::marker::PhantomData;
+use crate::mem::MaybeUninit;
+use crate::ptr;
+use crate::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
+use crate::time::Instant;
+
+// Bits indicating the state of a slot:
+// * If a message has been written into the slot, `WRITE` is set.
+// * If a message has been read from the slot, `READ` is set.
+// * If the block is being destroyed, `DESTROY` is set.
+const WRITE: usize = 1;
+const READ: usize = 2;
+const DESTROY: usize = 4;
+
+// Each block covers one "lap" of indices.
+const LAP: usize = 32;
+// The maximum number of messages a block can hold.
+const BLOCK_CAP: usize = LAP - 1;
+// How many lower bits are reserved for metadata.
+const SHIFT: usize = 1;
+// Has two different purposes:
+// * If set in head, indicates that the block is not the last one.
+// * If set in tail, indicates that the channel is disconnected.
+const MARK_BIT: usize = 1;
+
+/// A slot in a block.
+struct Slot<T> {
+    /// The message.
+    msg: UnsafeCell<MaybeUninit<T>>,
+
+    /// The state of the slot.
+    state: AtomicUsize,
+}
+
+impl<T> Slot<T> {
+    /// Waits until a message is written into the slot.
+    fn wait_write(&self) {
+        let backoff = Backoff::new();
+        while self.state.load(Ordering::Acquire) & WRITE == 0 {
+            backoff.snooze();
+        }
+    }
+}
+
+/// A block in a linked list.
+///
+/// Each block in the list can hold up to `BLOCK_CAP` messages.
+struct Block<T> {
+    /// The next block in the linked list.
+    next: AtomicPtr<Block<T>>,
+
+    /// Slots for messages.
+    slots: [Slot<T>; BLOCK_CAP],
+}
+
+impl<T> Block<T> {
+    /// Creates an empty block.
+    fn new() -> Block<T> {
+        // SAFETY: This is safe because:
+        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
+        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
+        //  [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
+        //       holds a MaybeUninit.
+        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
+        unsafe { MaybeUninit::zeroed().assume_init() }
+    }
+
+    /// Waits until the next pointer is set.
+    fn wait_next(&self) -> *mut Block<T> {
+        let backoff = Backoff::new();
+        loop {
+            let next = self.next.load(Ordering::Acquire);
+            if !next.is_null() {
+                return next;
+            }
+            backoff.snooze();
+        }
+    }
+
+    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
+    unsafe fn destroy(this: *mut Block<T>, start: usize) {
+        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
+        // begun destruction of the block.
+        for i in start..BLOCK_CAP - 1 {
+            let slot = (*this).slots.get_unchecked(i);
+
+            // Mark the `DESTROY` bit if a thread is still using the slot.
+            if slot.state.load(Ordering::Acquire) & READ == 0
+                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
+            {
+                // If a thread is still using the slot, it will continue destruction of the block.
+                return;
+            }
+        }
+
+        // No thread is using the block, now it is safe to destroy it.
+        drop(Box::from_raw(this));
+    }
+}
+
+/// A position in a channel.
+#[derive(Debug)]
+struct Position<T> {
+    /// The index in the channel.
+    index: AtomicUsize,
+
+    /// The block in the linked list.
+    block: AtomicPtr<Block<T>>,
+}
+
+/// The token type for the list flavor.
+#[derive(Debug)]
+pub(crate) struct ListToken {
+    /// The block of slots.
+    block: *const u8,
+
+    /// The offset into the block.
+    offset: usize,
+}
+
+impl Default for ListToken {
+    #[inline]
+    fn default() -> Self {
+        ListToken { block: ptr::null(), offset: 0 }
+    }
+}
+
+/// Unbounded channel implemented as a linked list.
+///
+/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
+/// represented as numbers of type `usize` and wrap on overflow.
+///
+/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
+/// improve cache efficiency.
+pub(crate) struct Channel<T> {
+    /// The head of the channel.
+    head: CachePadded<Position<T>>,
+
+    /// The tail of the channel.
+    tail: CachePadded<Position<T>>,
+
+    /// Receivers waiting while the channel is empty and not disconnected.
+    receivers: SyncWaker,
+
+    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
+    _marker: PhantomData<T>,
+}
+
+impl<T> Channel<T> {
+    /// Creates a new unbounded channel.
+    pub(crate) fn new() -> Self {
+        Channel {
+            head: CachePadded::new(Position {
+                block: AtomicPtr::new(ptr::null_mut()),
+                index: AtomicUsize::new(0),
+            }),
+            tail: CachePadded::new(Position {
+                block: AtomicPtr::new(ptr::null_mut()),
+                index: AtomicUsize::new(0),
+            }),
+            receivers: SyncWaker::new(),
+            _marker: PhantomData,
+        }
+    }
+
+    /// Attempts to reserve a slot for sending a message.
+    fn start_send(&self, token: &mut Token) -> bool {
+        let backoff = Backoff::new();
+        let mut tail = self.tail.index.load(Ordering::Acquire);
+        let mut block = self.tail.block.load(Ordering::Acquire);
+        let mut next_block = None;
+
+        loop {
+            // Check if the channel is disconnected.
+            if tail & MARK_BIT != 0 {
+                token.list.block = ptr::null();
+                return true;
+            }
+
+            // Calculate the offset of the index into the block.
+            let offset = (tail >> SHIFT) % LAP;
+
+            // If we reached the end of the block, wait until the next one is installed.
+            if offset == BLOCK_CAP {
+                backoff.snooze();
+                tail = self.tail.index.load(Ordering::Acquire);
+                block = self.tail.block.load(Ordering::Acquire);
+                continue;
+            }
+
+            // If we're going to have to install the next block, allocate it in advance in order to
+            // make the wait for other threads as short as possible.
+            if offset + 1 == BLOCK_CAP && next_block.is_none() {
+                next_block = Some(Box::new(Block::<T>::new()));
+            }
+
+            // If this is the first message to be sent into the channel, we need to allocate the
+            // first block and install it.
+            if block.is_null() {
+                let new = Box::into_raw(Box::new(Block::<T>::new()));
+
+                if self
+                    .tail
+                    .block
+                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
+                    .is_ok()
+                {
+                    self.head.block.store(new, Ordering::Release);
+                    block = new;
+                } else {
+                    next_block = unsafe { Some(Box::from_raw(new)) };
+                    tail = self.tail.index.load(Ordering::Acquire);
+                    block = self.tail.block.load(Ordering::Acquire);
+                    continue;
+                }
+            }
+
+            let new_tail = tail + (1 << SHIFT);
+
+            // Try advancing the tail forward.
+            match self.tail.index.compare_exchange_weak(
+                tail,
+                new_tail,
+                Ordering::SeqCst,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => unsafe {
+                    // If we've reached the end of the block, install the next one.
+                    if offset + 1 == BLOCK_CAP {
+                        let next_block = Box::into_raw(next_block.unwrap());
+                        self.tail.block.store(next_block, Ordering::Release);
+                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
+                        (*block).next.store(next_block, Ordering::Release);
+                    }
+
+                    token.list.block = block as *const u8;
+                    token.list.offset = offset;
+                    return true;
+                },
+                Err(_) => {
+                    backoff.spin();
+                    tail = self.tail.index.load(Ordering::Acquire);
+                    block = self.tail.block.load(Ordering::Acquire);
+                }
+            }
+        }
+    }
+
+    /// Writes a message into the channel.
+    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+        // If there is no slot, the channel is disconnected.
+        if token.list.block.is_null() {
+            return Err(msg);
+        }
+
+        // Write the message into the slot.
+        let block = token.list.block as *mut Block<T>;
+        let offset = token.list.offset;
+        let slot = (*block).slots.get_unchecked(offset);
+        slot.msg.get().write(MaybeUninit::new(msg));
+        slot.state.fetch_or(WRITE, Ordering::Release);
+
+        // Wake a sleeping receiver.
+        self.receivers.notify();
+        Ok(())
+    }
+
+    /// Attempts to reserve a slot for receiving a message.
+    fn start_recv(&self, token: &mut Token) -> bool {
+        let backoff = Backoff::new();
+        let mut head = self.head.index.load(Ordering::Acquire);
+        let mut block = self.head.block.load(Ordering::Acquire);
+
+        loop {
+            // Calculate the offset of the index into the block.
+            let offset = (head >> SHIFT) % LAP;
+
+            // If we reached the end of the block, wait until the next one is installed.
+            if offset == BLOCK_CAP {
+                backoff.snooze();
+                head = self.head.index.load(Ordering::Acquire);
+                block = self.head.block.load(Ordering::Acquire);
+                continue;
+            }
+
+            let mut new_head = head + (1 << SHIFT);
+
+            if new_head & MARK_BIT == 0 {
+                atomic::fence(Ordering::SeqCst);
+                let tail = self.tail.index.load(Ordering::Relaxed);
+
+                // If the tail equals the head, that means the channel is empty.
+                if head >> SHIFT == tail >> SHIFT {
+                    // If the channel is disconnected...
+                    if tail & MARK_BIT != 0 {
+                        // ...then receive an error.
+                        token.list.block = ptr::null();
+                        return true;
+                    } else {
+                        // Otherwise, the receive operation is not ready.
+                        return false;
+                    }
+                }
+
+                // If head and tail are not in the same block, set `MARK_BIT` in head.
+                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
+                    new_head |= MARK_BIT;
+                }
+            }
+
+            // The block can be null here only if the first message is being sent into the channel.
+            // In that case, just wait until it gets initialized.
+            if block.is_null() {
+                backoff.snooze();
+                head = self.head.index.load(Ordering::Acquire);
+                block = self.head.block.load(Ordering::Acquire);
+                continue;
+            }
+
+            // Try moving the head index forward.
+            match self.head.index.compare_exchange_weak(
+                head,
+                new_head,
+                Ordering::SeqCst,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => unsafe {
+                    // If we've reached the end of the block, move to the next one.
+                    if offset + 1 == BLOCK_CAP {
+                        let next = (*block).wait_next();
+                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
+                        if !(*next).next.load(Ordering::Relaxed).is_null() {
+                            next_index |= MARK_BIT;
+                        }
+
+                        self.head.block.store(next, Ordering::Release);
+                        self.head.index.store(next_index, Ordering::Release);
+                    }
+
+                    token.list.block = block as *const u8;
+                    token.list.offset = offset;
+                    return true;
+                },
+                Err(_) => {
+                    backoff.spin();
+                    head = self.head.index.load(Ordering::Acquire);
+                    block = self.head.block.load(Ordering::Acquire);
+                }
+            }
+        }
+    }
+
+    /// Reads a message from the channel.
+    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+        if token.list.block.is_null() {
+            // The channel is disconnected.
+            return Err(());
+        }
+
+        // Read the message.
+        let block = token.list.block as *mut Block<T>;
+        let offset = token.list.offset;
+        let slot = (*block).slots.get_unchecked(offset);
+        slot.wait_write();
+        let msg = slot.msg.get().read().assume_init();
+
+        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
+        // couldn't because we were busy reading from the slot.
+        if offset + 1 == BLOCK_CAP {
+            Block::destroy(block, 0);
+        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
+            Block::destroy(block, offset + 1);
+        }
+
+        Ok(msg)
+    }
+
+    /// Attempts to send a message into the channel.
+    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+        self.send(msg, None).map_err(|err| match err {
+            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
+            SendTimeoutError::Timeout(_) => unreachable!(),
+        })
+    }
+
+    /// Sends a message into the channel.
+    pub(crate) fn send(
+        &self,
+        msg: T,
+        _deadline: Option<Instant>,
+    ) -> Result<(), SendTimeoutError<T>> {
+        let token = &mut Token::default();
+        assert!(self.start_send(token));
+        unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) }
+    }
+
+    /// Attempts to receive a message without blocking.
+    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+        let token = &mut Token::default();
+
+        if self.start_recv(token) {
+            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+        } else {
+            Err(TryRecvError::Empty)
+        }
+    }
+
+    /// Receives a message from the channel.
+    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+        let token = &mut Token::default();
+        loop {
+            if self.start_recv(token) {
+                unsafe {
+                    return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
+                }
+            }
+
+            if let Some(d) = deadline {
+                if Instant::now() >= d {
+                    return Err(RecvTimeoutError::Timeout);
+                }
+            }
+
+            // Prepare for blocking until a sender wakes us up.
+            Context::with(|cx| {
+                let oper = Operation::hook(token);
+                self.receivers.register(oper, cx);
+
+                // Has the channel become ready just now?
+                if !self.is_empty() || self.is_disconnected() {
+                    let _ = cx.try_select(Selected::Aborted);
+                }
+
+                // Block the current thread.
+                let sel = cx.wait_until(deadline);
+
+                match sel {
+                    Selected::Waiting => unreachable!(),
+                    Selected::Aborted | Selected::Disconnected => {
+                        self.receivers.unregister(oper).unwrap();
+                        // If the channel was disconnected, we still have to check for remaining
+                        // messages.
+                    }
+                    Selected::Operation(_) => {}
+                }
+            });
+        }
+    }
+
+    /// Returns the current number of messages inside the channel.
+    pub(crate) fn len(&self) -> usize {
+        loop {
+            // Load the tail index, then load the head index.
+            let mut tail = self.tail.index.load(Ordering::SeqCst);
+            let mut head = self.head.index.load(Ordering::SeqCst);
+
+            // If the tail index didn't change, we've got consistent indices to work with.
+            if self.tail.index.load(Ordering::SeqCst) == tail {
+                // Erase the lower bits.
+                tail &= !((1 << SHIFT) - 1);
+                head &= !((1 << SHIFT) - 1);
+
+                // Fix up indices if they fall onto block ends.
+                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
+                    tail = tail.wrapping_add(1 << SHIFT);
+                }
+                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
+                    head = head.wrapping_add(1 << SHIFT);
+                }
+
+                // Rotate indices so that head falls into the first block.
+                let lap = (head >> SHIFT) / LAP;
+                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
+                head = head.wrapping_sub((lap * LAP) << SHIFT);
+
+                // Remove the lower bits.
+                tail >>= SHIFT;
+                head >>= SHIFT;
+
+                // Return the difference minus the number of blocks between tail and head.
+                return tail - head - tail / LAP;
+            }
+        }
+    }
+
+    /// Returns the capacity of the channel.
+    pub(crate) fn capacity(&self) -> Option<usize> {
+        None
+    }
+
+    /// Disconnects senders and wakes up all blocked receivers.
+    ///
+    /// Returns `true` if this call disconnected the channel.
+    pub(crate) fn disconnect_senders(&self) -> bool {
+        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
+
+        if tail & MARK_BIT == 0 {
+            self.receivers.disconnect();
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Disconnects receivers.
+    ///
+    /// Returns `true` if this call disconnected the channel.
+    pub(crate) fn disconnect_receivers(&self) -> bool {
+        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
+
+        if tail & MARK_BIT == 0 {
+            // If receivers are dropped first, discard all messages to free
+            // memory eagerly.
+            self.discard_all_messages();
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Discards all messages.
+    ///
+    /// This method should only be called when all receivers are dropped.
+    fn discard_all_messages(&self) {
+        let backoff = Backoff::new();
+        let mut tail = self.tail.index.load(Ordering::Acquire);
+        loop {
+            let offset = (tail >> SHIFT) % LAP;
+            if offset != BLOCK_CAP {
+                break;
+            }
+
+            // New updates to tail will be rejected by MARK_BIT and aborted unless it's
+            // at boundary. We need to wait for the updates take affect otherwise there
+            // can be memory leaks.
+            backoff.snooze();
+            tail = self.tail.index.load(Ordering::Acquire);
+        }
+
+        let mut head = self.head.index.load(Ordering::Acquire);
+        let mut block = self.head.block.load(Ordering::Acquire);
+
+        unsafe {
+            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
+            while head >> SHIFT != tail >> SHIFT {
+                let offset = (head >> SHIFT) % LAP;
+
+                if offset < BLOCK_CAP {
+                    // Drop the message in the slot.
+                    let slot = (*block).slots.get_unchecked(offset);
+                    slot.wait_write();
+                    let p = &mut *slot.msg.get();
+                    p.as_mut_ptr().drop_in_place();
+                } else {
+                    (*block).wait_next();
+                    // Deallocate the block and move to the next one.
+                    let next = (*block).next.load(Ordering::Acquire);
+                    drop(Box::from_raw(block));
+                    block = next;
+                }
+
+                head = head.wrapping_add(1 << SHIFT);
+            }
+
+            // Deallocate the last remaining block.
+            if !block.is_null() {
+                drop(Box::from_raw(block));
+            }
+        }
+        head &= !MARK_BIT;
+        self.head.block.store(ptr::null_mut(), Ordering::Release);
+        self.head.index.store(head, Ordering::Release);
+    }
+
+    /// Returns `true` if the channel is disconnected.
+    pub(crate) fn is_disconnected(&self) -> bool {
+        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
+    }
+
+    /// Returns `true` if the channel is empty.
+    pub(crate) fn is_empty(&self) -> bool {
+        let head = self.head.index.load(Ordering::SeqCst);
+        let tail = self.tail.index.load(Ordering::SeqCst);
+        head >> SHIFT == tail >> SHIFT
+    }
+
+    /// Returns `true` if the channel is full.
+    pub(crate) fn is_full(&self) -> bool {
+        false
+    }
+}
+
+impl<T> Drop for Channel<T> {
+    fn drop(&mut self) {
+        let mut head = self.head.index.load(Ordering::Relaxed);
+        let mut tail = self.tail.index.load(Ordering::Relaxed);
+        let mut block = self.head.block.load(Ordering::Relaxed);
+
+        // Erase the lower bits.
+        head &= !((1 << SHIFT) - 1);
+        tail &= !((1 << SHIFT) - 1);
+
+        unsafe {
+            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
+            while head != tail {
+                let offset = (head >> SHIFT) % LAP;
+
+                if offset < BLOCK_CAP {
+                    // Drop the message in the slot.
+                    let slot = (*block).slots.get_unchecked(offset);
+                    let p = &mut *slot.msg.get();
+                    p.as_mut_ptr().drop_in_place();
+                } else {
+                    // Deallocate the block and move to the next one.
+                    let next = (*block).next.load(Ordering::Relaxed);
+                    drop(Box::from_raw(block));
+                    block = next;
+                }
+
+                head = head.wrapping_add(1 << SHIFT);
+            }
+
+            // Deallocate the last remaining block.
+            if !block.is_null() {
+                drop(Box::from_raw(block));
+            }
+        }
+    }
+}
diff --git a/library/std/src/sync/mpmc/mod.rs b/library/std/src/sync/mpmc/mod.rs
new file mode 100644
index 00000000000..cef99c58843
--- /dev/null
+++ b/library/std/src/sync/mpmc/mod.rs
@@ -0,0 +1,430 @@
+//! Multi-producer multi-consumer channels.
+
+// This module is not currently exposed publicly, but is used
+// as the implementation for the channels in `sync::mpsc`. The
+// implementation comes from the crossbeam-channel crate:
+//
+// Copyright (c) 2019 The Crossbeam Project Developers
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+mod array;
+mod context;
+mod counter;
+mod error;
+mod list;
+mod select;
+mod utils;
+mod waker;
+mod zero;
+
+use crate::fmt;
+use crate::panic::{RefUnwindSafe, UnwindSafe};
+use crate::time::{Duration, Instant};
+use error::*;
+
+/// Creates a channel of unbounded capacity.
+///
+/// This channel has a growable buffer that can hold any number of messages at a time.
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+    let (s, r) = counter::new(list::Channel::new());
+    let s = Sender { flavor: SenderFlavor::List(s) };
+    let r = Receiver { flavor: ReceiverFlavor::List(r) };
+    (s, r)
+}
+
+/// Creates a channel of bounded capacity.
+///
+/// This channel has a buffer that can hold at most `cap` messages at a time.
+///
+/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
+/// receive operations must appear at the same time in order to pair up and pass the message over.
+pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
+    if cap == 0 {
+        let (s, r) = counter::new(zero::Channel::new());
+        let s = Sender { flavor: SenderFlavor::Zero(s) };
+        let r = Receiver { flavor: ReceiverFlavor::Zero(r) };
+        (s, r)
+    } else {
+        let (s, r) = counter::new(array::Channel::with_capacity(cap));
+        let s = Sender { flavor: SenderFlavor::Array(s) };
+        let r = Receiver { flavor: ReceiverFlavor::Array(r) };
+        (s, r)
+    }
+}
+
+/// The sending side of a channel.
+pub struct Sender<T> {
+    flavor: SenderFlavor<T>,
+}
+
+/// Sender flavors.
+enum SenderFlavor<T> {
+    /// Bounded channel based on a preallocated array.
+    Array(counter::Sender<array::Channel<T>>),
+
+    /// Unbounded channel implemented as a linked list.
+    List(counter::Sender<list::Channel<T>>),
+
+    /// Zero-capacity channel.
+    Zero(counter::Sender<zero::Channel<T>>),
+}
+
+unsafe impl<T: Send> Send for Sender<T> {}
+unsafe impl<T: Send> Sync for Sender<T> {}
+
+impl<T> UnwindSafe for Sender<T> {}
+impl<T> RefUnwindSafe for Sender<T> {}
+
+impl<T> Sender<T> {
+    /// Attempts to send a message into the channel without blocking.
+    ///
+    /// This method will either send a message into the channel immediately or return an error if
+    /// the channel is full or disconnected. The returned error contains the original message.
+    ///
+    /// If called on a zero-capacity channel, this method will send the message only if there
+    /// happens to be a receive operation on the other side of the channel at the same time.
+    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+        match &self.flavor {
+            SenderFlavor::Array(chan) => chan.try_send(msg),
+            SenderFlavor::List(chan) => chan.try_send(msg),
+            SenderFlavor::Zero(chan) => chan.try_send(msg),
+        }
+    }
+
+    /// Blocks the current thread until a message is sent or the channel is disconnected.
+    ///
+    /// If the channel is full and not disconnected, this call will block until the send operation
+    /// can proceed. If the channel becomes disconnected, this call will wake up and return an
+    /// error. The returned error contains the original message.
+    ///
+    /// If called on a zero-capacity channel, this method will wait for a receive operation to
+    /// appear on the other side of the channel.
+    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
+        match &self.flavor {
+            SenderFlavor::Array(chan) => chan.send(msg, None),
+            SenderFlavor::List(chan) => chan.send(msg, None),
+            SenderFlavor::Zero(chan) => chan.send(msg, None),
+        }
+        .map_err(|err| match err {
+            SendTimeoutError::Disconnected(msg) => SendError(msg),
+            SendTimeoutError::Timeout(_) => unreachable!(),
+        })
+    }
+}
+
+// The methods below are not used by `sync::mpsc`, but
+// are useful and we'll likely want to expose them
+// eventually
+#[allow(unused)]
+impl<T> Sender<T> {
+    /// Waits for a message to be sent into the channel, but only for a limited time.
+    ///
+    /// If the channel is full and not disconnected, this call will block until the send operation
+    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
+    /// wake up and return an error. The returned error contains the original message.
+    ///
+    /// If called on a zero-capacity channel, this method will wait for a receive operation to
+    /// appear on the other side of the channel.
+    pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
+        match Instant::now().checked_add(timeout) {
+            Some(deadline) => self.send_deadline(msg, deadline),
+            // So far in the future that it's practically the same as waiting indefinitely.
+            None => self.send(msg).map_err(SendTimeoutError::from),
+        }
+    }
+
+    /// Waits for a message to be sent into the channel, but only until a given deadline.
+    ///
+    /// If the channel is full and not disconnected, this call will block until the send operation
+    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
+    /// wake up and return an error. The returned error contains the original message.
+    ///
+    /// If called on a zero-capacity channel, this method will wait for a receive operation to
+    /// appear on the other side of the channel.
+    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
+        match &self.flavor {
+            SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
+            SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
+            SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
+        }
+    }
+
+    /// Returns `true` if the channel is empty.
+    ///
+    /// Note: Zero-capacity channels are always empty.
+    pub fn is_empty(&self) -> bool {
+        match &self.flavor {
+            SenderFlavor::Array(chan) => chan.is_empty(),
+            SenderFlavor::List(chan) => chan.is_empty(),
+            SenderFlavor::Zero(chan) => chan.is_empty(),
+        }
+    }
+
+    /// Returns `true` if the channel is full.
+    ///
+    /// Note: Zero-capacity channels are always full.
+    pub fn is_full(&self) -> bool {
+        match &self.flavor {
+            SenderFlavor::Array(chan) => chan.is_full(),
+            SenderFlavor::List(chan) => chan.is_full(),
+            SenderFlavor::Zero(chan) => chan.is_full(),
+        }
+    }
+
+    /// Returns the number of messages in the channel.
+    pub fn len(&self) -> usize {
+        match &self.flavor {
+            SenderFlavor::Array(chan) => chan.len(),
+            SenderFlavor::List(chan) => chan.len(),
+            SenderFlavor::Zero(chan) => chan.len(),
+        }
+    }
+
+    /// If the channel is bounded, returns its capacity.
+    pub fn capacity(&self) -> Option<usize> {
+        match &self.flavor {
+            SenderFlavor::Array(chan) => chan.capacity(),
+            SenderFlavor::List(chan) => chan.capacity(),
+            SenderFlavor::Zero(chan) => chan.capacity(),
+        }
+    }
+
+    /// Returns `true` if senders belong to the same channel.
+    pub fn same_channel(&self, other: &Sender<T>) -> bool {
+        match (&self.flavor, &other.flavor) {
+            (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
+            (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
+            (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
+            _ => false,
+        }
+    }
+}
+
+impl<T> Drop for Sender<T> {
+    fn drop(&mut self) {
+        unsafe {
+            match &self.flavor {
+                SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
+                SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
+                SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
+            }
+        }
+    }
+}
+
+impl<T> Clone for Sender<T> {
+    fn clone(&self) -> Self {
+        let flavor = match &self.flavor {
+            SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
+            SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
+            SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
+        };
+
+        Sender { flavor }
+    }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.pad("Sender { .. }")
+    }
+}
+
+/// The receiving side of a channel.
+pub struct Receiver<T> {
+    flavor: ReceiverFlavor<T>,
+}
+
+/// Receiver flavors.
+enum ReceiverFlavor<T> {
+    /// Bounded channel based on a preallocated array.
+    Array(counter::Receiver<array::Channel<T>>),
+
+    /// Unbounded channel implemented as a linked list.
+    List(counter::Receiver<list::Channel<T>>),
+
+    /// Zero-capacity channel.
+    Zero(counter::Receiver<zero::Channel<T>>),
+}
+
+unsafe impl<T: Send> Send for Receiver<T> {}
+unsafe impl<T: Send> Sync for Receiver<T> {}
+
+impl<T> UnwindSafe for Receiver<T> {}
+impl<T> RefUnwindSafe for Receiver<T> {}
+
+impl<T> Receiver<T> {
+    /// Attempts to receive a message from the channel without blocking.
+    ///
+    /// This method will either receive a message from the channel immediately or return an error
+    /// if the channel is empty.
+    ///
+    /// If called on a zero-capacity channel, this method will receive a message only if there
+    /// happens to be a send operation on the other side of the channel at the same time.
+    pub fn try_recv(&self) -> Result<T, TryRecvError> {
+        match &self.flavor {
+            ReceiverFlavor::Array(chan) => chan.try_recv(),
+            ReceiverFlavor::List(chan) => chan.try_recv(),
+            ReceiverFlavor::Zero(chan) => chan.try_recv(),
+        }
+    }
+
+    /// Blocks the current thread until a message is received or the channel is empty and
+    /// disconnected.
+    ///
+    /// If the channel is empty and not disconnected, this call will block until the receive
+    /// operation can proceed. If the channel is empty and becomes disconnected, this call will
+    /// wake up and return an error.
+    ///
+    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
+    /// on the other side of the channel.
+    pub fn recv(&self) -> Result<T, RecvError> {
+        match &self.flavor {
+            ReceiverFlavor::Array(chan) => chan.recv(None),
+            ReceiverFlavor::List(chan) => chan.recv(None),
+            ReceiverFlavor::Zero(chan) => chan.recv(None),
+        }
+        .map_err(|_| RecvError)
+    }
+
+    /// Waits for a message to be received from the channel, but only for a limited time.
+    ///
+    /// If the channel is empty and not disconnected, this call will block until the receive
+    /// operation can proceed or the operation times out. If the channel is empty and becomes
+    /// disconnected, this call will wake up and return an error.
+    ///
+    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
+    /// on the other side of the channel.
+    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
+        match Instant::now().checked_add(timeout) {
+            Some(deadline) => self.recv_deadline(deadline),
+            // So far in the future that it's practically the same as waiting indefinitely.
+            None => self.recv().map_err(RecvTimeoutError::from),
+        }
+    }
+
+    /// Waits for a message to be received from the channel, but only for a limited time.
+    ///
+    /// If the channel is empty and not disconnected, this call will block until the receive
+    /// operation can proceed or the operation times out. If the channel is empty and becomes
+    /// disconnected, this call will wake up and return an error.
+    ///
+    /// If called on a zero-capacity channel, this method will wait for a send operation to appear
+    /// on the other side of the channel.
+    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
+        match &self.flavor {
+            ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
+            ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
+            ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
+        }
+    }
+}
+
+// The methods below are not used by `sync::mpsc`, but
+// are useful and we'll likely want to expose them
+// eventually
+#[allow(unused)]
+impl<T> Receiver<T> {
+    /// Returns `true` if the channel is empty.
+    ///
+    /// Note: Zero-capacity channels are always empty.
+    pub fn is_empty(&self) -> bool {
+        match &self.flavor {
+            ReceiverFlavor::Array(chan) => chan.is_empty(),
+            ReceiverFlavor::List(chan) => chan.is_empty(),
+            ReceiverFlavor::Zero(chan) => chan.is_empty(),
+        }
+    }
+
+    /// Returns `true` if the channel is full.
+    ///
+    /// Note: Zero-capacity channels are always full.
+    pub fn is_full(&self) -> bool {
+        match &self.flavor {
+            ReceiverFlavor::Array(chan) => chan.is_full(),
+            ReceiverFlavor::List(chan) => chan.is_full(),
+            ReceiverFlavor::Zero(chan) => chan.is_full(),
+        }
+    }
+
+    /// Returns the number of messages in the channel.
+    pub fn len(&self) -> usize {
+        match &self.flavor {
+            ReceiverFlavor::Array(chan) => chan.len(),
+            ReceiverFlavor::List(chan) => chan.len(),
+            ReceiverFlavor::Zero(chan) => chan.len(),
+        }
+    }
+
+    /// If the channel is bounded, returns its capacity.
+    pub fn capacity(&self) -> Option<usize> {
+        match &self.flavor {
+            ReceiverFlavor::Array(chan) => chan.capacity(),
+            ReceiverFlavor::List(chan) => chan.capacity(),
+            ReceiverFlavor::Zero(chan) => chan.capacity(),
+        }
+    }
+
+    /// Returns `true` if receivers belong to the same channel.
+    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
+        match (&self.flavor, &other.flavor) {
+            (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
+            (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
+            (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
+            _ => false,
+        }
+    }
+}
+
+impl<T> Drop for Receiver<T> {
+    fn drop(&mut self) {
+        unsafe {
+            match &self.flavor {
+                ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
+                ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
+                ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
+            }
+        }
+    }
+}
+
+impl<T> Clone for Receiver<T> {
+    fn clone(&self) -> Self {
+        let flavor = match &self.flavor {
+            ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
+            ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
+            ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
+        };
+
+        Receiver { flavor }
+    }
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.pad("Receiver { .. }")
+    }
+}
diff --git a/library/std/src/sync/mpmc/select.rs b/library/std/src/sync/mpmc/select.rs
new file mode 100644
index 00000000000..56a83fee2e1
--- /dev/null
+++ b/library/std/src/sync/mpmc/select.rs
@@ -0,0 +1,71 @@
+/// Temporary data that gets initialized during a blocking operation, and is consumed by
+/// `read` or `write`.
+///
+/// Each field contains data associated with a specific channel flavor.
+#[derive(Debug, Default)]
+pub struct Token {
+    pub(crate) array: super::array::ArrayToken,
+    pub(crate) list: super::list::ListToken,
+    #[allow(dead_code)]
+    pub(crate) zero: super::zero::ZeroToken,
+}
+
+/// Identifier associated with an operation by a specific thread on a specific channel.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct Operation(usize);
+
+impl Operation {
+    /// Creates an operation identifier from a mutable reference.
+    ///
+    /// This function essentially just turns the address of the reference into a number. The
+    /// reference should point to a variable that is specific to the thread and the operation,
+    /// and is alive for the entire duration of a blocking operation.
+    #[inline]
+    pub fn hook<T>(r: &mut T) -> Operation {
+        let val = r as *mut T as usize;
+        // Make sure that the pointer address doesn't equal the numerical representation of
+        // `Selected::{Waiting, Aborted, Disconnected}`.
+        assert!(val > 2);
+        Operation(val)
+    }
+}
+
+/// Current state of a blocking operation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Selected {
+    /// Still waiting for an operation.
+    Waiting,
+
+    /// The attempt to block the current thread has been aborted.
+    Aborted,
+
+    /// An operation became ready because a channel is disconnected.
+    Disconnected,
+
+    /// An operation became ready because a message can be sent or received.
+    Operation(Operation),
+}
+
+impl From<usize> for Selected {
+    #[inline]
+    fn from(val: usize) -> Selected {
+        match val {
+            0 => Selected::Waiting,
+            1 => Selected::Aborted,
+            2 => Selected::Disconnected,
+            oper => Selected::Operation(Operation(oper)),
+        }
+    }
+}
+
+impl Into<usize> for Selected {
+    #[inline]
+    fn into(self) -> usize {
+        match self {
+            Selected::Waiting => 0,
+            Selected::Aborted => 1,
+            Selected::Disconnected => 2,
+            Selected::Operation(Operation(val)) => val,
+        }
+    }
+}
diff --git a/library/std/src/sync/mpmc/utils.rs b/library/std/src/sync/mpmc/utils.rs
new file mode 100644
index 00000000000..d0904b4b94c
--- /dev/null
+++ b/library/std/src/sync/mpmc/utils.rs
@@ -0,0 +1,144 @@
+use crate::cell::Cell;
+use crate::ops::{Deref, DerefMut};
+
+/// Pads and aligns a value to the length of a cache line.
+#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)]
+// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
+// lines at a time, so we have to align to 128 bytes rather than 64.
+//
+// Sources:
+// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
+// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
+//
+// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
+//
+// Sources:
+// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
+//
+// powerpc64 has 128-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
+#[cfg_attr(
+    any(target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64",),
+    repr(align(128))
+)]
+// arm, mips, mips64, and riscv64 have 32-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
+#[cfg_attr(
+    any(
+        target_arch = "arm",
+        target_arch = "mips",
+        target_arch = "mips64",
+        target_arch = "riscv64",
+    ),
+    repr(align(32))
+)]
+// s390x has 256-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
+#[cfg_attr(target_arch = "s390x", repr(align(256)))]
+// x86 and wasm have 64-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
+//
+// All others are assumed to have 64-byte cache line size.
+#[cfg_attr(
+    not(any(
+        target_arch = "x86_64",
+        target_arch = "aarch64",
+        target_arch = "powerpc64",
+        target_arch = "arm",
+        target_arch = "mips",
+        target_arch = "mips64",
+        target_arch = "riscv64",
+        target_arch = "s390x",
+    )),
+    repr(align(64))
+)]
+pub struct CachePadded<T> {
+    value: T,
+}
+
+impl<T> CachePadded<T> {
+    /// Pads and aligns a value to the length of a cache line.
+    pub fn new(value: T) -> CachePadded<T> {
+        CachePadded::<T> { value }
+    }
+}
+
+impl<T> Deref for CachePadded<T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        &self.value
+    }
+}
+
+impl<T> DerefMut for CachePadded<T> {
+    fn deref_mut(&mut self) -> &mut T {
+        &mut self.value
+    }
+}
+
+const SPIN_LIMIT: u32 = 6;
+const YIELD_LIMIT: u32 = 10;
+
+/// Performs exponential backoff in spin loops.
+pub struct Backoff {
+    step: Cell<u32>,
+}
+
+impl Backoff {
+    /// Creates a new `Backoff`.
+    pub fn new() -> Self {
+        Backoff { step: Cell::new(0) }
+    }
+
+    /// Backs off in a lock-free loop.
+    ///
+    /// This method should be used when we need to retry an operation because another thread made
+    /// progress.
+    #[inline]
+    pub fn spin(&self) {
+        let step = self.step.get().min(SPIN_LIMIT);
+        for _ in 0..step.pow(2) {
+            crate::hint::spin_loop();
+        }
+
+        if self.step.get() <= SPIN_LIMIT {
+            self.step.set(self.step.get() + 1);
+        }
+    }
+
+    /// Backs off in a blocking loop.
+    #[inline]
+    pub fn snooze(&self) {
+        if self.step.get() <= SPIN_LIMIT {
+            for _ in 0..self.step.get().pow(2) {
+                crate::hint::spin_loop()
+            }
+        } else {
+            crate::thread::yield_now();
+        }
+
+        if self.step.get() <= YIELD_LIMIT {
+            self.step.set(self.step.get() + 1);
+        }
+    }
+
+    /// Returns `true` if exponential backoff has completed and blocking the thread is advised.
+    #[inline]
+    pub fn is_completed(&self) -> bool {
+        self.step.get() > YIELD_LIMIT
+    }
+}
diff --git a/library/std/src/sync/mpmc/waker.rs b/library/std/src/sync/mpmc/waker.rs
new file mode 100644
index 00000000000..4912ca4f815
--- /dev/null
+++ b/library/std/src/sync/mpmc/waker.rs
@@ -0,0 +1,204 @@
+//! Waking mechanism for threads blocked on channel operations.
+
+use super::context::Context;
+use super::select::{Operation, Selected};
+
+use crate::ptr;
+use crate::sync::atomic::{AtomicBool, Ordering};
+use crate::sync::Mutex;
+
+/// Represents a thread blocked on a specific channel operation.
+pub(crate) struct Entry {
+    /// The operation.
+    pub(crate) oper: Operation,
+
+    /// Optional packet.
+    pub(crate) packet: *mut (),
+
+    /// Context associated with the thread owning this operation.
+    pub(crate) cx: Context,
+}
+
+/// A queue of threads blocked on channel operations.
+///
+/// This data structure is used by threads to register blocking operations and get woken up once
+/// an operation becomes ready.
+pub(crate) struct Waker {
+    /// A list of select operations.
+    selectors: Vec<Entry>,
+
+    /// A list of operations waiting to be ready.
+    observers: Vec<Entry>,
+}
+
+impl Waker {
+    /// Creates a new `Waker`.
+    #[inline]
+    pub(crate) fn new() -> Self {
+        Waker { selectors: Vec::new(), observers: Vec::new() }
+    }
+
+    /// Registers a select operation.
+    #[inline]
+    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
+        self.register_with_packet(oper, ptr::null_mut(), cx);
+    }
+
+    /// Registers a select operation and a packet.
+    #[inline]
+    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
+        self.selectors.push(Entry { oper, packet, cx: cx.clone() });
+    }
+
+    /// Unregisters a select operation.
+    #[inline]
+    pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
+        if let Some((i, _)) =
+            self.selectors.iter().enumerate().find(|&(_, entry)| entry.oper == oper)
+        {
+            let entry = self.selectors.remove(i);
+            Some(entry)
+        } else {
+            None
+        }
+    }
+
+    /// Attempts to find another thread's entry, select the operation, and wake it up.
+    #[inline]
+    pub(crate) fn try_select(&mut self) -> Option<Entry> {
+        self.selectors
+            .iter()
+            .position(|selector| {
+                // Does the entry belong to a different thread?
+                selector.cx.thread_id() != current_thread_id()
+                    && selector // Try selecting this operation.
+                        .cx
+                        .try_select(Selected::Operation(selector.oper))
+                        .is_ok()
+                    && {
+                        // Provide the packet.
+                        selector.cx.store_packet(selector.packet);
+                        // Wake the thread up.
+                        selector.cx.unpark();
+                        true
+                    }
+            })
+            // Remove the entry from the queue to keep it clean and improve
+            // performance.
+            .map(|pos| self.selectors.remove(pos))
+    }
+
+    /// Notifies all operations waiting to be ready.
+    #[inline]
+    pub(crate) fn notify(&mut self) {
+        for entry in self.observers.drain(..) {
+            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
+                entry.cx.unpark();
+            }
+        }
+    }
+
+    /// Notifies all registered operations that the channel is disconnected.
+    #[inline]
+    pub(crate) fn disconnect(&mut self) {
+        for entry in self.selectors.iter() {
+            if entry.cx.try_select(Selected::Disconnected).is_ok() {
+                // Wake the thread up.
+                //
+                // Here we don't remove the entry from the queue. Registered threads must
+                // unregister from the waker by themselves. They might also want to recover the
+                // packet value and destroy it, if necessary.
+                entry.cx.unpark();
+            }
+        }
+
+        self.notify();
+    }
+}
+
+impl Drop for Waker {
+    #[inline]
+    fn drop(&mut self) {
+        debug_assert_eq!(self.selectors.len(), 0);
+        debug_assert_eq!(self.observers.len(), 0);
+    }
+}
+
+/// A waker that can be shared among threads without locking.
+///
+/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
+pub(crate) struct SyncWaker {
+    /// The inner `Waker`.
+    inner: Mutex<Waker>,
+
+    /// `true` if the waker is empty.
+    is_empty: AtomicBool,
+}
+
+impl SyncWaker {
+    /// Creates a new `SyncWaker`.
+    #[inline]
+    pub(crate) fn new() -> Self {
+        SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) }
+    }
+
+    /// Registers the current thread with an operation.
+    #[inline]
+    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
+        let mut inner = self.inner.lock().unwrap();
+        inner.register(oper, cx);
+        self.is_empty
+            .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
+    }
+
+    /// Unregisters an operation previously registered by the current thread.
+    #[inline]
+    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
+        let mut inner = self.inner.lock().unwrap();
+        let entry = inner.unregister(oper);
+        self.is_empty
+            .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
+        entry
+    }
+
+    /// Attempts to find one thread (not the current one), select its operation, and wake it up.
+    #[inline]
+    pub(crate) fn notify(&self) {
+        if !self.is_empty.load(Ordering::SeqCst) {
+            let mut inner = self.inner.lock().unwrap();
+            if !self.is_empty.load(Ordering::SeqCst) {
+                inner.try_select();
+                inner.notify();
+                self.is_empty.store(
+                    inner.selectors.is_empty() && inner.observers.is_empty(),
+                    Ordering::SeqCst,
+                );
+            }
+        }
+    }
+
+    /// Notifies all threads that the channel is disconnected.
+    #[inline]
+    pub(crate) fn disconnect(&self) {
+        let mut inner = self.inner.lock().unwrap();
+        inner.disconnect();
+        self.is_empty
+            .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
+    }
+}
+
+impl Drop for SyncWaker {
+    #[inline]
+    fn drop(&mut self) {
+        debug_assert!(self.is_empty.load(Ordering::SeqCst));
+    }
+}
+
+/// Returns a unique id for the current thread.
+#[inline]
+pub fn current_thread_id() -> usize {
+    // `u8` is not drop so this variable will be available during thread destruction,
+    // whereas `thread::current()` would not be
+    thread_local! { static DUMMY: u8 = 0 }
+    DUMMY.with(|x| (x as *const u8).addr())
+}
diff --git a/library/std/src/sync/mpmc/zero.rs b/library/std/src/sync/mpmc/zero.rs
new file mode 100644
index 00000000000..fccd6c29a7e
--- /dev/null
+++ b/library/std/src/sync/mpmc/zero.rs
@@ -0,0 +1,318 @@
+//! Zero-capacity channel.
+//!
+//! This kind of channel is also known as *rendezvous* channel.
+
+use super::context::Context;
+use super::error::*;
+use super::select::{Operation, Selected, Token};
+use super::utils::Backoff;
+use super::waker::Waker;
+
+use crate::cell::UnsafeCell;
+use crate::marker::PhantomData;
+use crate::sync::atomic::{AtomicBool, Ordering};
+use crate::sync::Mutex;
+use crate::time::Instant;
+use crate::{fmt, ptr};
+
+/// A pointer to a packet.
+pub(crate) struct ZeroToken(*mut ());
+
+impl Default for ZeroToken {
+    fn default() -> Self {
+        Self(ptr::null_mut())
+    }
+}
+
+impl fmt::Debug for ZeroToken {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt::Debug::fmt(&(self.0 as usize), f)
+    }
+}
+
+/// A slot for passing one message from a sender to a receiver.
+struct Packet<T> {
+    /// Equals `true` if the packet is allocated on the stack.
+    on_stack: bool,
+
+    /// Equals `true` once the packet is ready for reading or writing.
+    ready: AtomicBool,
+
+    /// The message.
+    msg: UnsafeCell<Option<T>>,
+}
+
+impl<T> Packet<T> {
+    /// Creates an empty packet on the stack.
+    fn empty_on_stack() -> Packet<T> {
+        Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(None) }
+    }
+
+    /// Creates a packet on the stack, containing a message.
+    fn message_on_stack(msg: T) -> Packet<T> {
+        Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(Some(msg)) }
+    }
+
+    /// Waits until the packet becomes ready for reading or writing.
+    fn wait_ready(&self) {
+        let backoff = Backoff::new();
+        while !self.ready.load(Ordering::Acquire) {
+            backoff.snooze();
+        }
+    }
+}
+
+/// Inner representation of a zero-capacity channel.
+struct Inner {
+    /// Senders waiting to pair up with a receive operation.
+    senders: Waker,
+
+    /// Receivers waiting to pair up with a send operation.
+    receivers: Waker,
+
+    /// Equals `true` when the channel is disconnected.
+    is_disconnected: bool,
+}
+
+/// Zero-capacity channel.
+pub(crate) struct Channel<T> {
+    /// Inner representation of the channel.
+    inner: Mutex<Inner>,
+
+    /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
+    _marker: PhantomData<T>,
+}
+
+impl<T> Channel<T> {
+    /// Constructs a new zero-capacity channel.
+    pub(crate) fn new() -> Self {
+        Channel {
+            inner: Mutex::new(Inner {
+                senders: Waker::new(),
+                receivers: Waker::new(),
+                is_disconnected: false,
+            }),
+            _marker: PhantomData,
+        }
+    }
+
+    /// Writes a message into the packet.
+    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+        // If there is no packet, the channel is disconnected.
+        if token.zero.0.is_null() {
+            return Err(msg);
+        }
+
+        let packet = &*(token.zero.0 as *const Packet<T>);
+        packet.msg.get().write(Some(msg));
+        packet.ready.store(true, Ordering::Release);
+        Ok(())
+    }
+
+    /// Reads a message from the packet.
+    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+        // If there is no packet, the channel is disconnected.
+        if token.zero.0.is_null() {
+            return Err(());
+        }
+
+        let packet = &*(token.zero.0 as *const Packet<T>);
+
+        if packet.on_stack {
+            // The message has been in the packet from the beginning, so there is no need to wait
+            // for it. However, after reading the message, we need to set `ready` to `true` in
+            // order to signal that the packet can be destroyed.
+            let msg = packet.msg.get().replace(None).unwrap();
+            packet.ready.store(true, Ordering::Release);
+            Ok(msg)
+        } else {
+            // Wait until the message becomes available, then read it and destroy the
+            // heap-allocated packet.
+            packet.wait_ready();
+            let msg = packet.msg.get().replace(None).unwrap();
+            drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
+            Ok(msg)
+        }
+    }
+
+    /// Attempts to send a message into the channel.
+    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+        let token = &mut Token::default();
+        let mut inner = self.inner.lock().unwrap();
+
+        // If there's a waiting receiver, pair up with it.
+        if let Some(operation) = inner.receivers.try_select() {
+            token.zero.0 = operation.packet;
+            drop(inner);
+            unsafe {
+                self.write(token, msg).ok().unwrap();
+            }
+            Ok(())
+        } else if inner.is_disconnected {
+            Err(TrySendError::Disconnected(msg))
+        } else {
+            Err(TrySendError::Full(msg))
+        }
+    }
+
+    /// Sends a message into the channel.
+    pub(crate) fn send(
+        &self,
+        msg: T,
+        deadline: Option<Instant>,
+    ) -> Result<(), SendTimeoutError<T>> {
+        let token = &mut Token::default();
+        let mut inner = self.inner.lock().unwrap();
+
+        // If there's a waiting receiver, pair up with it.
+        if let Some(operation) = inner.receivers.try_select() {
+            token.zero.0 = operation.packet;
+            drop(inner);
+            unsafe {
+                self.write(token, msg).ok().unwrap();
+            }
+            return Ok(());
+        }
+
+        if inner.is_disconnected {
+            return Err(SendTimeoutError::Disconnected(msg));
+        }
+
+        Context::with(|cx| {
+            // Prepare for blocking until a receiver wakes us up.
+            let oper = Operation::hook(token);
+            let mut packet = Packet::<T>::message_on_stack(msg);
+            inner.senders.register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
+            inner.receivers.notify();
+            drop(inner);
+
+            // Block the current thread.
+            let sel = cx.wait_until(deadline);
+
+            match sel {
+                Selected::Waiting => unreachable!(),
+                Selected::Aborted => {
+                    self.inner.lock().unwrap().senders.unregister(oper).unwrap();
+                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
+                    Err(SendTimeoutError::Timeout(msg))
+                }
+                Selected::Disconnected => {
+                    self.inner.lock().unwrap().senders.unregister(oper).unwrap();
+                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
+                    Err(SendTimeoutError::Disconnected(msg))
+                }
+                Selected::Operation(_) => {
+                    // Wait until the message is read, then drop the packet.
+                    packet.wait_ready();
+                    Ok(())
+                }
+            }
+        })
+    }
+
+    /// Attempts to receive a message without blocking.
+    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+        let token = &mut Token::default();
+        let mut inner = self.inner.lock().unwrap();
+
+        // If there's a waiting sender, pair up with it.
+        if let Some(operation) = inner.senders.try_select() {
+            token.zero.0 = operation.packet;
+            drop(inner);
+            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+        } else if inner.is_disconnected {
+            Err(TryRecvError::Disconnected)
+        } else {
+            Err(TryRecvError::Empty)
+        }
+    }
+
+    /// Receives a message from the channel.
+    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+        let token = &mut Token::default();
+        let mut inner = self.inner.lock().unwrap();
+
+        // If there's a waiting sender, pair up with it.
+        if let Some(operation) = inner.senders.try_select() {
+            token.zero.0 = operation.packet;
+            drop(inner);
+            unsafe {
+                return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
+            }
+        }
+
+        if inner.is_disconnected {
+            return Err(RecvTimeoutError::Disconnected);
+        }
+
+        Context::with(|cx| {
+            // Prepare for blocking until a sender wakes us up.
+            let oper = Operation::hook(token);
+            let mut packet = Packet::<T>::empty_on_stack();
+            inner.receivers.register_with_packet(
+                oper,
+                &mut packet as *mut Packet<T> as *mut (),
+                cx,
+            );
+            inner.senders.notify();
+            drop(inner);
+
+            // Block the current thread.
+            let sel = cx.wait_until(deadline);
+
+            match sel {
+                Selected::Waiting => unreachable!(),
+                Selected::Aborted => {
+                    self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
+                    Err(RecvTimeoutError::Timeout)
+                }
+                Selected::Disconnected => {
+                    self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
+                    Err(RecvTimeoutError::Disconnected)
+                }
+                Selected::Operation(_) => {
+                    // Wait until the message is provided, then read it.
+                    packet.wait_ready();
+                    unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
+                }
+            }
+        })
+    }
+
+    /// Disconnects the channel and wakes up all blocked senders and receivers.
+    ///
+    /// Returns `true` if this call disconnected the channel.
+    pub(crate) fn disconnect(&self) -> bool {
+        let mut inner = self.inner.lock().unwrap();
+
+        if !inner.is_disconnected {
+            inner.is_disconnected = true;
+            inner.senders.disconnect();
+            inner.receivers.disconnect();
+            true
+        } else {
+            false
+        }
+    }
+
+    /// Returns the current number of messages inside the channel.
+    pub(crate) fn len(&self) -> usize {
+        0
+    }
+
+    /// Returns the capacity of the channel.
+    #[allow(clippy::unnecessary_wraps)] // This is intentional.
+    pub(crate) fn capacity(&self) -> Option<usize> {
+        Some(0)
+    }
+
+    /// Returns `true` if the channel is empty.
+    pub(crate) fn is_empty(&self) -> bool {
+        true
+    }
+
+    /// Returns `true` if the channel is full.
+    pub(crate) fn is_full(&self) -> bool {
+        true
+    }
+}
diff --git a/library/std/src/sync/mpsc/blocking.rs b/library/std/src/sync/mpsc/blocking.rs
deleted file mode 100644
index 021df7b096c..00000000000
--- a/library/std/src/sync/mpsc/blocking.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-//! Generic support for building blocking abstractions.
-
-use crate::sync::atomic::{AtomicBool, Ordering};
-use crate::sync::Arc;
-use crate::thread::{self, Thread};
-use crate::time::Instant;
-
-struct Inner {
-    thread: Thread,
-    woken: AtomicBool,
-}
-
-unsafe impl Send for Inner {}
-unsafe impl Sync for Inner {}
-
-#[derive(Clone)]
-pub struct SignalToken {
-    inner: Arc<Inner>,
-}
-
-pub struct WaitToken {
-    inner: Arc<Inner>,
-}
-
-impl !Send for WaitToken {}
-
-impl !Sync for WaitToken {}
-
-pub fn tokens() -> (WaitToken, SignalToken) {
-    let inner = Arc::new(Inner { thread: thread::current(), woken: AtomicBool::new(false) });
-    let wait_token = WaitToken { inner: inner.clone() };
-    let signal_token = SignalToken { inner };
-    (wait_token, signal_token)
-}
-
-impl SignalToken {
-    pub fn signal(&self) -> bool {
-        let wake = self
-            .inner
-            .woken
-            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
-            .is_ok();
-        if wake {
-            self.inner.thread.unpark();
-        }
-        wake
-    }
-
-    /// Converts to an unsafe raw pointer. Useful for storing in a pipe's state
-    /// flag.
-    #[inline]
-    pub unsafe fn to_raw(self) -> *mut u8 {
-        Arc::into_raw(self.inner) as *mut u8
-    }
-
-    /// Converts from an unsafe raw pointer. Useful for retrieving a pipe's state
-    /// flag.
-    #[inline]
-    pub unsafe fn from_raw(signal_ptr: *mut u8) -> SignalToken {
-        SignalToken { inner: Arc::from_raw(signal_ptr as *mut Inner) }
-    }
-}
-
-impl WaitToken {
-    pub fn wait(self) {
-        while !self.inner.woken.load(Ordering::SeqCst) {
-            thread::park()
-        }
-    }
-
-    /// Returns `true` if we wake up normally.
-    pub fn wait_max_until(self, end: Instant) -> bool {
-        while !self.inner.woken.load(Ordering::SeqCst) {
-            let now = Instant::now();
-            if now >= end {
-                return false;
-            }
-            thread::park_timeout(end - now)
-        }
-        true
-    }
-}
diff --git a/library/std/src/sync/mpsc/cache_aligned.rs b/library/std/src/sync/mpsc/cache_aligned.rs
deleted file mode 100644
index 9197f0d6e6c..00000000000
--- a/library/std/src/sync/mpsc/cache_aligned.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-use crate::ops::{Deref, DerefMut};
-
-#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
-#[cfg_attr(target_arch = "aarch64", repr(align(128)))]
-#[cfg_attr(not(target_arch = "aarch64"), repr(align(64)))]
-pub(super) struct CacheAligned<T>(pub T);
-
-impl<T> Deref for CacheAligned<T> {
-    type Target = T;
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl<T> DerefMut for CacheAligned<T> {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.0
-    }
-}
-
-impl<T> CacheAligned<T> {
-    pub(super) fn new(t: T) -> Self {
-        CacheAligned(t)
-    }
-}
diff --git a/library/std/src/sync/mpsc/mod.rs b/library/std/src/sync/mpsc/mod.rs
index e85a8723965..27fba761ada 100644
--- a/library/std/src/sync/mpsc/mod.rs
+++ b/library/std/src/sync/mpsc/mod.rs
@@ -143,175 +143,16 @@ mod tests;
 #[cfg(all(test, not(target_os = "emscripten")))]
 mod sync_tests;
 
-// A description of how Rust's channel implementation works
-//
-// Channels are supposed to be the basic building block for all other
-// concurrent primitives that are used in Rust. As a result, the channel type
-// needs to be highly optimized, flexible, and broad enough for use everywhere.
-//
-// The choice of implementation of all channels is to be built on lock-free data
-// structures. The channels themselves are then consequently also lock-free data
-// structures. As always with lock-free code, this is a very "here be dragons"
-// territory, especially because I'm unaware of any academic papers that have
-// gone into great length about channels of these flavors.
-//
-// ## Flavors of channels
-//
-// From the perspective of a consumer of this library, there is only one flavor
-// of channel. This channel can be used as a stream and cloned to allow multiple
-// senders. Under the hood, however, there are actually three flavors of
-// channels in play.
-//
-// * Flavor::Oneshots - these channels are highly optimized for the one-send use
-//                      case. They contain as few atomics as possible and
-//                      involve one and exactly one allocation.
-// * Streams - these channels are optimized for the non-shared use case. They
-//             use a different concurrent queue that is more tailored for this
-//             use case. The initial allocation of this flavor of channel is not
-//             optimized.
-// * Shared - this is the most general form of channel that this module offers,
-//            a channel with multiple senders. This type is as optimized as it
-//            can be, but the previous two types mentioned are much faster for
-//            their use-cases.
-//
-// ## Concurrent queues
-//
-// The basic idea of Rust's Sender/Receiver types is that send() never blocks,
-// but recv() obviously blocks. This means that under the hood there must be
-// some shared and concurrent queue holding all of the actual data.
-//
-// With two flavors of channels, two flavors of queues are also used. We have
-// chosen to use queues from a well-known author that are abbreviated as SPSC
-// and MPSC (single producer, single consumer and multiple producer, single
-// consumer). SPSC queues are used for streams while MPSC queues are used for
-// shared channels.
-//
-// ### SPSC optimizations
-//
-// The SPSC queue found online is essentially a linked list of nodes where one
-// half of the nodes are the "queue of data" and the other half of nodes are a
-// cache of unused nodes. The unused nodes are used such that an allocation is
-// not required on every push() and a free doesn't need to happen on every
-// pop().
-//
-// As found online, however, the cache of nodes is of an infinite size. This
-// means that if a channel at one point in its life had 50k items in the queue,
-// then the queue will always have the capacity for 50k items. I believed that
-// this was an unnecessary limitation of the implementation, so I have altered
-// the queue to optionally have a bound on the cache size.
-//
-// By default, streams will have an unbounded SPSC queue with a small-ish cache
-// size. The hope is that the cache is still large enough to have very fast
-// send() operations while not too large such that millions of channels can
-// coexist at once.
-//
-// ### MPSC optimizations
-//
-// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
-// a linked list under the hood to earn its unboundedness, but I have not put
-// forth much effort into having a cache of nodes similar to the SPSC queue.
-//
-// For now, I believe that this is "ok" because shared channels are not the most
-// common type, but soon we may wish to revisit this queue choice and determine
-// another candidate for backend storage of shared channels.
-//
-// ## Overview of the Implementation
-//
-// Now that there's a little background on the concurrent queues used, it's
-// worth going into much more detail about the channels themselves. The basic
-// pseudocode for a send/recv are:
-//
-//
-//      send(t)                             recv()
-//        queue.push(t)                       return if queue.pop()
-//        if increment() == -1                deschedule {
-//          wakeup()                            if decrement() > 0
-//                                                cancel_deschedule()
-//                                            }
-//                                            queue.pop()
-//
-// As mentioned before, there are no locks in this implementation, only atomic
-// instructions are used.
-//
-// ### The internal atomic counter
-//
-// Every channel has a shared counter with each half to keep track of the size
-// of the queue. This counter is used to abort descheduling by the receiver and
-// to know when to wake up on the sending side.
-//
-// As seen in the pseudocode, senders will increment this count and receivers
-// will decrement the count. The theory behind this is that if a sender sees a
-// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
-// then it doesn't need to block.
-//
-// The recv() method has a beginning call to pop(), and if successful, it needs
-// to decrement the count. It is a crucial implementation detail that this
-// decrement does *not* happen to the shared counter. If this were the case,
-// then it would be possible for the counter to be very negative when there were
-// no receivers waiting, in which case the senders would have to determine when
-// it was actually appropriate to wake up a receiver.
-//
-// Instead, the "steal count" is kept track of separately (not atomically
-// because it's only used by receivers), and then the decrement() call when
-// descheduling will lump in all of the recent steals into one large decrement.
-//
-// The implication of this is that if a sender sees a -1 count, then there's
-// guaranteed to be a waiter waiting!
-//
-// ## Native Implementation
-//
-// A major goal of these channels is to work seamlessly on and off the runtime.
-// All of the previous race conditions have been worded in terms of
-// scheduler-isms (which is obviously not available without the runtime).
-//
-// For now, native usage of channels (off the runtime) will fall back onto
-// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
-// is still entirely lock-free, the "deschedule" blocks above are surrounded by
-// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
-// condition variable.
-//
-// ## Select
-//
-// Being able to support selection over channels has greatly influenced this
-// design, and not only does selection need to work inside the runtime, but also
-// outside the runtime.
-//
-// The implementation is fairly straightforward. The goal of select() is not to
-// return some data, but only to return which channel can receive data without
-// blocking. The implementation is essentially the entire blocking procedure
-// followed by an increment as soon as its woken up. The cancellation procedure
-// involves an increment and swapping out of to_wake to acquire ownership of the
-// thread to unblock.
-//
-// Sadly this current implementation requires multiple allocations, so I have
-// seen the throughput of select() be much worse than it should be. I do not
-// believe that there is anything fundamental that needs to change about these
-// channels, however, in order to support a more efficient select().
-//
-// FIXME: Select is now removed, so these factors are ready to be cleaned up!
-//
-// # Conclusion
-//
-// And now that you've seen all the races that I found and attempted to fix,
-// here's the code for you to find some more!
-
-use crate::cell::UnsafeCell;
+// MPSC channels are built as a wrapper around MPMC channels, which
+// were ported from the `crossbeam-channel` crate. MPMC channels are
+// not exposed publicly, but if you are curious about the implementation,
+// that's where everything is.
+
 use crate::error;
 use crate::fmt;
-use crate::mem;
-use crate::sync::Arc;
+use crate::sync::mpmc;
 use crate::time::{Duration, Instant};
 
-mod blocking;
-mod mpsc_queue;
-mod oneshot;
-mod shared;
-mod spsc_queue;
-mod stream;
-mod sync;
-
-mod cache_aligned;
-
 /// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
 /// This half can only be owned by one thread.
 ///
@@ -341,7 +182,7 @@ mod cache_aligned;
 #[stable(feature = "rust1", since = "1.0.0")]
 #[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")]
 pub struct Receiver<T> {
-    inner: UnsafeCell<Flavor<T>>,
+    inner: mpmc::Receiver<T>,
 }
 
 // The receiver port can be sent from place to place, so long as it
@@ -498,7 +339,7 @@ pub struct IntoIter<T> {
 /// ```
 #[stable(feature = "rust1", since = "1.0.0")]
 pub struct Sender<T> {
-    inner: UnsafeCell<Flavor<T>>,
+    inner: mpmc::Sender<T>,
 }
 
 // The send port can be sent from place to place, so long as it
@@ -557,7 +398,7 @@ impl<T> !Sync for Sender<T> {}
 /// ```
 #[stable(feature = "rust1", since = "1.0.0")]
 pub struct SyncSender<T> {
-    inner: Arc<sync::Packet<T>>,
+    inner: mpmc::Sender<T>,
 }
 
 #[stable(feature = "rust1", since = "1.0.0")]
@@ -643,34 +484,6 @@ pub enum TrySendError<T> {
     Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
 }
 
-enum Flavor<T> {
-    Oneshot(Arc<oneshot::Packet<T>>),
-    Stream(Arc<stream::Packet<T>>),
-    Shared(Arc<shared::Packet<T>>),
-    Sync(Arc<sync::Packet<T>>),
-}
-
-#[doc(hidden)]
-trait UnsafeFlavor<T> {
-    fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
-    unsafe fn inner_mut(&self) -> &mut Flavor<T> {
-        &mut *self.inner_unsafe().get()
-    }
-    unsafe fn inner(&self) -> &Flavor<T> {
-        &*self.inner_unsafe().get()
-    }
-}
-impl<T> UnsafeFlavor<T> for Sender<T> {
-    fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
-        &self.inner
-    }
-}
-impl<T> UnsafeFlavor<T> for Receiver<T> {
-    fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
-        &self.inner
-    }
-}
-
 /// Creates a new asynchronous channel, returning the sender/receiver halves.
 /// All data sent on the [`Sender`] will become available on the [`Receiver`] in
 /// the same order as it was sent, and no [`send`] will block the calling thread
@@ -711,8 +524,8 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
 #[must_use]
 #[stable(feature = "rust1", since = "1.0.0")]
 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
-    let a = Arc::new(oneshot::Packet::new());
-    (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
+    let (tx, rx) = mpmc::channel();
+    (Sender { inner: tx }, Receiver { inner: rx })
 }
 
 /// Creates a new synchronous, bounded channel.
@@ -760,8 +573,8 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
 #[must_use]
 #[stable(feature = "rust1", since = "1.0.0")]
 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
-    let a = Arc::new(sync::Packet::new(bound));
-    (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
+    let (tx, rx) = mpmc::sync_channel(bound);
+    (SyncSender { inner: tx }, Receiver { inner: rx })
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -769,10 +582,6 @@ pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
 ////////////////////////////////////////////////////////////////////////////////
 
 impl<T> Sender<T> {
-    fn new(inner: Flavor<T>) -> Sender<T> {
-        Sender { inner: UnsafeCell::new(inner) }
-    }
-
     /// Attempts to send a value on this channel, returning it back if it could
     /// not be sent.
     ///
@@ -802,40 +611,7 @@ impl<T> Sender<T> {
     /// ```
     #[stable(feature = "rust1", since = "1.0.0")]
     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
-        let (new_inner, ret) = match *unsafe { self.inner() } {
-            Flavor::Oneshot(ref p) => {
-                if !p.sent() {
-                    return p.send(t).map_err(SendError);
-                } else {
-                    let a = Arc::new(stream::Packet::new());
-                    let rx = Receiver::new(Flavor::Stream(a.clone()));
-                    match p.upgrade(rx) {
-                        oneshot::UpSuccess => {
-                            let ret = a.send(t);
-                            (a, ret)
-                        }
-                        oneshot::UpDisconnected => (a, Err(t)),
-                        oneshot::UpWoke(token) => {
-                            // This send cannot panic because the thread is
-                            // asleep (we're looking at it), so the receiver
-                            // can't go away.
-                            a.send(t).ok().unwrap();
-                            token.signal();
-                            (a, Ok(()))
-                        }
-                    }
-                }
-            }
-            Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
-            Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
-            Flavor::Sync(..) => unreachable!(),
-        };
-
-        unsafe {
-            let tmp = Sender::new(Flavor::Stream(new_inner));
-            mem::swap(self.inner_mut(), tmp.inner_mut());
-        }
-        ret.map_err(SendError)
+        self.inner.send(t)
     }
 }
 
@@ -847,57 +623,14 @@ impl<T> Clone for Sender<T> {
     /// (including the original) need to be dropped in order for
     /// [`Receiver::recv`] to stop blocking.
     fn clone(&self) -> Sender<T> {
-        let packet = match *unsafe { self.inner() } {
-            Flavor::Oneshot(ref p) => {
-                let a = Arc::new(shared::Packet::new());
-                {
-                    let guard = a.postinit_lock();
-                    let rx = Receiver::new(Flavor::Shared(a.clone()));
-                    let sleeper = match p.upgrade(rx) {
-                        oneshot::UpSuccess | oneshot::UpDisconnected => None,
-                        oneshot::UpWoke(task) => Some(task),
-                    };
-                    a.inherit_blocker(sleeper, guard);
-                }
-                a
-            }
-            Flavor::Stream(ref p) => {
-                let a = Arc::new(shared::Packet::new());
-                {
-                    let guard = a.postinit_lock();
-                    let rx = Receiver::new(Flavor::Shared(a.clone()));
-                    let sleeper = match p.upgrade(rx) {
-                        stream::UpSuccess | stream::UpDisconnected => None,
-                        stream::UpWoke(task) => Some(task),
-                    };
-                    a.inherit_blocker(sleeper, guard);
-                }
-                a
-            }
-            Flavor::Shared(ref p) => {
-                p.clone_chan();
-                return Sender::new(Flavor::Shared(p.clone()));
-            }
-            Flavor::Sync(..) => unreachable!(),
-        };
-
-        unsafe {
-            let tmp = Sender::new(Flavor::Shared(packet.clone()));
-            mem::swap(self.inner_mut(), tmp.inner_mut());
-        }
-        Sender::new(Flavor::Shared(packet))
+        Sender { inner: self.inner.clone() }
     }
 }
 
 #[stable(feature = "rust1", since = "1.0.0")]
 impl<T> Drop for Sender<T> {
     fn drop(&mut self) {
-        match *unsafe { self.inner() } {
-            Flavor::Oneshot(ref p) => p.drop_chan(),
-            Flavor::Stream(ref p) => p.drop_chan(),
-            Flavor::Shared(ref p) => p.drop_chan(),
-            Flavor::Sync(..) => unreachable!(),
-        }
+        let _ = self.inner;
     }
 }
 
@@ -913,10 +646,6 @@ impl<T> fmt::Debug for Sender<T> {
 ////////////////////////////////////////////////////////////////////////////////
 
 impl<T> SyncSender<T> {
-    fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
-        SyncSender { inner }
-    }
-
     /// Sends a value on this synchronous channel.
     ///
     /// This function will *block* until space in the internal buffer becomes
@@ -955,7 +684,7 @@ impl<T> SyncSender<T> {
     /// ```
     #[stable(feature = "rust1", since = "1.0.0")]
     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
-        self.inner.send(t).map_err(SendError)
+        self.inner.send(t)
     }
 
     /// Attempts to send a value on this channel without blocking.
@@ -1016,15 +745,14 @@ impl<T> SyncSender<T> {
 #[stable(feature = "rust1", since = "1.0.0")]
 impl<T> Clone for SyncSender<T> {
     fn clone(&self) -> SyncSender<T> {
-        self.inner.clone_chan();
-        SyncSender::new(self.inner.clone())
+        SyncSender { inner: self.inner.clone() }
     }
 }
 
 #[stable(feature = "rust1", since = "1.0.0")]
 impl<T> Drop for SyncSender<T> {
     fn drop(&mut self) {
-        self.inner.drop_chan();
+        let _ = self.inner;
     }
 }
 
@@ -1040,10 +768,6 @@ impl<T> fmt::Debug for SyncSender<T> {
 ////////////////////////////////////////////////////////////////////////////////
 
 impl<T> Receiver<T> {
-    fn new(inner: Flavor<T>) -> Receiver<T> {
-        Receiver { inner: UnsafeCell::new(inner) }
-    }
-
     /// Attempts to return a pending value on this receiver without blocking.
     ///
     /// This method will never block the caller in order to wait for data to
@@ -1069,35 +793,7 @@ impl<T> Receiver<T> {
     /// ```
     #[stable(feature = "rust1", since = "1.0.0")]
     pub fn try_recv(&self) -> Result<T, TryRecvError> {
-        loop {
-            let new_port = match *unsafe { self.inner() } {
-                Flavor::Oneshot(ref p) => match p.try_recv() {
-                    Ok(t) => return Ok(t),
-                    Err(oneshot::Empty) => return Err(TryRecvError::Empty),
-                    Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
-                    Err(oneshot::Upgraded(rx)) => rx,
-                },
-                Flavor::Stream(ref p) => match p.try_recv() {
-                    Ok(t) => return Ok(t),
-                    Err(stream::Empty) => return Err(TryRecvError::Empty),
-                    Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
-                    Err(stream::Upgraded(rx)) => rx,
-                },
-                Flavor::Shared(ref p) => match p.try_recv() {
-                    Ok(t) => return Ok(t),
-                    Err(shared::Empty) => return Err(TryRecvError::Empty),
-                    Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
-                },
-                Flavor::Sync(ref p) => match p.try_recv() {
-                    Ok(t) => return Ok(t),
-                    Err(sync::Empty) => return Err(TryRecvError::Empty),
-                    Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
-                },
-            };
-            unsafe {
-                mem::swap(self.inner_mut(), new_port.inner_mut());
-            }
-        }
+        self.inner.try_recv()
     }
 
     /// Attempts to wait for a value on this receiver, returning an error if the
@@ -1156,31 +852,7 @@ impl<T> Receiver<T> {
     /// ```
     #[stable(feature = "rust1", since = "1.0.0")]
     pub fn recv(&self) -> Result<T, RecvError> {
-        loop {
-            let new_port = match *unsafe { self.inner() } {
-                Flavor::Oneshot(ref p) => match p.recv(None) {
-                    Ok(t) => return Ok(t),
-                    Err(oneshot::Disconnected) => return Err(RecvError),
-                    Err(oneshot::Upgraded(rx)) => rx,
-                    Err(oneshot::Empty) => unreachable!(),
-                },
-                Flavor::Stream(ref p) => match p.recv(None) {
-                    Ok(t) => return Ok(t),
-                    Err(stream::Disconnected) => return Err(RecvError),
-                    Err(stream::Upgraded(rx)) => rx,
-                    Err(stream::Empty) => unreachable!(),
-                },
-                Flavor::Shared(ref p) => match p.recv(None) {
-                    Ok(t) => return Ok(t),
-                    Err(shared::Disconnected) => return Err(RecvError),
-                    Err(shared::Empty) => unreachable!(),
-                },
-                Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
-            };
-            unsafe {
-                mem::swap(self.inner_mut(), new_port.inner_mut());
-            }
-        }
+        self.inner.recv()
     }
 
     /// Attempts to wait for a value on this receiver, returning an error if the
@@ -1198,34 +870,6 @@ impl<T> Receiver<T> {
     /// However, since channels are buffered, messages sent before the disconnect
     /// will still be properly received.
     ///
-    /// # Known Issues
-    ///
-    /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
-    /// to panic unexpectedly with the following example:
-    ///
-    /// ```no_run
-    /// use std::sync::mpsc::channel;
-    /// use std::thread;
-    /// use std::time::Duration;
-    ///
-    /// let (tx, rx) = channel::<String>();
-    ///
-    /// thread::spawn(move || {
-    ///     let d = Duration::from_millis(10);
-    ///     loop {
-    ///         println!("recv");
-    ///         let _r = rx.recv_timeout(d);
-    ///     }
-    /// });
-    ///
-    /// thread::sleep(Duration::from_millis(100));
-    /// let _c1 = tx.clone();
-    ///
-    /// thread::sleep(Duration::from_secs(1));
-    /// ```
-    ///
-    /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
-    ///
     /// # Examples
     ///
     /// Successfully receiving value before encountering timeout:
@@ -1268,17 +912,7 @@ impl<T> Receiver<T> {
     /// ```
     #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
-        // Do an optimistic try_recv to avoid the performance impact of
-        // Instant::now() in the full-channel case.
-        match self.try_recv() {
-            Ok(result) => Ok(result),
-            Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
-            Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
-                Some(deadline) => self.recv_deadline(deadline),
-                // So far in the future that it's practically the same as waiting indefinitely.
-                None => self.recv().map_err(RecvTimeoutError::from),
-            },
-        }
+        self.inner.recv_timeout(timeout)
     }
 
     /// Attempts to wait for a value on this receiver, returning an error if the
@@ -1339,46 +973,7 @@ impl<T> Receiver<T> {
     /// ```
     #[unstable(feature = "deadline_api", issue = "46316")]
     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
-        use self::RecvTimeoutError::*;
-
-        loop {
-            let port_or_empty = match *unsafe { self.inner() } {
-                Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
-                    Ok(t) => return Ok(t),
-                    Err(oneshot::Disconnected) => return Err(Disconnected),
-                    Err(oneshot::Upgraded(rx)) => Some(rx),
-                    Err(oneshot::Empty) => None,
-                },
-                Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
-                    Ok(t) => return Ok(t),
-                    Err(stream::Disconnected) => return Err(Disconnected),
-                    Err(stream::Upgraded(rx)) => Some(rx),
-                    Err(stream::Empty) => None,
-                },
-                Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
-                    Ok(t) => return Ok(t),
-                    Err(shared::Disconnected) => return Err(Disconnected),
-                    Err(shared::Empty) => None,
-                },
-                Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
-                    Ok(t) => return Ok(t),
-                    Err(sync::Disconnected) => return Err(Disconnected),
-                    Err(sync::Empty) => None,
-                },
-            };
-
-            if let Some(new_port) = port_or_empty {
-                unsafe {
-                    mem::swap(self.inner_mut(), new_port.inner_mut());
-                }
-            }
-
-            // If we're already passed the deadline, and we're here without
-            // data, return a timeout, else try again.
-            if Instant::now() >= deadline {
-                return Err(Timeout);
-            }
-        }
+        self.inner.recv_deadline(deadline)
     }
 
     /// Returns an iterator that will block waiting for messages, but never
@@ -1500,12 +1095,7 @@ impl<T> IntoIterator for Receiver<T> {
 #[stable(feature = "rust1", since = "1.0.0")]
 impl<T> Drop for Receiver<T> {
     fn drop(&mut self) {
-        match *unsafe { self.inner() } {
-            Flavor::Oneshot(ref p) => p.drop_port(),
-            Flavor::Stream(ref p) => p.drop_port(),
-            Flavor::Shared(ref p) => p.drop_port(),
-            Flavor::Sync(ref p) => p.drop_port(),
-        }
+        let _ = self.inner;
     }
 }
 
diff --git a/library/std/src/sync/mpsc/mpsc_queue.rs b/library/std/src/sync/mpsc/mpsc_queue.rs
deleted file mode 100644
index 7322512e3b4..00000000000
--- a/library/std/src/sync/mpsc/mpsc_queue.rs
+++ /dev/null
@@ -1,124 +0,0 @@
-//! A mostly lock-free multi-producer, single consumer queue.
-//!
-//! This module contains an implementation of a concurrent MPSC queue. This
-//! queue can be used to share data between threads, and is also used as the
-//! building block of channels in rust.
-//!
-//! Note that the current implementation of this queue has a caveat of the `pop`
-//! method, and see the method for more information about it. Due to this
-//! caveat, this queue might not be appropriate for all use-cases.
-
-// The original implementation is based off:
-// https://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
-//
-// Note that back when the code was imported, it was licensed under the BSD-2-Clause license:
-// http://web.archive.org/web/20110411011612/https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
-//
-// The original author of the code agreed to relicense it under `MIT OR Apache-2.0` in 2017, so as
-// of today the license of this file is the same as the rest of the codebase:
-// https://github.com/rust-lang/rust/pull/42149
-
-#[cfg(all(test, not(target_os = "emscripten")))]
-mod tests;
-
-pub use self::PopResult::*;
-
-use core::cell::UnsafeCell;
-use core::ptr;
-
-use crate::boxed::Box;
-use crate::sync::atomic::{AtomicPtr, Ordering};
-
-/// A result of the `pop` function.
-pub enum PopResult<T> {
-    /// Some data has been popped
-    Data(T),
-    /// The queue is empty
-    Empty,
-    /// The queue is in an inconsistent state. Popping data should succeed, but
-    /// some pushers have yet to make enough progress in order allow a pop to
-    /// succeed. It is recommended that a pop() occur "in the near future" in
-    /// order to see if the sender has made progress or not
-    Inconsistent,
-}
-
-struct Node<T> {
-    next: AtomicPtr<Node<T>>,
-    value: Option<T>,
-}
-
-/// The multi-producer single-consumer structure. This is not cloneable, but it
-/// may be safely shared so long as it is guaranteed that there is only one
-/// popper at a time (many pushers are allowed).
-pub struct Queue<T> {
-    head: AtomicPtr<Node<T>>,
-    tail: UnsafeCell<*mut Node<T>>,
-}
-
-unsafe impl<T: Send> Send for Queue<T> {}
-unsafe impl<T: Send> Sync for Queue<T> {}
-
-impl<T> Node<T> {
-    unsafe fn new(v: Option<T>) -> *mut Node<T> {
-        Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v })
-    }
-}
-
-impl<T> Queue<T> {
-    /// Creates a new queue that is safe to share among multiple producers and
-    /// one consumer.
-    pub fn new() -> Queue<T> {
-        let stub = unsafe { Node::new(None) };
-        Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
-    }
-
-    /// Pushes a new value onto this queue.
-    pub fn push(&self, t: T) {
-        unsafe {
-            let n = Node::new(Some(t));
-            let prev = self.head.swap(n, Ordering::AcqRel);
-            (*prev).next.store(n, Ordering::Release);
-        }
-    }
-
-    /// Pops some data from this queue.
-    ///
-    /// Note that the current implementation means that this function cannot
-    /// return `Option<T>`. It is possible for this queue to be in an
-    /// inconsistent state where many pushes have succeeded and completely
-    /// finished, but pops cannot return `Some(t)`. This inconsistent state
-    /// happens when a pusher is pre-empted at an inopportune moment.
-    ///
-    /// This inconsistent state means that this queue does indeed have data, but
-    /// it does not currently have access to it at this time.
-    pub fn pop(&self) -> PopResult<T> {
-        unsafe {
-            let tail = *self.tail.get();
-            let next = (*tail).next.load(Ordering::Acquire);
-
-            if !next.is_null() {
-                *self.tail.get() = next;
-                assert!((*tail).value.is_none());
-                assert!((*next).value.is_some());
-                let ret = (*next).value.take().unwrap();
-                let _: Box<Node<T>> = Box::from_raw(tail);
-                return Data(ret);
-            }
-
-            if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
-        }
-    }
-}
-
-impl<T> Drop for Queue<T> {
-    fn drop(&mut self) {
-        unsafe {
-            let mut cur = *self.tail.get();
-            while !cur.is_null() {
-                let next = (*cur).next.load(Ordering::Relaxed);
-                let _: Box<Node<T>> = Box::from_raw(cur);
-                cur = next;
-            }
-        }
-    }
-}
diff --git a/library/std/src/sync/mpsc/mpsc_queue/tests.rs b/library/std/src/sync/mpsc/mpsc_queue/tests.rs
deleted file mode 100644
index 34b2a9a98ac..00000000000
--- a/library/std/src/sync/mpsc/mpsc_queue/tests.rs
+++ /dev/null
@@ -1,47 +0,0 @@
-use super::{Data, Empty, Inconsistent, Queue};
-use crate::sync::mpsc::channel;
-use crate::sync::Arc;
-use crate::thread;
-
-#[test]
-fn test_full() {
-    let q: Queue<Box<_>> = Queue::new();
-    q.push(Box::new(1));
-    q.push(Box::new(2));
-}
-
-#[test]
-fn test() {
-    let nthreads = 8;
-    let nmsgs = if cfg!(miri) { 100 } else { 1000 };
-    let q = Queue::new();
-    match q.pop() {
-        Empty => {}
-        Inconsistent | Data(..) => panic!(),
-    }
-    let (tx, rx) = channel();
-    let q = Arc::new(q);
-
-    for _ in 0..nthreads {
-        let tx = tx.clone();
-        let q = q.clone();
-        thread::spawn(move || {
-            for i in 0..nmsgs {
-                q.push(i);
-            }
-            tx.send(()).unwrap();
-        });
-    }
-
-    let mut i = 0;
-    while i < nthreads * nmsgs {
-        match q.pop() {
-            Empty | Inconsistent => {}
-            Data(_) => i += 1,
-        }
-    }
-    drop(tx);
-    for _ in 0..nthreads {
-        rx.recv().unwrap();
-    }
-}
diff --git a/library/std/src/sync/mpsc/oneshot.rs b/library/std/src/sync/mpsc/oneshot.rs
deleted file mode 100644
index 0e259b8aecb..00000000000
--- a/library/std/src/sync/mpsc/oneshot.rs
+++ /dev/null
@@ -1,315 +0,0 @@
-/// Oneshot channels/ports
-///
-/// This is the initial flavor of channels/ports used for comm module. This is
-/// an optimization for the one-use case of a channel. The major optimization of
-/// this type is to have one and exactly one allocation when the chan/port pair
-/// is created.
-///
-/// Another possible optimization would be to not use an Arc box because
-/// in theory we know when the shared packet can be deallocated (no real need
-/// for the atomic reference counting), but I was having trouble how to destroy
-/// the data early in a drop of a Port.
-///
-/// # Implementation
-///
-/// Oneshots are implemented around one atomic usize variable. This variable
-/// indicates both the state of the port/chan but also contains any threads
-/// blocked on the port. All atomic operations happen on this one word.
-///
-/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
-/// on behalf of the channel side of things (it can be mentally thought of as
-/// consuming the port). This upgrade is then also stored in the shared packet.
-/// The one caveat to consider is that when a port sees a disconnected channel
-/// it must check for data because there is no "data plus upgrade" state.
-pub use self::Failure::*;
-use self::MyUpgrade::*;
-pub use self::UpgradeResult::*;
-
-use crate::cell::UnsafeCell;
-use crate::ptr;
-use crate::sync::atomic::{AtomicPtr, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken};
-use crate::sync::mpsc::Receiver;
-use crate::time::Instant;
-
-// Various states you can find a port in.
-const EMPTY: *mut u8 = ptr::invalid_mut::<u8>(0); // initial state: no data, no blocked receiver
-const DATA: *mut u8 = ptr::invalid_mut::<u8>(1); // data ready for receiver to take
-const DISCONNECTED: *mut u8 = ptr::invalid_mut::<u8>(2); // channel is disconnected OR upgraded
-// Any other value represents a pointer to a SignalToken value. The
-// protocol ensures that when the state moves *to* a pointer,
-// ownership of the token is given to the packet, and when the state
-// moves *from* a pointer, ownership of the token is transferred to
-// whoever changed the state.
-
-pub struct Packet<T> {
-    // Internal state of the chan/port pair (stores the blocked thread as well)
-    state: AtomicPtr<u8>,
-    // One-shot data slot location
-    data: UnsafeCell<Option<T>>,
-    // when used for the second time, a oneshot channel must be upgraded, and
-    // this contains the slot for the upgrade
-    upgrade: UnsafeCell<MyUpgrade<T>>,
-}
-
-pub enum Failure<T> {
-    Empty,
-    Disconnected,
-    Upgraded(Receiver<T>),
-}
-
-pub enum UpgradeResult {
-    UpSuccess,
-    UpDisconnected,
-    UpWoke(SignalToken),
-}
-
-enum MyUpgrade<T> {
-    NothingSent,
-    SendUsed,
-    GoUp(Receiver<T>),
-}
-
-impl<T> Packet<T> {
-    pub fn new() -> Packet<T> {
-        Packet {
-            data: UnsafeCell::new(None),
-            upgrade: UnsafeCell::new(NothingSent),
-            state: AtomicPtr::new(EMPTY),
-        }
-    }
-
-    pub fn send(&self, t: T) -> Result<(), T> {
-        unsafe {
-            // Sanity check
-            match *self.upgrade.get() {
-                NothingSent => {}
-                _ => panic!("sending on a oneshot that's already sent on "),
-            }
-            assert!((*self.data.get()).is_none());
-            ptr::write(self.data.get(), Some(t));
-            ptr::write(self.upgrade.get(), SendUsed);
-
-            match self.state.swap(DATA, Ordering::SeqCst) {
-                // Sent the data, no one was waiting
-                EMPTY => Ok(()),
-
-                // Couldn't send the data, the port hung up first. Return the data
-                // back up the stack.
-                DISCONNECTED => {
-                    self.state.swap(DISCONNECTED, Ordering::SeqCst);
-                    ptr::write(self.upgrade.get(), NothingSent);
-                    Err((&mut *self.data.get()).take().unwrap())
-                }
-
-                // Not possible, these are one-use channels
-                DATA => unreachable!(),
-
-                // There is a thread waiting on the other end. We leave the 'DATA'
-                // state inside so it'll pick it up on the other end.
-                ptr => {
-                    SignalToken::from_raw(ptr).signal();
-                    Ok(())
-                }
-            }
-        }
-    }
-
-    // Just tests whether this channel has been sent on or not, this is only
-    // safe to use from the sender.
-    pub fn sent(&self) -> bool {
-        unsafe { !matches!(*self.upgrade.get(), NothingSent) }
-    }
-
-    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
-        // Attempt to not block the thread (it's a little expensive). If it looks
-        // like we're not empty, then immediately go through to `try_recv`.
-        if self.state.load(Ordering::SeqCst) == EMPTY {
-            let (wait_token, signal_token) = blocking::tokens();
-            let ptr = unsafe { signal_token.to_raw() };
-
-            // race with senders to enter the blocking state
-            if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
-                if let Some(deadline) = deadline {
-                    let timed_out = !wait_token.wait_max_until(deadline);
-                    // Try to reset the state
-                    if timed_out {
-                        self.abort_selection().map_err(Upgraded)?;
-                    }
-                } else {
-                    wait_token.wait();
-                    debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
-                }
-            } else {
-                // drop the signal token, since we never blocked
-                drop(unsafe { SignalToken::from_raw(ptr) });
-            }
-        }
-
-        self.try_recv()
-    }
-
-    pub fn try_recv(&self) -> Result<T, Failure<T>> {
-        unsafe {
-            match self.state.load(Ordering::SeqCst) {
-                EMPTY => Err(Empty),
-
-                // We saw some data on the channel, but the channel can be used
-                // again to send us an upgrade. As a result, we need to re-insert
-                // into the channel that there's no data available (otherwise we'll
-                // just see DATA next time). This is done as a cmpxchg because if
-                // the state changes under our feet we'd rather just see that state
-                // change.
-                DATA => {
-                    let _ = self.state.compare_exchange(
-                        DATA,
-                        EMPTY,
-                        Ordering::SeqCst,
-                        Ordering::SeqCst,
-                    );
-                    match (&mut *self.data.get()).take() {
-                        Some(data) => Ok(data),
-                        None => unreachable!(),
-                    }
-                }
-
-                // There's no guarantee that we receive before an upgrade happens,
-                // and an upgrade flags the channel as disconnected, so when we see
-                // this we first need to check if there's data available and *then*
-                // we go through and process the upgrade.
-                DISCONNECTED => match (&mut *self.data.get()).take() {
-                    Some(data) => Ok(data),
-                    None => match ptr::replace(self.upgrade.get(), SendUsed) {
-                        SendUsed | NothingSent => Err(Disconnected),
-                        GoUp(upgrade) => Err(Upgraded(upgrade)),
-                    },
-                },
-
-                // We are the sole receiver; there cannot be a blocking
-                // receiver already.
-                _ => unreachable!(),
-            }
-        }
-    }
-
-    // Returns whether the upgrade was completed. If the upgrade wasn't
-    // completed, then the port couldn't get sent to the other half (it will
-    // never receive it).
-    pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
-        unsafe {
-            let prev = match *self.upgrade.get() {
-                NothingSent => NothingSent,
-                SendUsed => SendUsed,
-                _ => panic!("upgrading again"),
-            };
-            ptr::write(self.upgrade.get(), GoUp(up));
-
-            match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
-                // If the channel is empty or has data on it, then we're good to go.
-                // Senders will check the data before the upgrade (in case we
-                // plastered over the DATA state).
-                DATA | EMPTY => UpSuccess,
-
-                // If the other end is already disconnected, then we failed the
-                // upgrade. Be sure to trash the port we were given.
-                DISCONNECTED => {
-                    ptr::replace(self.upgrade.get(), prev);
-                    UpDisconnected
-                }
-
-                // If someone's waiting, we gotta wake them up
-                ptr => UpWoke(SignalToken::from_raw(ptr)),
-            }
-        }
-    }
-
-    pub fn drop_chan(&self) {
-        match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
-            DATA | DISCONNECTED | EMPTY => {}
-
-            // If someone's waiting, we gotta wake them up
-            ptr => unsafe {
-                SignalToken::from_raw(ptr).signal();
-            },
-        }
-    }
-
-    pub fn drop_port(&self) {
-        match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
-            // An empty channel has nothing to do, and a remotely disconnected
-            // channel also has nothing to do b/c we're about to run the drop
-            // glue
-            DISCONNECTED | EMPTY => {}
-
-            // There's data on the channel, so make sure we destroy it promptly.
-            // This is why not using an arc is a little difficult (need the box
-            // to stay valid while we take the data).
-            DATA => unsafe {
-                (&mut *self.data.get()).take().unwrap();
-            },
-
-            // We're the only ones that can block on this port
-            _ => unreachable!(),
-        }
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // select implementation
-    ////////////////////////////////////////////////////////////////////////////
-
-    // Remove a previous selecting thread from this port. This ensures that the
-    // blocked thread will no longer be visible to any other threads.
-    //
-    // The return value indicates whether there's data on this port.
-    pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
-        let state = match self.state.load(Ordering::SeqCst) {
-            // Each of these states means that no further activity will happen
-            // with regard to abortion selection
-            s @ (EMPTY | DATA | DISCONNECTED) => s,
-
-            // If we've got a blocked thread, then use an atomic to gain ownership
-            // of it (may fail)
-            ptr => self
-                .state
-                .compare_exchange(ptr, EMPTY, Ordering::SeqCst, Ordering::SeqCst)
-                .unwrap_or_else(|x| x),
-        };
-
-        // Now that we've got ownership of our state, figure out what to do
-        // about it.
-        match state {
-            EMPTY => unreachable!(),
-            // our thread used for select was stolen
-            DATA => Ok(true),
-
-            // If the other end has hung up, then we have complete ownership
-            // of the port. First, check if there was data waiting for us. This
-            // is possible if the other end sent something and then hung up.
-            //
-            // We then need to check to see if there was an upgrade requested,
-            // and if so, the upgraded port needs to have its selection aborted.
-            DISCONNECTED => unsafe {
-                if (*self.data.get()).is_some() {
-                    Ok(true)
-                } else {
-                    match ptr::replace(self.upgrade.get(), SendUsed) {
-                        GoUp(port) => Err(port),
-                        _ => Ok(true),
-                    }
-                }
-            },
-
-            // We woke ourselves up from select.
-            ptr => unsafe {
-                drop(SignalToken::from_raw(ptr));
-                Ok(false)
-            },
-        }
-    }
-}
-
-impl<T> Drop for Packet<T> {
-    fn drop(&mut self) {
-        assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
-    }
-}
diff --git a/library/std/src/sync/mpsc/shared.rs b/library/std/src/sync/mpsc/shared.rs
deleted file mode 100644
index 51917bd96bd..00000000000
--- a/library/std/src/sync/mpsc/shared.rs
+++ /dev/null
@@ -1,501 +0,0 @@
-/// Shared channels.
-///
-/// This is the flavor of channels which are not necessarily optimized for any
-/// particular use case, but are the most general in how they are used. Shared
-/// channels are cloneable allowing for multiple senders.
-///
-/// High level implementation details can be found in the comment of the parent
-/// module. You'll also note that the implementation of the shared and stream
-/// channels are quite similar, and this is no coincidence!
-pub use self::Failure::*;
-use self::StartResult::*;
-
-use core::cmp;
-use core::intrinsics::abort;
-
-use crate::cell::UnsafeCell;
-use crate::ptr;
-use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken};
-use crate::sync::mpsc::mpsc_queue as mpsc;
-use crate::sync::{Mutex, MutexGuard};
-use crate::thread;
-use crate::time::Instant;
-
-const DISCONNECTED: isize = isize::MIN;
-const FUDGE: isize = 1024;
-const MAX_REFCOUNT: usize = (isize::MAX) as usize;
-#[cfg(test)]
-const MAX_STEALS: isize = 5;
-#[cfg(not(test))]
-const MAX_STEALS: isize = 1 << 20;
-const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
-
-pub struct Packet<T> {
-    queue: mpsc::Queue<T>,
-    cnt: AtomicIsize,          // How many items are on this channel
-    steals: UnsafeCell<isize>, // How many times has a port received without blocking?
-    to_wake: AtomicPtr<u8>,    // SignalToken for wake up
-
-    // The number of channels which are currently using this packet.
-    channels: AtomicUsize,
-
-    // See the discussion in Port::drop and the channel send methods for what
-    // these are used for
-    port_dropped: AtomicBool,
-    sender_drain: AtomicIsize,
-
-    // this lock protects various portions of this implementation during
-    // select()
-    select_lock: Mutex<()>,
-}
-
-pub enum Failure {
-    Empty,
-    Disconnected,
-}
-
-#[derive(PartialEq, Eq)]
-enum StartResult {
-    Installed,
-    Abort,
-}
-
-impl<T> Packet<T> {
-    // Creation of a packet *must* be followed by a call to postinit_lock
-    // and later by inherit_blocker
-    pub fn new() -> Packet<T> {
-        Packet {
-            queue: mpsc::Queue::new(),
-            cnt: AtomicIsize::new(0),
-            steals: UnsafeCell::new(0),
-            to_wake: AtomicPtr::new(EMPTY),
-            channels: AtomicUsize::new(2),
-            port_dropped: AtomicBool::new(false),
-            sender_drain: AtomicIsize::new(0),
-            select_lock: Mutex::new(()),
-        }
-    }
-
-    // This function should be used after newly created Packet
-    // was wrapped with an Arc
-    // In other case mutex data will be duplicated while cloning
-    // and that could cause problems on platforms where it is
-    // represented by opaque data structure
-    pub fn postinit_lock(&self) -> MutexGuard<'_, ()> {
-        self.select_lock.lock().unwrap()
-    }
-
-    // This function is used at the creation of a shared packet to inherit a
-    // previously blocked thread. This is done to prevent spurious wakeups of
-    // threads in select().
-    //
-    // This can only be called at channel-creation time
-    pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) {
-        if let Some(token) = token {
-            assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
-            self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst);
-            self.cnt.store(-1, Ordering::SeqCst);
-
-            // This store is a little sketchy. What's happening here is that
-            // we're transferring a blocker from a oneshot or stream channel to
-            // this shared channel. In doing so, we never spuriously wake them
-            // up and rather only wake them up at the appropriate time. This
-            // implementation of shared channels assumes that any blocking
-            // recv() will undo the increment of steals performed in try_recv()
-            // once the recv is complete.  This thread that we're inheriting,
-            // however, is not in the middle of recv. Hence, the first time we
-            // wake them up, they're going to wake up from their old port, move
-            // on to the upgraded port, and then call the block recv() function.
-            //
-            // When calling this function, they'll find there's data immediately
-            // available, counting it as a steal. This in fact wasn't a steal
-            // because we appropriately blocked them waiting for data.
-            //
-            // To offset this bad increment, we initially set the steal count to
-            // -1. You'll find some special code in abort_selection() as well to
-            // ensure that this -1 steal count doesn't escape too far.
-            unsafe {
-                *self.steals.get() = -1;
-            }
-        }
-
-        // When the shared packet is constructed, we grabbed this lock. The
-        // purpose of this lock is to ensure that abort_selection() doesn't
-        // interfere with this method. After we unlock this lock, we're
-        // signifying that we're done modifying self.cnt and self.to_wake and
-        // the port is ready for the world to continue using it.
-        drop(guard);
-    }
-
-    pub fn send(&self, t: T) -> Result<(), T> {
-        // See Port::drop for what's going on
-        if self.port_dropped.load(Ordering::SeqCst) {
-            return Err(t);
-        }
-
-        // Note that the multiple sender case is a little trickier
-        // semantically than the single sender case. The logic for
-        // incrementing is "add and if disconnected store disconnected".
-        // This could end up leading some senders to believe that there
-        // wasn't a disconnect if in fact there was a disconnect. This means
-        // that while one thread is attempting to re-store the disconnected
-        // states, other threads could walk through merrily incrementing
-        // this very-negative disconnected count. To prevent senders from
-        // spuriously attempting to send when the channels is actually
-        // disconnected, the count has a ranged check here.
-        //
-        // This is also done for another reason. Remember that the return
-        // value of this function is:
-        //
-        //  `true` == the data *may* be received, this essentially has no
-        //            meaning
-        //  `false` == the data will *never* be received, this has a lot of
-        //             meaning
-        //
-        // In the SPSC case, we have a check of 'queue.is_empty()' to see
-        // whether the data was actually received, but this same condition
-        // means nothing in a multi-producer context. As a result, this
-        // preflight check serves as the definitive "this will never be
-        // received". Once we get beyond this check, we have permanently
-        // entered the realm of "this may be received"
-        if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
-            return Err(t);
-        }
-
-        self.queue.push(t);
-        match self.cnt.fetch_add(1, Ordering::SeqCst) {
-            -1 => {
-                self.take_to_wake().signal();
-            }
-
-            // In this case, we have possibly failed to send our data, and
-            // we need to consider re-popping the data in order to fully
-            // destroy it. We must arbitrate among the multiple senders,
-            // however, because the queues that we're using are
-            // single-consumer queues. In order to do this, all exiting
-            // pushers will use an atomic count in order to count those
-            // flowing through. Pushers who see 0 are required to drain as
-            // much as possible, and then can only exit when they are the
-            // only pusher (otherwise they must try again).
-            n if n < DISCONNECTED + FUDGE => {
-                // see the comment in 'try' for a shared channel for why this
-                // window of "not disconnected" is ok.
-                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
-
-                if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 {
-                    loop {
-                        // drain the queue, for info on the thread yield see the
-                        // discussion in try_recv
-                        loop {
-                            match self.queue.pop() {
-                                mpsc::Data(..) => {}
-                                mpsc::Empty => break,
-                                mpsc::Inconsistent => thread::yield_now(),
-                            }
-                        }
-                        // maybe we're done, if we're not the last ones
-                        // here, then we need to go try again.
-                        if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 {
-                            break;
-                        }
-                    }
-
-                    // At this point, there may still be data on the queue,
-                    // but only if the count hasn't been incremented and
-                    // some other sender hasn't finished pushing data just
-                    // yet. That sender in question will drain its own data.
-                }
-            }
-
-            // Can't make any assumptions about this case like in the SPSC case.
-            _ => {}
-        }
-
-        Ok(())
-    }
-
-    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
-        // This code is essentially the exact same as that found in the stream
-        // case (see stream.rs)
-        match self.try_recv() {
-            Err(Empty) => {}
-            data => return data,
-        }
-
-        let (wait_token, signal_token) = blocking::tokens();
-        if self.decrement(signal_token) == Installed {
-            if let Some(deadline) = deadline {
-                let timed_out = !wait_token.wait_max_until(deadline);
-                if timed_out {
-                    self.abort_selection(false);
-                }
-            } else {
-                wait_token.wait();
-            }
-        }
-
-        match self.try_recv() {
-            data @ Ok(..) => unsafe {
-                *self.steals.get() -= 1;
-                data
-            },
-            data => data,
-        }
-    }
-
-    // Essentially the exact same thing as the stream decrement function.
-    // Returns true if blocking should proceed.
-    fn decrement(&self, token: SignalToken) -> StartResult {
-        unsafe {
-            assert_eq!(
-                self.to_wake.load(Ordering::SeqCst),
-                EMPTY,
-                "This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364"
-            );
-            let ptr = token.to_raw();
-            self.to_wake.store(ptr, Ordering::SeqCst);
-
-            let steals = ptr::replace(self.steals.get(), 0);
-
-            match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
-                DISCONNECTED => {
-                    self.cnt.store(DISCONNECTED, Ordering::SeqCst);
-                }
-                // If we factor in our steals and notice that the channel has no
-                // data, we successfully sleep
-                n => {
-                    assert!(n >= 0);
-                    if n - steals <= 0 {
-                        return Installed;
-                    }
-                }
-            }
-
-            self.to_wake.store(EMPTY, Ordering::SeqCst);
-            drop(SignalToken::from_raw(ptr));
-            Abort
-        }
-    }
-
-    pub fn try_recv(&self) -> Result<T, Failure> {
-        let ret = match self.queue.pop() {
-            mpsc::Data(t) => Some(t),
-            mpsc::Empty => None,
-
-            // This is a bit of an interesting case. The channel is reported as
-            // having data available, but our pop() has failed due to the queue
-            // being in an inconsistent state.  This means that there is some
-            // pusher somewhere which has yet to complete, but we are guaranteed
-            // that a pop will eventually succeed. In this case, we spin in a
-            // yield loop because the remote sender should finish their enqueue
-            // operation "very quickly".
-            //
-            // Avoiding this yield loop would require a different queue
-            // abstraction which provides the guarantee that after M pushes have
-            // succeeded, at least M pops will succeed. The current queues
-            // guarantee that if there are N active pushes, you can pop N times
-            // once all N have finished.
-            mpsc::Inconsistent => {
-                let data;
-                loop {
-                    thread::yield_now();
-                    match self.queue.pop() {
-                        mpsc::Data(t) => {
-                            data = t;
-                            break;
-                        }
-                        mpsc::Empty => panic!("inconsistent => empty"),
-                        mpsc::Inconsistent => {}
-                    }
-                }
-                Some(data)
-            }
-        };
-        match ret {
-            // See the discussion in the stream implementation for why we
-            // might decrement steals.
-            Some(data) => unsafe {
-                if *self.steals.get() > MAX_STEALS {
-                    match self.cnt.swap(0, Ordering::SeqCst) {
-                        DISCONNECTED => {
-                            self.cnt.store(DISCONNECTED, Ordering::SeqCst);
-                        }
-                        n => {
-                            let m = cmp::min(n, *self.steals.get());
-                            *self.steals.get() -= m;
-                            self.bump(n - m);
-                        }
-                    }
-                    assert!(*self.steals.get() >= 0);
-                }
-                *self.steals.get() += 1;
-                Ok(data)
-            },
-
-            // See the discussion in the stream implementation for why we try
-            // again.
-            None => {
-                match self.cnt.load(Ordering::SeqCst) {
-                    n if n != DISCONNECTED => Err(Empty),
-                    _ => {
-                        match self.queue.pop() {
-                            mpsc::Data(t) => Ok(t),
-                            mpsc::Empty => Err(Disconnected),
-                            // with no senders, an inconsistency is impossible.
-                            mpsc::Inconsistent => unreachable!(),
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    // Prepares this shared packet for a channel clone, essentially just bumping
-    // a refcount.
-    pub fn clone_chan(&self) {
-        let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
-
-        // See comments on Arc::clone() on why we do this (for `mem::forget`).
-        if old_count > MAX_REFCOUNT {
-            abort();
-        }
-    }
-
-    // Decrement the reference count on a channel. This is called whenever a
-    // Chan is dropped and may end up waking up a receiver. It's the receiver's
-    // responsibility on the other end to figure out that we've disconnected.
-    pub fn drop_chan(&self) {
-        match self.channels.fetch_sub(1, Ordering::SeqCst) {
-            1 => {}
-            n if n > 1 => return,
-            n => panic!("bad number of channels left {n}"),
-        }
-
-        match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
-            -1 => {
-                self.take_to_wake().signal();
-            }
-            DISCONNECTED => {}
-            n => {
-                assert!(n >= 0);
-            }
-        }
-    }
-
-    // See the long discussion inside of stream.rs for why the queue is drained,
-    // and why it is done in this fashion.
-    pub fn drop_port(&self) {
-        self.port_dropped.store(true, Ordering::SeqCst);
-        let mut steals = unsafe { *self.steals.get() };
-        while {
-            match self.cnt.compare_exchange(
-                steals,
-                DISCONNECTED,
-                Ordering::SeqCst,
-                Ordering::SeqCst,
-            ) {
-                Ok(_) => false,
-                Err(old) => old != DISCONNECTED,
-            }
-        } {
-            // See the discussion in 'try_recv' for why we yield
-            // control of this thread.
-            loop {
-                match self.queue.pop() {
-                    mpsc::Data(..) => {
-                        steals += 1;
-                    }
-                    mpsc::Empty | mpsc::Inconsistent => break,
-                }
-            }
-        }
-    }
-
-    // Consumes ownership of the 'to_wake' field.
-    fn take_to_wake(&self) -> SignalToken {
-        let ptr = self.to_wake.load(Ordering::SeqCst);
-        self.to_wake.store(EMPTY, Ordering::SeqCst);
-        assert!(ptr != EMPTY);
-        unsafe { SignalToken::from_raw(ptr) }
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // select implementation
-    ////////////////////////////////////////////////////////////////////////////
-
-    // increment the count on the channel (used for selection)
-    fn bump(&self, amt: isize) -> isize {
-        match self.cnt.fetch_add(amt, Ordering::SeqCst) {
-            DISCONNECTED => {
-                self.cnt.store(DISCONNECTED, Ordering::SeqCst);
-                DISCONNECTED
-            }
-            n => n,
-        }
-    }
-
-    // Cancels a previous thread waiting on this port, returning whether there's
-    // data on the port.
-    //
-    // This is similar to the stream implementation (hence fewer comments), but
-    // uses a different value for the "steals" variable.
-    pub fn abort_selection(&self, _was_upgrade: bool) -> bool {
-        // Before we do anything else, we bounce on this lock. The reason for
-        // doing this is to ensure that any upgrade-in-progress is gone and
-        // done with. Without this bounce, we can race with inherit_blocker
-        // about looking at and dealing with to_wake. Once we have acquired the
-        // lock, we are guaranteed that inherit_blocker is done.
-        {
-            let _guard = self.select_lock.lock().unwrap();
-        }
-
-        // Like the stream implementation, we want to make sure that the count
-        // on the channel goes non-negative. We don't know how negative the
-        // stream currently is, so instead of using a steal value of 1, we load
-        // the channel count and figure out what we should do to make it
-        // positive.
-        let steals = {
-            let cnt = self.cnt.load(Ordering::SeqCst);
-            if cnt < 0 && cnt != DISCONNECTED { -cnt } else { 0 }
-        };
-        let prev = self.bump(steals + 1);
-
-        if prev == DISCONNECTED {
-            assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
-            true
-        } else {
-            let cur = prev + steals + 1;
-            assert!(cur >= 0);
-            if prev < 0 {
-                drop(self.take_to_wake());
-            } else {
-                while self.to_wake.load(Ordering::SeqCst) != EMPTY {
-                    thread::yield_now();
-                }
-            }
-            unsafe {
-                // if the number of steals is -1, it was the pre-emptive -1 steal
-                // count from when we inherited a blocker. This is fine because
-                // we're just going to overwrite it with a real value.
-                let old = self.steals.get();
-                assert!(*old == 0 || *old == -1);
-                *old = steals;
-                prev >= 0
-            }
-        }
-    }
-}
-
-impl<T> Drop for Packet<T> {
-    fn drop(&mut self) {
-        // Note that this load is not only an assert for correctness about
-        // disconnection, but also a proper fence before the read of
-        // `to_wake`, so this assert cannot be removed with also removing
-        // the `to_wake` assert.
-        assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
-        assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
-        assert_eq!(self.channels.load(Ordering::SeqCst), 0);
-    }
-}
diff --git a/library/std/src/sync/mpsc/spsc_queue.rs b/library/std/src/sync/mpsc/spsc_queue.rs
deleted file mode 100644
index 61f91313ea9..00000000000
--- a/library/std/src/sync/mpsc/spsc_queue.rs
+++ /dev/null
@@ -1,244 +0,0 @@
-//! A single-producer single-consumer concurrent queue
-//!
-//! This module contains the implementation of an SPSC queue which can be used
-//! concurrently between two threads. This data structure is safe to use and
-//! enforces the semantics that there is one pusher and one popper.
-
-// The original implementation is based off:
-// https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
-//
-// Note that back when the code was imported, it was licensed under the BSD-2-Clause license:
-// http://web.archive.org/web/20110411011612/https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
-//
-// The original author of the code agreed to relicense it under `MIT OR Apache-2.0` in 2017, so as
-// of today the license of this file is the same as the rest of the codebase:
-// https://github.com/rust-lang/rust/pull/42149
-
-#[cfg(all(test, not(target_os = "emscripten")))]
-mod tests;
-
-use core::cell::UnsafeCell;
-use core::ptr;
-
-use crate::boxed::Box;
-use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
-
-use super::cache_aligned::CacheAligned;
-
-// Node within the linked list queue of messages to send
-struct Node<T> {
-    // FIXME: this could be an uninitialized T if we're careful enough, and
-    //      that would reduce memory usage (and be a bit faster).
-    //      is it worth it?
-    value: Option<T>,         // nullable for re-use of nodes
-    cached: bool,             // This node goes into the node cache
-    next: AtomicPtr<Node<T>>, // next node in the queue
-}
-
-/// The single-producer single-consumer queue. This structure is not cloneable,
-/// but it can be safely shared in an Arc if it is guaranteed that there
-/// is only one popper and one pusher touching the queue at any one point in
-/// time.
-pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
-    // consumer fields
-    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
-
-    // producer fields
-    producer: CacheAligned<Producer<T, ProducerAddition>>,
-}
-
-struct Consumer<T, Addition> {
-    tail: UnsafeCell<*mut Node<T>>, // where to pop from
-    tail_prev: AtomicPtr<Node<T>>,  // where to pop from
-    cache_bound: usize,             // maximum cache size
-    cached_nodes: AtomicUsize,      // number of nodes marked as cacheable
-    addition: Addition,
-}
-
-struct Producer<T, Addition> {
-    head: UnsafeCell<*mut Node<T>>,      // where to push to
-    first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
-    tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
-    addition: Addition,
-}
-
-unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {}
-
-unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {}
-
-impl<T> Node<T> {
-    fn new() -> *mut Node<T> {
-        Box::into_raw(box Node {
-            value: None,
-            cached: false,
-            next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
-        })
-    }
-}
-
-impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
-    /// Creates a new queue. With given additional elements in the producer and
-    /// consumer portions of the queue.
-    ///
-    /// Due to the performance implications of cache-contention,
-    /// we wish to keep fields used mainly by the producer on a separate cache
-    /// line than those used by the consumer.
-    /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
-    /// allocate one for small fields, so we allow users to insert additional
-    /// fields into the cache lines already allocated by this for the producer
-    /// and consumer.
-    ///
-    /// This is unsafe as the type system doesn't enforce a single
-    /// consumer-producer relationship. It also allows the consumer to `pop`
-    /// items while there is a `peek` active due to all methods having a
-    /// non-mutable receiver.
-    ///
-    /// # Arguments
-    ///
-    ///   * `bound` - This queue implementation is implemented with a linked
-    ///               list, and this means that a push is always a malloc. In
-    ///               order to amortize this cost, an internal cache of nodes is
-    ///               maintained to prevent a malloc from always being
-    ///               necessary. This bound is the limit on the size of the
-    ///               cache (if desired). If the value is 0, then the cache has
-    ///               no bound. Otherwise, the cache will never grow larger than
-    ///               `bound` (although the queue itself could be much larger.
-    pub unsafe fn with_additions(
-        bound: usize,
-        producer_addition: ProducerAddition,
-        consumer_addition: ConsumerAddition,
-    ) -> Self {
-        let n1 = Node::new();
-        let n2 = Node::new();
-        (*n1).next.store(n2, Ordering::Relaxed);
-        Queue {
-            consumer: CacheAligned::new(Consumer {
-                tail: UnsafeCell::new(n2),
-                tail_prev: AtomicPtr::new(n1),
-                cache_bound: bound,
-                cached_nodes: AtomicUsize::new(0),
-                addition: consumer_addition,
-            }),
-            producer: CacheAligned::new(Producer {
-                head: UnsafeCell::new(n2),
-                first: UnsafeCell::new(n1),
-                tail_copy: UnsafeCell::new(n1),
-                addition: producer_addition,
-            }),
-        }
-    }
-
-    /// Pushes a new value onto this queue. Note that to use this function
-    /// safely, it must be externally guaranteed that there is only one pusher.
-    pub fn push(&self, t: T) {
-        unsafe {
-            // Acquire a node (which either uses a cached one or allocates a new
-            // one), and then append this to the 'head' node.
-            let n = self.alloc();
-            assert!((*n).value.is_none());
-            (*n).value = Some(t);
-            (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
-            (**self.producer.head.get()).next.store(n, Ordering::Release);
-            *(&self.producer.head).get() = n;
-        }
-    }
-
-    unsafe fn alloc(&self) -> *mut Node<T> {
-        // First try to see if we can consume the 'first' node for our uses.
-        if *self.producer.first.get() != *self.producer.tail_copy.get() {
-            let ret = *self.producer.first.get();
-            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
-            return ret;
-        }
-        // If the above fails, then update our copy of the tail and try
-        // again.
-        *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire);
-        if *self.producer.first.get() != *self.producer.tail_copy.get() {
-            let ret = *self.producer.first.get();
-            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
-            return ret;
-        }
-        // If all of that fails, then we have to allocate a new node
-        // (there's nothing in the node cache).
-        Node::new()
-    }
-
-    /// Attempts to pop a value from this queue. Remember that to use this type
-    /// safely you must ensure that there is only one popper at a time.
-    pub fn pop(&self) -> Option<T> {
-        unsafe {
-            // The `tail` node is not actually a used node, but rather a
-            // sentinel from where we should start popping from. Hence, look at
-            // tail's next field and see if we can use it. If we do a pop, then
-            // the current tail node is a candidate for going into the cache.
-            let tail = *self.consumer.tail.get();
-            let next = (*tail).next.load(Ordering::Acquire);
-            if next.is_null() {
-                return None;
-            }
-            assert!((*next).value.is_some());
-            let ret = (*next).value.take();
-
-            *self.consumer.0.tail.get() = next;
-            if self.consumer.cache_bound == 0 {
-                self.consumer.tail_prev.store(tail, Ordering::Release);
-            } else {
-                let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
-                if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
-                    self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
-                    (*tail).cached = true;
-                }
-
-                if (*tail).cached {
-                    self.consumer.tail_prev.store(tail, Ordering::Release);
-                } else {
-                    (*self.consumer.tail_prev.load(Ordering::Relaxed))
-                        .next
-                        .store(next, Ordering::Relaxed);
-                    // We have successfully erased all references to 'tail', so
-                    // now we can safely drop it.
-                    let _: Box<Node<T>> = Box::from_raw(tail);
-                }
-            }
-            ret
-        }
-    }
-
-    /// Attempts to peek at the head of the queue, returning `None` if the queue
-    /// has no data currently
-    ///
-    /// # Warning
-    /// The reference returned is invalid if it is not used before the consumer
-    /// pops the value off the queue. If the producer then pushes another value
-    /// onto the queue, it will overwrite the value pointed to by the reference.
-    pub fn peek(&self) -> Option<&mut T> {
-        // This is essentially the same as above with all the popping bits
-        // stripped out.
-        unsafe {
-            let tail = *self.consumer.tail.get();
-            let next = (*tail).next.load(Ordering::Acquire);
-            if next.is_null() { None } else { (*next).value.as_mut() }
-        }
-    }
-
-    pub fn producer_addition(&self) -> &ProducerAddition {
-        &self.producer.addition
-    }
-
-    pub fn consumer_addition(&self) -> &ConsumerAddition {
-        &self.consumer.addition
-    }
-}
-
-impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
-    fn drop(&mut self) {
-        unsafe {
-            let mut cur = *self.producer.first.get();
-            while !cur.is_null() {
-                let next = (*cur).next.load(Ordering::Relaxed);
-                let _n: Box<Node<T>> = Box::from_raw(cur);
-                cur = next;
-            }
-        }
-    }
-}
diff --git a/library/std/src/sync/mpsc/spsc_queue/tests.rs b/library/std/src/sync/mpsc/spsc_queue/tests.rs
deleted file mode 100644
index eb6d5c2cf66..00000000000
--- a/library/std/src/sync/mpsc/spsc_queue/tests.rs
+++ /dev/null
@@ -1,102 +0,0 @@
-use super::Queue;
-use crate::sync::mpsc::channel;
-use crate::sync::Arc;
-use crate::thread;
-
-#[test]
-fn smoke() {
-    unsafe {
-        let queue = Queue::with_additions(0, (), ());
-        queue.push(1);
-        queue.push(2);
-        assert_eq!(queue.pop(), Some(1));
-        assert_eq!(queue.pop(), Some(2));
-        assert_eq!(queue.pop(), None);
-        queue.push(3);
-        queue.push(4);
-        assert_eq!(queue.pop(), Some(3));
-        assert_eq!(queue.pop(), Some(4));
-        assert_eq!(queue.pop(), None);
-    }
-}
-
-#[test]
-fn peek() {
-    unsafe {
-        let queue = Queue::with_additions(0, (), ());
-        queue.push(vec![1]);
-
-        // Ensure the borrowchecker works
-        match queue.peek() {
-            Some(vec) => {
-                assert_eq!(&*vec, &[1]);
-            }
-            None => unreachable!(),
-        }
-
-        match queue.pop() {
-            Some(vec) => {
-                assert_eq!(&*vec, &[1]);
-            }
-            None => unreachable!(),
-        }
-    }
-}
-
-#[test]
-fn drop_full() {
-    unsafe {
-        let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
-        q.push(Box::new(1));
-        q.push(Box::new(2));
-    }
-}
-
-#[test]
-fn smoke_bound() {
-    unsafe {
-        let q = Queue::with_additions(0, (), ());
-        q.push(1);
-        q.push(2);
-        assert_eq!(q.pop(), Some(1));
-        assert_eq!(q.pop(), Some(2));
-        assert_eq!(q.pop(), None);
-        q.push(3);
-        q.push(4);
-        assert_eq!(q.pop(), Some(3));
-        assert_eq!(q.pop(), Some(4));
-        assert_eq!(q.pop(), None);
-    }
-}
-
-#[test]
-fn stress() {
-    unsafe {
-        stress_bound(0);
-        stress_bound(1);
-    }
-
-    unsafe fn stress_bound(bound: usize) {
-        let count = if cfg!(miri) { 1000 } else { 100000 };
-        let q = Arc::new(Queue::with_additions(bound, (), ()));
-
-        let (tx, rx) = channel();
-        let q2 = q.clone();
-        let _t = thread::spawn(move || {
-            for _ in 0..count {
-                loop {
-                    match q2.pop() {
-                        Some(1) => break,
-                        Some(_) => panic!(),
-                        None => {}
-                    }
-                }
-            }
-            tx.send(()).unwrap();
-        });
-        for _ in 0..count {
-            q.push(1);
-        }
-        rx.recv().unwrap();
-    }
-}
diff --git a/library/std/src/sync/mpsc/stream.rs b/library/std/src/sync/mpsc/stream.rs
deleted file mode 100644
index 4592e914160..00000000000
--- a/library/std/src/sync/mpsc/stream.rs
+++ /dev/null
@@ -1,457 +0,0 @@
-/// Stream channels
-///
-/// This is the flavor of channels which are optimized for one sender and one
-/// receiver. The sender will be upgraded to a shared channel if the channel is
-/// cloned.
-///
-/// High level implementation details can be found in the comment of the parent
-/// module.
-pub use self::Failure::*;
-use self::Message::*;
-pub use self::UpgradeResult::*;
-
-use core::cmp;
-
-use crate::cell::UnsafeCell;
-use crate::ptr;
-use crate::thread;
-use crate::time::Instant;
-
-use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken};
-use crate::sync::mpsc::spsc_queue as spsc;
-use crate::sync::mpsc::Receiver;
-
-const DISCONNECTED: isize = isize::MIN;
-#[cfg(test)]
-const MAX_STEALS: isize = 5;
-#[cfg(not(test))]
-const MAX_STEALS: isize = 1 << 20;
-const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
-
-pub struct Packet<T> {
-    // internal queue for all messages
-    queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
-}
-
-struct ProducerAddition {
-    cnt: AtomicIsize,       // How many items are on this channel
-    to_wake: AtomicPtr<u8>, // SignalToken for the blocked thread to wake up
-
-    port_dropped: AtomicBool, // flag if the channel has been destroyed.
-}
-
-struct ConsumerAddition {
-    steals: UnsafeCell<isize>, // How many times has a port received without blocking?
-}
-
-pub enum Failure<T> {
-    Empty,
-    Disconnected,
-    Upgraded(Receiver<T>),
-}
-
-pub enum UpgradeResult {
-    UpSuccess,
-    UpDisconnected,
-    UpWoke(SignalToken),
-}
-
-// Any message could contain an "upgrade request" to a new shared port, so the
-// internal queue it's a queue of T, but rather Message<T>
-enum Message<T> {
-    Data(T),
-    GoUp(Receiver<T>),
-}
-
-impl<T> Packet<T> {
-    pub fn new() -> Packet<T> {
-        Packet {
-            queue: unsafe {
-                spsc::Queue::with_additions(
-                    128,
-                    ProducerAddition {
-                        cnt: AtomicIsize::new(0),
-                        to_wake: AtomicPtr::new(EMPTY),
-
-                        port_dropped: AtomicBool::new(false),
-                    },
-                    ConsumerAddition { steals: UnsafeCell::new(0) },
-                )
-            },
-        }
-    }
-
-    pub fn send(&self, t: T) -> Result<(), T> {
-        // If the other port has deterministically gone away, then definitely
-        // must return the data back up the stack. Otherwise, the data is
-        // considered as being sent.
-        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
-            return Err(t);
-        }
-
-        match self.do_send(Data(t)) {
-            UpSuccess | UpDisconnected => {}
-            UpWoke(token) => {
-                token.signal();
-            }
-        }
-        Ok(())
-    }
-
-    pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
-        // If the port has gone away, then there's no need to proceed any
-        // further.
-        if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
-            return UpDisconnected;
-        }
-
-        self.do_send(GoUp(up))
-    }
-
-    fn do_send(&self, t: Message<T>) -> UpgradeResult {
-        self.queue.push(t);
-        match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
-            // As described in the mod's doc comment, -1 == wakeup
-            -1 => UpWoke(self.take_to_wake()),
-            // As described before, SPSC queues must be >= -2
-            -2 => UpSuccess,
-
-            // Be sure to preserve the disconnected state, and the return value
-            // in this case is going to be whether our data was received or not.
-            // This manifests itself on whether we have an empty queue or not.
-            //
-            // Primarily, are required to drain the queue here because the port
-            // will never remove this data. We can only have at most one item to
-            // drain (the port drains the rest).
-            DISCONNECTED => {
-                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
-                let first = self.queue.pop();
-                let second = self.queue.pop();
-                assert!(second.is_none());
-
-                match first {
-                    Some(..) => UpSuccess,  // we failed to send the data
-                    None => UpDisconnected, // we successfully sent data
-                }
-            }
-
-            // Otherwise we just sent some data on a non-waiting queue, so just
-            // make sure the world is sane and carry on!
-            n => {
-                assert!(n >= 0);
-                UpSuccess
-            }
-        }
-    }
-
-    // Consumes ownership of the 'to_wake' field.
-    fn take_to_wake(&self) -> SignalToken {
-        let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
-        self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
-        assert!(ptr != EMPTY);
-        unsafe { SignalToken::from_raw(ptr) }
-    }
-
-    // Decrements the count on the channel for a sleeper, returning the sleeper
-    // back if it shouldn't sleep. Note that this is the location where we take
-    // steals into account.
-    fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
-        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
-        let ptr = unsafe { token.to_raw() };
-        self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
-
-        let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
-
-        match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
-            DISCONNECTED => {
-                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
-            }
-            // If we factor in our steals and notice that the channel has no
-            // data, we successfully sleep
-            n => {
-                assert!(n >= 0);
-                if n - steals <= 0 {
-                    return Ok(());
-                }
-            }
-        }
-
-        self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
-        Err(unsafe { SignalToken::from_raw(ptr) })
-    }
-
-    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
-        // Optimistic preflight check (scheduling is expensive).
-        match self.try_recv() {
-            Err(Empty) => {}
-            data => return data,
-        }
-
-        // Welp, our channel has no data. Deschedule the current thread and
-        // initiate the blocking protocol.
-        let (wait_token, signal_token) = blocking::tokens();
-        if self.decrement(signal_token).is_ok() {
-            if let Some(deadline) = deadline {
-                let timed_out = !wait_token.wait_max_until(deadline);
-                if timed_out {
-                    self.abort_selection(/* was_upgrade = */ false).map_err(Upgraded)?;
-                }
-            } else {
-                wait_token.wait();
-            }
-        }
-
-        match self.try_recv() {
-            // Messages which actually popped from the queue shouldn't count as
-            // a steal, so offset the decrement here (we already have our
-            // "steal" factored into the channel count above).
-            data @ (Ok(..) | Err(Upgraded(..))) => unsafe {
-                *self.queue.consumer_addition().steals.get() -= 1;
-                data
-            },
-
-            data => data,
-        }
-    }
-
-    pub fn try_recv(&self) -> Result<T, Failure<T>> {
-        match self.queue.pop() {
-            // If we stole some data, record to that effect (this will be
-            // factored into cnt later on).
-            //
-            // Note that we don't allow steals to grow without bound in order to
-            // prevent eventual overflow of either steals or cnt as an overflow
-            // would have catastrophic results. Sometimes, steals > cnt, but
-            // other times cnt > steals, so we don't know the relation between
-            // steals and cnt. This code path is executed only rarely, so we do
-            // a pretty slow operation, of swapping 0 into cnt, taking steals
-            // down as much as possible (without going negative), and then
-            // adding back in whatever we couldn't factor into steals.
-            Some(data) => unsafe {
-                if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
-                    match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
-                        DISCONNECTED => {
-                            self.queue
-                                .producer_addition()
-                                .cnt
-                                .store(DISCONNECTED, Ordering::SeqCst);
-                        }
-                        n => {
-                            let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
-                            *self.queue.consumer_addition().steals.get() -= m;
-                            self.bump(n - m);
-                        }
-                    }
-                    assert!(*self.queue.consumer_addition().steals.get() >= 0);
-                }
-                *self.queue.consumer_addition().steals.get() += 1;
-                match data {
-                    Data(t) => Ok(t),
-                    GoUp(up) => Err(Upgraded(up)),
-                }
-            },
-
-            None => {
-                match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
-                    n if n != DISCONNECTED => Err(Empty),
-
-                    // This is a little bit of a tricky case. We failed to pop
-                    // data above, and then we have viewed that the channel is
-                    // disconnected. In this window more data could have been
-                    // sent on the channel. It doesn't really make sense to
-                    // return that the channel is disconnected when there's
-                    // actually data on it, so be extra sure there's no data by
-                    // popping one more time.
-                    //
-                    // We can ignore steals because the other end is
-                    // disconnected and we'll never need to really factor in our
-                    // steals again.
-                    _ => match self.queue.pop() {
-                        Some(Data(t)) => Ok(t),
-                        Some(GoUp(up)) => Err(Upgraded(up)),
-                        None => Err(Disconnected),
-                    },
-                }
-            }
-        }
-    }
-
-    pub fn drop_chan(&self) {
-        // Dropping a channel is pretty simple, we just flag it as disconnected
-        // and then wakeup a blocker if there is one.
-        match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
-            -1 => {
-                self.take_to_wake().signal();
-            }
-            DISCONNECTED => {}
-            n => {
-                assert!(n >= 0);
-            }
-        }
-    }
-
-    pub fn drop_port(&self) {
-        // Dropping a port seems like a fairly trivial thing. In theory all we
-        // need to do is flag that we're disconnected and then everything else
-        // can take over (we don't have anyone to wake up).
-        //
-        // The catch for Ports is that we want to drop the entire contents of
-        // the queue. There are multiple reasons for having this property, the
-        // largest of which is that if another chan is waiting in this channel
-        // (but not received yet), then waiting on that port will cause a
-        // deadlock.
-        //
-        // So if we accept that we must now destroy the entire contents of the
-        // queue, this code may make a bit more sense. The tricky part is that
-        // we can't let any in-flight sends go un-dropped, we have to make sure
-        // *everything* is dropped and nothing new will come onto the channel.
-
-        // The first thing we do is set a flag saying that we're done for. All
-        // sends are gated on this flag, so we're immediately guaranteed that
-        // there are a bounded number of active sends that we'll have to deal
-        // with.
-        self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);
-
-        // Now that we're guaranteed to deal with a bounded number of senders,
-        // we need to drain the queue. This draining process happens atomically
-        // with respect to the "count" of the channel. If the count is nonzero
-        // (with steals taken into account), then there must be data on the
-        // channel. In this case we drain everything and then try again. We will
-        // continue to fail while active senders send data while we're dropping
-        // data, but eventually we're guaranteed to break out of this loop
-        // (because there is a bounded number of senders).
-        let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
-        while {
-            match self.queue.producer_addition().cnt.compare_exchange(
-                steals,
-                DISCONNECTED,
-                Ordering::SeqCst,
-                Ordering::SeqCst,
-            ) {
-                Ok(_) => false,
-                Err(old) => old != DISCONNECTED,
-            }
-        } {
-            while self.queue.pop().is_some() {
-                steals += 1;
-            }
-        }
-
-        // At this point in time, we have gated all future senders from sending,
-        // and we have flagged the channel as being disconnected. The senders
-        // still have some responsibility, however, because some sends might not
-        // complete until after we flag the disconnection. There are more
-        // details in the sending methods that see DISCONNECTED
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // select implementation
-    ////////////////////////////////////////////////////////////////////////////
-
-    // increment the count on the channel (used for selection)
-    fn bump(&self, amt: isize) -> isize {
-        match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
-            DISCONNECTED => {
-                self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
-                DISCONNECTED
-            }
-            n => n,
-        }
-    }
-
-    // Removes a previous thread from being blocked in this port
-    pub fn abort_selection(&self, was_upgrade: bool) -> Result<bool, Receiver<T>> {
-        // If we're aborting selection after upgrading from a oneshot, then
-        // we're guarantee that no one is waiting. The only way that we could
-        // have seen the upgrade is if data was actually sent on the channel
-        // half again. For us, this means that there is guaranteed to be data on
-        // this channel. Furthermore, we're guaranteed that there was no
-        // start_selection previously, so there's no need to modify `self.cnt`
-        // at all.
-        //
-        // Hence, because of these invariants, we immediately return `Ok(true)`.
-        // Note that the data might not actually be sent on the channel just yet.
-        // The other end could have flagged the upgrade but not sent data to
-        // this end. This is fine because we know it's a small bounded windows
-        // of time until the data is actually sent.
-        if was_upgrade {
-            assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
-            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
-            return Ok(true);
-        }
-
-        // We want to make sure that the count on the channel goes non-negative,
-        // and in the stream case we can have at most one steal, so just assume
-        // that we had one steal.
-        let steals = 1;
-        let prev = self.bump(steals + 1);
-
-        // If we were previously disconnected, then we know for sure that there
-        // is no thread in to_wake, so just keep going
-        let has_data = if prev == DISCONNECTED {
-            assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
-            true // there is data, that data is that we're disconnected
-        } else {
-            let cur = prev + steals + 1;
-            assert!(cur >= 0);
-
-            // If the previous count was negative, then we just made things go
-            // positive, hence we passed the -1 boundary and we're responsible
-            // for removing the to_wake() field and trashing it.
-            //
-            // If the previous count was positive then we're in a tougher
-            // situation. A possible race is that a sender just incremented
-            // through -1 (meaning it's going to try to wake a thread up), but it
-            // hasn't yet read the to_wake. In order to prevent a future recv()
-            // from waking up too early (this sender picking up the plastered
-            // over to_wake), we spin loop here waiting for to_wake to be 0.
-            // Note that this entire select() implementation needs an overhaul,
-            // and this is *not* the worst part of it, so this is not done as a
-            // final solution but rather out of necessity for now to get
-            // something working.
-            if prev < 0 {
-                drop(self.take_to_wake());
-            } else {
-                while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != EMPTY {
-                    thread::yield_now();
-                }
-            }
-            unsafe {
-                assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
-                *self.queue.consumer_addition().steals.get() = steals;
-            }
-
-            // if we were previously positive, then there's surely data to
-            // receive
-            prev >= 0
-        };
-
-        // Now that we've determined that this queue "has data", we peek at the
-        // queue to see if the data is an upgrade or not. If it's an upgrade,
-        // then we need to destroy this port and abort selection on the
-        // upgraded port.
-        if has_data {
-            match self.queue.peek() {
-                Some(&mut GoUp(..)) => match self.queue.pop() {
-                    Some(GoUp(port)) => Err(port),
-                    _ => unreachable!(),
-                },
-                _ => Ok(true),
-            }
-        } else {
-            Ok(false)
-        }
-    }
-}
-
-impl<T> Drop for Packet<T> {
-    fn drop(&mut self) {
-        // Note that this load is not only an assert for correctness about
-        // disconnection, but also a proper fence before the read of
-        // `to_wake`, so this assert cannot be removed with also removing
-        // the `to_wake` assert.
-        assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
-        assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
-    }
-}
diff --git a/library/std/src/sync/mpsc/sync.rs b/library/std/src/sync/mpsc/sync.rs
deleted file mode 100644
index 733761671a0..00000000000
--- a/library/std/src/sync/mpsc/sync.rs
+++ /dev/null
@@ -1,495 +0,0 @@
-use self::Blocker::*;
-/// Synchronous channels/ports
-///
-/// This channel implementation differs significantly from the asynchronous
-/// implementations found next to it (oneshot/stream/share). This is an
-/// implementation of a synchronous, bounded buffer channel.
-///
-/// Each channel is created with some amount of backing buffer, and sends will
-/// *block* until buffer space becomes available. A buffer size of 0 is valid,
-/// which means that every successful send is paired with a successful recv.
-///
-/// This flavor of channels defines a new `send_opt` method for channels which
-/// is the method by which a message is sent but the thread does not panic if it
-/// cannot be delivered.
-///
-/// Another major difference is that send() will *always* return back the data
-/// if it couldn't be sent. This is because it is deterministically known when
-/// the data is received and when it is not received.
-///
-/// Implementation-wise, it can all be summed up with "use a mutex plus some
-/// logic". The mutex used here is an OS native mutex, meaning that no user code
-/// is run inside of the mutex (to prevent context switching). This
-/// implementation shares almost all code for the buffered and unbuffered cases
-/// of a synchronous channel. There are a few branches for the unbuffered case,
-/// but they're mostly just relevant to blocking senders.
-pub use self::Failure::*;
-
-use core::intrinsics::abort;
-use core::mem;
-use core::ptr;
-
-use crate::sync::atomic::{AtomicUsize, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken};
-use crate::sync::{Mutex, MutexGuard};
-use crate::time::Instant;
-
-const MAX_REFCOUNT: usize = (isize::MAX) as usize;
-
-pub struct Packet<T> {
-    /// Only field outside of the mutex. Just done for kicks, but mainly because
-    /// the other shared channel already had the code implemented
-    channels: AtomicUsize,
-
-    lock: Mutex<State<T>>,
-}
-
-unsafe impl<T: Send> Send for Packet<T> {}
-
-unsafe impl<T: Send> Sync for Packet<T> {}
-
-struct State<T> {
-    disconnected: bool, // Is the channel disconnected yet?
-    queue: Queue,       // queue of senders waiting to send data
-    blocker: Blocker,   // currently blocked thread on this channel
-    buf: Buffer<T>,     // storage for buffered messages
-    cap: usize,         // capacity of this channel
-
-    /// A curious flag used to indicate whether a sender failed or succeeded in
-    /// blocking. This is used to transmit information back to the thread that it
-    /// must dequeue its message from the buffer because it was not received.
-    /// This is only relevant in the 0-buffer case. This obviously cannot be
-    /// safely constructed, but it's guaranteed to always have a valid pointer
-    /// value.
-    canceled: Option<&'static mut bool>,
-}
-
-unsafe impl<T: Send> Send for State<T> {}
-
-/// Possible flavors of threads who can be blocked on this channel.
-enum Blocker {
-    BlockedSender(SignalToken),
-    BlockedReceiver(SignalToken),
-    NoneBlocked,
-}
-
-/// Simple queue for threading threads together. Nodes are stack-allocated, so
-/// this structure is not safe at all
-struct Queue {
-    head: *mut Node,
-    tail: *mut Node,
-}
-
-struct Node {
-    token: Option<SignalToken>,
-    next: *mut Node,
-}
-
-unsafe impl Send for Node {}
-
-/// A simple ring-buffer
-struct Buffer<T> {
-    buf: Vec<Option<T>>,
-    start: usize,
-    size: usize,
-}
-
-#[derive(Debug)]
-pub enum Failure {
-    Empty,
-    Disconnected,
-}
-
-/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
-/// in the meantime. This re-locks the mutex upon returning.
-fn wait<'a, 'b, T>(
-    lock: &'a Mutex<State<T>>,
-    mut guard: MutexGuard<'b, State<T>>,
-    f: fn(SignalToken) -> Blocker,
-) -> MutexGuard<'a, State<T>> {
-    let (wait_token, signal_token) = blocking::tokens();
-    match mem::replace(&mut guard.blocker, f(signal_token)) {
-        NoneBlocked => {}
-        _ => unreachable!(),
-    }
-    drop(guard); // unlock
-    wait_token.wait(); // block
-    lock.lock().unwrap() // relock
-}
-
-/// Same as wait, but waiting at most until `deadline`.
-fn wait_timeout_receiver<'a, 'b, T>(
-    lock: &'a Mutex<State<T>>,
-    deadline: Instant,
-    mut guard: MutexGuard<'b, State<T>>,
-    success: &mut bool,
-) -> MutexGuard<'a, State<T>> {
-    let (wait_token, signal_token) = blocking::tokens();
-    match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) {
-        NoneBlocked => {}
-        _ => unreachable!(),
-    }
-    drop(guard); // unlock
-    *success = wait_token.wait_max_until(deadline); // block
-    let mut new_guard = lock.lock().unwrap(); // relock
-    if !*success {
-        abort_selection(&mut new_guard);
-    }
-    new_guard
-}
-
-fn abort_selection<T>(guard: &mut MutexGuard<'_, State<T>>) -> bool {
-    match mem::replace(&mut guard.blocker, NoneBlocked) {
-        NoneBlocked => true,
-        BlockedSender(token) => {
-            guard.blocker = BlockedSender(token);
-            true
-        }
-        BlockedReceiver(token) => {
-            drop(token);
-            false
-        }
-    }
-}
-
-/// Wakes up a thread, dropping the lock at the correct time
-fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) {
-    // We need to be careful to wake up the waiting thread *outside* of the mutex
-    // in case it incurs a context switch.
-    drop(guard);
-    token.signal();
-}
-
-impl<T> Packet<T> {
-    pub fn new(capacity: usize) -> Packet<T> {
-        Packet {
-            channels: AtomicUsize::new(1),
-            lock: Mutex::new(State {
-                disconnected: false,
-                blocker: NoneBlocked,
-                cap: capacity,
-                canceled: None,
-                queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() },
-                buf: Buffer {
-                    buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(),
-                    start: 0,
-                    size: 0,
-                },
-            }),
-        }
-    }
-
-    // wait until a send slot is available, returning locked access to
-    // the channel state.
-    fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> {
-        let mut node = Node { token: None, next: ptr::null_mut() };
-        loop {
-            let mut guard = self.lock.lock().unwrap();
-            // are we ready to go?
-            if guard.disconnected || guard.buf.size() < guard.buf.capacity() {
-                return guard;
-            }
-            // no room; actually block
-            let wait_token = guard.queue.enqueue(&mut node);
-            drop(guard);
-            wait_token.wait();
-        }
-    }
-
-    pub fn send(&self, t: T) -> Result<(), T> {
-        let mut guard = self.acquire_send_slot();
-        if guard.disconnected {
-            return Err(t);
-        }
-        guard.buf.enqueue(t);
-
-        match mem::replace(&mut guard.blocker, NoneBlocked) {
-            // if our capacity is 0, then we need to wait for a receiver to be
-            // available to take our data. After waiting, we check again to make
-            // sure the port didn't go away in the meantime. If it did, we need
-            // to hand back our data.
-            NoneBlocked if guard.cap == 0 => {
-                let mut canceled = false;
-                assert!(guard.canceled.is_none());
-                guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
-                let mut guard = wait(&self.lock, guard, BlockedSender);
-                if canceled { Err(guard.buf.dequeue()) } else { Ok(()) }
-            }
-
-            // success, we buffered some data
-            NoneBlocked => Ok(()),
-
-            // success, someone's about to receive our buffered data.
-            BlockedReceiver(token) => {
-                wakeup(token, guard);
-                Ok(())
-            }
-
-            BlockedSender(..) => panic!("lolwut"),
-        }
-    }
-
-    pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
-        let mut guard = self.lock.lock().unwrap();
-        if guard.disconnected {
-            Err(super::TrySendError::Disconnected(t))
-        } else if guard.buf.size() == guard.buf.capacity() {
-            Err(super::TrySendError::Full(t))
-        } else if guard.cap == 0 {
-            // With capacity 0, even though we have buffer space we can't
-            // transfer the data unless there's a receiver waiting.
-            match mem::replace(&mut guard.blocker, NoneBlocked) {
-                NoneBlocked => Err(super::TrySendError::Full(t)),
-                BlockedSender(..) => unreachable!(),
-                BlockedReceiver(token) => {
-                    guard.buf.enqueue(t);
-                    wakeup(token, guard);
-                    Ok(())
-                }
-            }
-        } else {
-            // If the buffer has some space and the capacity isn't 0, then we
-            // just enqueue the data for later retrieval, ensuring to wake up
-            // any blocked receiver if there is one.
-            assert!(guard.buf.size() < guard.buf.capacity());
-            guard.buf.enqueue(t);
-            match mem::replace(&mut guard.blocker, NoneBlocked) {
-                BlockedReceiver(token) => wakeup(token, guard),
-                NoneBlocked => {}
-                BlockedSender(..) => unreachable!(),
-            }
-            Ok(())
-        }
-    }
-
-    // Receives a message from this channel
-    //
-    // When reading this, remember that there can only ever be one receiver at
-    // time.
-    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
-        let mut guard = self.lock.lock().unwrap();
-
-        let mut woke_up_after_waiting = false;
-        // Wait for the buffer to have something in it. No need for a
-        // while loop because we're the only receiver.
-        if !guard.disconnected && guard.buf.size() == 0 {
-            if let Some(deadline) = deadline {
-                guard =
-                    wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting);
-            } else {
-                guard = wait(&self.lock, guard, BlockedReceiver);
-                woke_up_after_waiting = true;
-            }
-        }
-
-        // N.B., channel could be disconnected while waiting, so the order of
-        // these conditionals is important.
-        if guard.disconnected && guard.buf.size() == 0 {
-            return Err(Disconnected);
-        }
-
-        // Pick up the data, wake up our neighbors, and carry on
-        assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting));
-
-        if guard.buf.size() == 0 {
-            return Err(Empty);
-        }
-
-        let ret = guard.buf.dequeue();
-        self.wakeup_senders(woke_up_after_waiting, guard);
-        Ok(ret)
-    }
-
-    pub fn try_recv(&self) -> Result<T, Failure> {
-        let mut guard = self.lock.lock().unwrap();
-
-        // Easy cases first
-        if guard.disconnected && guard.buf.size() == 0 {
-            return Err(Disconnected);
-        }
-        if guard.buf.size() == 0 {
-            return Err(Empty);
-        }
-
-        // Be sure to wake up neighbors
-        let ret = Ok(guard.buf.dequeue());
-        self.wakeup_senders(false, guard);
-        ret
-    }
-
-    // Wake up pending senders after some data has been received
-    //
-    // * `waited` - flag if the receiver blocked to receive some data, or if it
-    //              just picked up some data on the way out
-    // * `guard` - the lock guard that is held over this channel's lock
-    fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) {
-        let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
-
-        // If this is a no-buffer channel (cap == 0), then if we didn't wait we
-        // need to ACK the sender. If we waited, then the sender waking us up
-        // was already the ACK.
-        let pending_sender2 = if guard.cap == 0 && !waited {
-            match mem::replace(&mut guard.blocker, NoneBlocked) {
-                NoneBlocked => None,
-                BlockedReceiver(..) => unreachable!(),
-                BlockedSender(token) => {
-                    guard.canceled.take();
-                    Some(token)
-                }
-            }
-        } else {
-            None
-        };
-        mem::drop(guard);
-
-        // only outside of the lock do we wake up the pending threads
-        if let Some(token) = pending_sender1 {
-            token.signal();
-        }
-        if let Some(token) = pending_sender2 {
-            token.signal();
-        }
-    }
-
-    // Prepares this shared packet for a channel clone, essentially just bumping
-    // a refcount.
-    pub fn clone_chan(&self) {
-        let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
-
-        // See comments on Arc::clone() on why we do this (for `mem::forget`).
-        if old_count > MAX_REFCOUNT {
-            abort();
-        }
-    }
-
-    pub fn drop_chan(&self) {
-        // Only flag the channel as disconnected if we're the last channel
-        match self.channels.fetch_sub(1, Ordering::SeqCst) {
-            1 => {}
-            _ => return,
-        }
-
-        // Not much to do other than wake up a receiver if one's there
-        let mut guard = self.lock.lock().unwrap();
-        if guard.disconnected {
-            return;
-        }
-        guard.disconnected = true;
-        match mem::replace(&mut guard.blocker, NoneBlocked) {
-            NoneBlocked => {}
-            BlockedSender(..) => unreachable!(),
-            BlockedReceiver(token) => wakeup(token, guard),
-        }
-    }
-
-    pub fn drop_port(&self) {
-        let mut guard = self.lock.lock().unwrap();
-
-        if guard.disconnected {
-            return;
-        }
-        guard.disconnected = true;
-
-        // If the capacity is 0, then the sender may want its data back after
-        // we're disconnected. Otherwise it's now our responsibility to destroy
-        // the buffered data. As with many other portions of this code, this
-        // needs to be careful to destroy the data *outside* of the lock to
-        // prevent deadlock.
-        let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() };
-        let mut queue =
-            mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() });
-
-        let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
-            NoneBlocked => None,
-            BlockedSender(token) => {
-                *guard.canceled.take().unwrap() = true;
-                Some(token)
-            }
-            BlockedReceiver(..) => unreachable!(),
-        };
-        mem::drop(guard);
-
-        while let Some(token) = queue.dequeue() {
-            token.signal();
-        }
-        if let Some(token) = waiter {
-            token.signal();
-        }
-    }
-}
-
-impl<T> Drop for Packet<T> {
-    fn drop(&mut self) {
-        assert_eq!(self.channels.load(Ordering::SeqCst), 0);
-        let mut guard = self.lock.lock().unwrap();
-        assert!(guard.queue.dequeue().is_none());
-        assert!(guard.canceled.is_none());
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Buffer, a simple ring buffer backed by Vec<T>
-////////////////////////////////////////////////////////////////////////////////
-
-impl<T> Buffer<T> {
-    fn enqueue(&mut self, t: T) {
-        let pos = (self.start + self.size) % self.buf.len();
-        self.size += 1;
-        let prev = mem::replace(&mut self.buf[pos], Some(t));
-        assert!(prev.is_none());
-    }
-
-    fn dequeue(&mut self) -> T {
-        let start = self.start;
-        self.size -= 1;
-        self.start = (self.start + 1) % self.buf.len();
-        let result = &mut self.buf[start];
-        result.take().unwrap()
-    }
-
-    fn size(&self) -> usize {
-        self.size
-    }
-    fn capacity(&self) -> usize {
-        self.buf.len()
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Queue, a simple queue to enqueue threads with (stack-allocated nodes)
-////////////////////////////////////////////////////////////////////////////////
-
-impl Queue {
-    fn enqueue(&mut self, node: &mut Node) -> WaitToken {
-        let (wait_token, signal_token) = blocking::tokens();
-        node.token = Some(signal_token);
-        node.next = ptr::null_mut();
-
-        if self.tail.is_null() {
-            self.head = node as *mut Node;
-            self.tail = node as *mut Node;
-        } else {
-            unsafe {
-                (*self.tail).next = node as *mut Node;
-                self.tail = node as *mut Node;
-            }
-        }
-
-        wait_token
-    }
-
-    fn dequeue(&mut self) -> Option<SignalToken> {
-        if self.head.is_null() {
-            return None;
-        }
-        let node = self.head;
-        self.head = unsafe { (*node).next };
-        if self.head.is_null() {
-            self.tail = ptr::null_mut();
-        }
-        unsafe {
-            (*node).next = ptr::null_mut();
-            Some((*node).token.take().unwrap())
-        }
-    }
-}
diff --git a/library/std/src/sync/mpsc/tests.rs b/library/std/src/sync/mpsc/tests.rs
index f6d0796f604..82c52eb4fef 100644
--- a/library/std/src/sync/mpsc/tests.rs
+++ b/library/std/src/sync/mpsc/tests.rs
@@ -706,3 +706,17 @@ fn issue_32114() {
     let _ = tx.send(123);
     assert_eq!(tx.send(123), Err(SendError(123)));
 }
+
+#[test]
+fn issue_39364() {
+    let (tx, rx) = channel::<()>();
+    let t = thread::spawn(move || {
+        thread::sleep(Duration::from_millis(300));
+        let _ = tx.clone();
+        crate::mem::forget(tx);
+    });
+
+    let _ = rx.recv_timeout(Duration::from_millis(500));
+    t.join().unwrap();
+    let _ = rx.recv_timeout(Duration::from_millis(500));
+}