about summary refs log tree commit diff
path: root/src/libstd/rt/mpsc_queue.rs
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-12-05 17:56:17 -0800
committerAlex Crichton <alex@alexcrichton.com>2013-12-16 17:47:11 -0800
commitbfa9064ba2687eb1d95708f72f41ddd9729a6ba1 (patch)
treeb10aeff181eff3a8654df495d2ad8826490f6533 /src/libstd/rt/mpsc_queue.rs
parent000cda611f8224ac780fa37432f869f425cd2bb7 (diff)
downloadrust-bfa9064ba2687eb1d95708f72f41ddd9729a6ba1.tar.gz
rust-bfa9064ba2687eb1d95708f72f41ddd9729a6ba1.zip
Rewrite std::comm
* Streams are now ~3x faster than before (fewer allocations and more optimized)
    * Based on a single-producer single-consumer lock-free queue that doesn't
      always have to allocate on every send.
    * Blocking via mutexes/cond vars outside the runtime
* Streams work in/out of the runtime seamlessly
* Select now works in/out of the runtime seamlessly
* Streams will now fail!() on send() if the other end has hung up
    * try_send() will not fail
* PortOne/ChanOne removed
* SharedPort removed
* MegaPipe removed
* Generic select removed (only one kind of port now)
* API redesign
    * try_recv == never block
    * recv_opt == block, don't fail
    * iter() == Iterator<T> for Port<T>
    * removed peek
    * Type::new
* Removed rt::comm
Diffstat (limited to 'src/libstd/rt/mpsc_queue.rs')
-rw-r--r--src/libstd/rt/mpsc_queue.rs230
1 files changed, 120 insertions, 110 deletions
diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs
index 4f39a1df4fa..d575028af70 100644
--- a/src/libstd/rt/mpsc_queue.rs
+++ b/src/libstd/rt/mpsc_queue.rs
@@ -1,5 +1,4 @@
-/* Multi-producer/single-consumer queue
- * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions are met:
  *
@@ -27,163 +26,177 @@
  */
 
 //! A mostly lock-free multi-producer, single consumer queue.
-// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
 
-use unstable::sync::UnsafeArc;
-use unstable::atomics::{AtomicPtr,Relaxed,Release,Acquire};
-use ptr::{mut_null, to_mut_unsafe_ptr};
+// http://www.1024cores.net/home/lock-free-algorithms
+//                         /queues/non-intrusive-mpsc-node-based-queue
+
 use cast;
-use option::*;
 use clone::Clone;
 use kinds::Send;
+use ops::Drop;
+use option::{Option, None, Some};
+use unstable::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
+use unstable::sync::UnsafeArc;
+
+pub enum PopResult<T> {
+    /// Some data has been popped
+    Data(T),
+    /// The queue is empty
+    Empty,
+    /// The queue is in an inconsistent state. Popping data should succeed, but
+    /// some pushers have yet to make enough progress in order allow a pop to
+    /// succeed. It is recommended that a pop() occur "in the near future" in
+    /// order to see if the sender has made progress or not
+    Inconsistent,
+}
 
 struct Node<T> {
     next: AtomicPtr<Node<T>>,
     value: Option<T>,
 }
 
-impl<T> Node<T> {
-    fn empty() -> Node<T> {
-        Node{next: AtomicPtr::new(mut_null()), value: None}
-    }
-
-    fn with_value(value: T) -> Node<T> {
-        Node{next: AtomicPtr::new(mut_null()), value: Some(value)}
-    }
-}
-
-struct State<T> {
-    pad0: [u8, ..64],
+struct State<T, P> {
     head: AtomicPtr<Node<T>>,
-    pad1: [u8, ..64],
-    stub: Node<T>,
-    pad2: [u8, ..64],
     tail: *mut Node<T>,
-    pad3: [u8, ..64],
+    packet: P,
 }
 
-struct Queue<T> {
-    priv state: UnsafeArc<State<T>>,
+pub struct Consumer<T, P> {
+    priv state: UnsafeArc<State<T, P>>,
 }
 
-impl<T: Send> Clone for Queue<T> {
-    fn clone(&self) -> Queue<T> {
-        Queue {
-            state: self.state.clone()
-        }
-    }
+pub struct Producer<T, P> {
+    priv state: UnsafeArc<State<T, P>>,
 }
 
-impl<T: Send> State<T> {
-    pub fn new() -> State<T> {
-        State{
-            pad0: [0, ..64],
-            head: AtomicPtr::new(mut_null()),
-            pad1: [0, ..64],
-            stub: Node::<T>::empty(),
-            pad2: [0, ..64],
-            tail: mut_null(),
-            pad3: [0, ..64],
-        }
+impl<T: Send, P: Send> Clone for Producer<T, P> {
+    fn clone(&self) -> Producer<T, P> {
+        Producer { state: self.state.clone() }
     }
+}
 
-    fn init(&mut self) {
-        let stub = self.get_stub_unsafe();
-        self.head.store(stub, Relaxed);
-        self.tail = stub;
+pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) {
+    unsafe {
+        let (a, b) = UnsafeArc::new2(State::new(p));
+        (Consumer { state: a }, Producer { state: b })
     }
+}
 
-    fn get_stub_unsafe(&mut self) -> *mut Node<T> {
-        to_mut_unsafe_ptr(&mut self.stub)
+impl<T> Node<T> {
+    unsafe fn new(v: Option<T>) -> *mut Node<T> {
+        cast::transmute(~Node {
+            next: AtomicPtr::new(0 as *mut Node<T>),
+            value: v,
+        })
     }
+}
 
-    fn push(&mut self, value: T) {
-        unsafe {
-            let node = cast::transmute(~Node::with_value(value));
-            self.push_node(node);
+impl<T: Send, P: Send> State<T, P> {
+    pub unsafe fn new(p: P) -> State<T, P> {
+        let stub = Node::new(None);
+        State {
+            head: AtomicPtr::new(stub),
+            tail: stub,
+            packet: p,
         }
     }
 
-    fn push_node(&mut self, node: *mut Node<T>) {
-        unsafe {
-            (*node).next.store(mut_null(), Release);
-            let prev = self.head.swap(node, Relaxed);
-            (*prev).next.store(node, Release);
-        }
+    unsafe fn push(&mut self, t: T) {
+        let n = Node::new(Some(t));
+        let prev = self.head.swap(n, AcqRel);
+        (*prev).next.store(n, Release);
     }
 
-    fn pop(&mut self) -> Option<T> {
-        unsafe {
-            let mut tail = self.tail;
-            let mut next = (*tail).next.load(Acquire);
-            let stub = self.get_stub_unsafe();
-            if tail == stub {
-                if mut_null() == next {
-                    return None
-                }
-                self.tail = next;
-                tail = next;
-                next = (*next).next.load(Acquire);
-            }
-            if next != mut_null() {
-                let tail: ~Node<T> = cast::transmute(tail);
-                self.tail = next;
-                return tail.value
-            }
-            let head = self.head.load(Relaxed);
-            if tail != head {
-                return None
-            }
-            self.push_node(stub);
-            next = (*tail).next.load(Acquire);
-            if next != mut_null() {
-                let tail: ~Node<T> = cast::transmute(tail);
-                self.tail = next;
-                return tail.value
-            }
+    unsafe fn pop(&mut self) -> PopResult<T> {
+        let tail = self.tail;
+        let next = (*tail).next.load(Acquire);
+
+        if !next.is_null() {
+            self.tail = next;
+            assert!((*tail).value.is_none());
+            assert!((*next).value.is_some());
+            let ret = (*next).value.take_unwrap();
+            let _: ~Node<T> = cast::transmute(tail);
+            return Data(ret);
         }
-        None
+
+        if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
+    }
+
+    unsafe fn is_empty(&mut self) -> bool {
+        return (*self.tail).next.load(Acquire).is_null();
     }
 }
 
-impl<T: Send> Queue<T> {
-    pub fn new() -> Queue<T> {
+#[unsafe_destructor]
+impl<T: Send, P: Send> Drop for State<T, P> {
+    fn drop(&mut self) {
         unsafe {
-            let q = Queue{state: UnsafeArc::new(State::new())};
-            (*q.state.get()).init();
-            q
+            let mut cur = self.tail;
+            while !cur.is_null() {
+                let next = (*cur).next.load(Relaxed);
+                let _: ~Node<T> = cast::transmute(cur);
+                cur = next;
+            }
         }
     }
+}
 
+impl<T: Send, P: Send> Producer<T, P> {
     pub fn push(&mut self, value: T) {
         unsafe { (*self.state.get()).push(value) }
     }
+    pub fn is_empty(&self) -> bool {
+        unsafe{ (*self.state.get()).is_empty() }
+    }
+    pub unsafe fn packet(&self) -> *mut P {
+        &mut (*self.state.get()).packet as *mut P
+    }
+}
 
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe{ (*self.state.get()).pop() }
+impl<T: Send, P: Send> Consumer<T, P> {
+    pub fn pop(&mut self) -> PopResult<T> {
+        unsafe { (*self.state.get()).pop() }
+    }
+    pub fn casual_pop(&mut self) -> Option<T> {
+        match self.pop() {
+            Data(t) => Some(t),
+            Empty | Inconsistent => None,
+        }
+    }
+    pub unsafe fn packet(&self) -> *mut P {
+        &mut (*self.state.get()).packet as *mut P
     }
 }
 
 #[cfg(test)]
 mod tests {
     use prelude::*;
-    use option::*;
+
     use task;
-    use comm;
-    use super::Queue;
+    use super::{queue, Data, Empty, Inconsistent};
+
+    #[test]
+    fn test_full() {
+        let (_, mut p) = queue(());
+        p.push(~1);
+        p.push(~2);
+    }
 
     #[test]
     fn test() {
         let nthreads = 8u;
         let nmsgs = 1000u;
-        let mut q = Queue::new();
-        assert_eq!(None, q.pop());
+        let (mut c, p) = queue(());
+        match c.pop() {
+            Empty => {}
+            Inconsistent | Data(..) => fail!()
+        }
 
         for _ in range(0, nthreads) {
-            let (port, chan)  = comm::stream();
-            chan.send(q.clone());
+            let q = p.clone();
             do task::spawn_sched(task::SingleThreaded) {
-                let mut q = port.recv();
+                let mut q = q;
                 for i in range(0, nmsgs) {
                     q.push(i);
                 }
@@ -191,13 +204,10 @@ mod tests {
         }
 
         let mut i = 0u;
-        loop {
-            match q.pop() {
-                None => {},
-                Some(_) => {
-                    i += 1;
-                    if i == nthreads*nmsgs { break }
-                }
+        while i < nthreads * nmsgs {
+            match c.pop() {
+                Empty | Inconsistent => {},
+                Data(_) => { i += 1 }
             }
         }
     }