about summary refs log tree commit diff
path: root/src/libstd/sync/mpsc/mpsc_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/sync/mpsc/mpsc_queue.rs')
-rw-r--r--src/libstd/sync/mpsc/mpsc_queue.rs165
1 files changed, 0 insertions, 165 deletions
diff --git a/src/libstd/sync/mpsc/mpsc_queue.rs b/src/libstd/sync/mpsc/mpsc_queue.rs
deleted file mode 100644
index 6e7a7be4430..00000000000
--- a/src/libstd/sync/mpsc/mpsc_queue.rs
+++ /dev/null
@@ -1,165 +0,0 @@
-//! A mostly lock-free multi-producer, single consumer queue.
-//!
-//! This module contains an implementation of a concurrent MPSC queue. This
-//! queue can be used to share data between threads, and is also used as the
-//! building block of channels in rust.
-//!
-//! Note that the current implementation of this queue has a caveat of the `pop`
-//! method, and see the method for more information about it. Due to this
-//! caveat, this queue may not be appropriate for all use-cases.
-
-// http://www.1024cores.net/home/lock-free-algorithms
-//                         /queues/non-intrusive-mpsc-node-based-queue
-
-pub use self::PopResult::*;
-
-use core::cell::UnsafeCell;
-use core::ptr;
-
-use crate::boxed::Box;
-use crate::sync::atomic::{AtomicPtr, Ordering};
-
-/// A result of the `pop` function.
-pub enum PopResult<T> {
-    /// Some data has been popped
-    Data(T),
-    /// The queue is empty
-    Empty,
-    /// The queue is in an inconsistent state. Popping data should succeed, but
-    /// some pushers have yet to make enough progress in order allow a pop to
-    /// succeed. It is recommended that a pop() occur "in the near future" in
-    /// order to see if the sender has made progress or not
-    Inconsistent,
-}
-
-struct Node<T> {
-    next: AtomicPtr<Node<T>>,
-    value: Option<T>,
-}
-
-/// The multi-producer single-consumer structure. This is not cloneable, but it
-/// may be safely shared so long as it is guaranteed that there is only one
-/// popper at a time (many pushers are allowed).
-pub struct Queue<T> {
-    head: AtomicPtr<Node<T>>,
-    tail: UnsafeCell<*mut Node<T>>,
-}
-
-unsafe impl<T: Send> Send for Queue<T> {}
-unsafe impl<T: Send> Sync for Queue<T> {}
-
-impl<T> Node<T> {
-    unsafe fn new(v: Option<T>) -> *mut Node<T> {
-        Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v })
-    }
-}
-
-impl<T> Queue<T> {
-    /// Creates a new queue that is safe to share among multiple producers and
-    /// one consumer.
-    pub fn new() -> Queue<T> {
-        let stub = unsafe { Node::new(None) };
-        Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
-    }
-
-    /// Pushes a new value onto this queue.
-    pub fn push(&self, t: T) {
-        unsafe {
-            let n = Node::new(Some(t));
-            let prev = self.head.swap(n, Ordering::AcqRel);
-            (*prev).next.store(n, Ordering::Release);
-        }
-    }
-
-    /// Pops some data from this queue.
-    ///
-    /// Note that the current implementation means that this function cannot
-    /// return `Option<T>`. It is possible for this queue to be in an
-    /// inconsistent state where many pushes have succeeded and completely
-    /// finished, but pops cannot return `Some(t)`. This inconsistent state
-    /// happens when a pusher is pre-empted at an inopportune moment.
-    ///
-    /// This inconsistent state means that this queue does indeed have data, but
-    /// it does not currently have access to it at this time.
-    pub fn pop(&self) -> PopResult<T> {
-        unsafe {
-            let tail = *self.tail.get();
-            let next = (*tail).next.load(Ordering::Acquire);
-
-            if !next.is_null() {
-                *self.tail.get() = next;
-                assert!((*tail).value.is_none());
-                assert!((*next).value.is_some());
-                let ret = (*next).value.take().unwrap();
-                let _: Box<Node<T>> = Box::from_raw(tail);
-                return Data(ret);
-            }
-
-            if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
-        }
-    }
-}
-
-impl<T> Drop for Queue<T> {
-    fn drop(&mut self) {
-        unsafe {
-            let mut cur = *self.tail.get();
-            while !cur.is_null() {
-                let next = (*cur).next.load(Ordering::Relaxed);
-                let _: Box<Node<T>> = Box::from_raw(cur);
-                cur = next;
-            }
-        }
-    }
-}
-
-#[cfg(all(test, not(target_os = "emscripten")))]
-mod tests {
-    use super::{Data, Empty, Inconsistent, Queue};
-    use crate::sync::mpsc::channel;
-    use crate::sync::Arc;
-    use crate::thread;
-
-    #[test]
-    fn test_full() {
-        let q: Queue<Box<_>> = Queue::new();
-        q.push(box 1);
-        q.push(box 2);
-    }
-
-    #[test]
-    fn test() {
-        let nthreads = 8;
-        let nmsgs = 1000;
-        let q = Queue::new();
-        match q.pop() {
-            Empty => {}
-            Inconsistent | Data(..) => panic!(),
-        }
-        let (tx, rx) = channel();
-        let q = Arc::new(q);
-
-        for _ in 0..nthreads {
-            let tx = tx.clone();
-            let q = q.clone();
-            thread::spawn(move || {
-                for i in 0..nmsgs {
-                    q.push(i);
-                }
-                tx.send(()).unwrap();
-            });
-        }
-
-        let mut i = 0;
-        while i < nthreads * nmsgs {
-            match q.pop() {
-                Empty | Inconsistent => {}
-                Data(_) => i += 1,
-            }
-        }
-        drop(tx);
-        for _ in 0..nthreads {
-            rx.recv().unwrap();
-        }
-    }
-}