about summary refs log tree commit diff
path: root/src/libstd/sync/spsc_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/sync/spsc_queue.rs')
-rw-r--r--src/libstd/sync/spsc_queue.rs290
1 files changed, 123 insertions, 167 deletions
diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs
index 35a5846f11a..d1fde759cc1 100644
--- a/src/libstd/sync/spsc_queue.rs
+++ b/src/libstd/sync/spsc_queue.rs
@@ -38,7 +38,6 @@ use kinds::Send;
 use ops::Drop;
 use option::{Some, None, Option};
 use ptr::RawPtr;
-use sync::arc::UnsafeArc;
 use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
 
 // Node within the linked list queue of messages to send
@@ -50,75 +49,25 @@ struct Node<T> {
     next: AtomicPtr<Node<T>>,   // next node in the queue
 }
 
-// The producer/consumer halves both need access to the `tail` field, and if
-// they both have access to that we may as well just give them both access
-// to this whole structure.
-struct State<T, P> {
+/// The single-producer single-consumer queue. This structure is not cloneable,
+/// but it can be safely shared in an UnsafeArc if it is guaranteed that there
+/// is only one popper and one pusher touching the queue at any one point in
+/// time.
+pub struct Queue<T> {
     // consumer fields
-    tail: *mut Node<T>, // where to pop from
-    tail_prev: AtomicPtr<Node<T>>, // where to pop from
+    priv tail: *mut Node<T>, // where to pop from
+    priv tail_prev: AtomicPtr<Node<T>>, // where to pop from
 
     // producer fields
-    head: *mut Node<T>,      // where to push to
-    first: *mut Node<T>,     // where to get new nodes from
-    tail_copy: *mut Node<T>, // between first/tail
+    priv head: *mut Node<T>,      // where to push to
+    priv first: *mut Node<T>,     // where to get new nodes from
+    priv tail_copy: *mut Node<T>, // between first/tail
 
     // Cache maintenance fields. Additions and subtractions are stored
     // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: uint,
-    cache_additions: AtomicUint,
-    cache_subtractions: AtomicUint,
-
-    packet: P,
-}
-
-/// Producer half of this queue. This handle is used to push data to the
-/// consumer.
-pub struct Producer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-/// Consumer half of this queue. This handle is used to receive data from the
-/// producer.
-pub struct Consumer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-/// Creates a new queue. The producer returned is connected to the consumer to
-/// push all data to the consumer.
-///
-/// # Arguments
-///
-///   * `bound` - This queue implementation is implemented with a linked list,
-///               and this means that a push is always a malloc. In order to
-///               amortize this cost, an internal cache of nodes is maintained
-///               to prevent a malloc from always being necessary. This bound is
-///               the limit on the size of the cache (if desired). If the value
-///               is 0, then the cache has no bound. Otherwise, the cache will
-///               never grow larger than `bound` (although the queue itself
-///               could be much larger.
-///
-///   * `p` - This is the user-defined packet of data which will also be shared
-///           between the producer and consumer.
-pub fn queue<T: Send, P: Send>(bound: uint,
-                               p: P) -> (Consumer<T, P>, Producer<T, P>)
-{
-    let n1 = Node::new();
-    let n2 = Node::new();
-    unsafe { (*n1).next.store(n2, Relaxed) }
-    let state = State {
-        tail: n2,
-        tail_prev: AtomicPtr::new(n1),
-        head: n2,
-        first: n1,
-        tail_copy: n1,
-        cache_bound: bound,
-        cache_additions: AtomicUint::new(0),
-        cache_subtractions: AtomicUint::new(0),
-        packet: p,
-    };
-    let (arc1, arc2) = UnsafeArc::new2(state);
-    (Consumer { state: arc1 }, Producer { state: arc2 })
+    priv cache_bound: uint,
+    priv cache_additions: AtomicUint,
+    priv cache_subtractions: AtomicUint,
 }
 
 impl<T: Send> Node<T> {
@@ -132,49 +81,49 @@ impl<T: Send> Node<T> {
     }
 }
 
-impl<T: Send, P: Send> Producer<T, P> {
-    /// Pushes data onto the queue
-    pub fn push(&mut self, t: T) {
-        unsafe { (*self.state.get()).push(t) }
-    }
-    /// Tests whether the queue is empty. Note that if this function returns
-    /// `false`, the return value is significant, but if the return value is
-    /// `true` then almost no meaning can be attached to the return value.
-    pub fn is_empty(&self) -> bool {
-        unsafe { (*self.state.get()).is_empty() }
-    }
-    /// Acquires an unsafe pointer to the underlying user-defined packet. Note
-    /// that care must be taken to ensure that the queue outlives the usage of
-    /// the packet (because it is an unsafe pointer).
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
-    }
-}
-
-impl<T: Send, P: Send> Consumer<T, P> {
-    /// Pops some data from this queue, returning `None` when the queue is
-    /// empty.
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe { (*self.state.get()).pop() }
-    }
-    /// Same function as the producer's `packet` method.
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
+impl<T: Send> Queue<T> {
+    /// Creates a new queue. The producer returned is connected to the consumer
+    /// to push all data to the consumer.
+    ///
+    /// # Arguments
+    ///
+    ///   * `bound` - This queue implementation is implemented with a linked
+    ///               list, and this means that a push is always a malloc. In
+    ///               order to amortize this cost, an internal cache of nodes is
+    ///               maintained to prevent a malloc from always being
+    ///               necessary. This bound is the limit on the size of the
+    ///               cache (if desired). If the value is 0, then the cache has
+    ///               no bound. Otherwise, the cache will never grow larger than
+    ///               `bound` (although the queue itself could be much larger.
+    pub fn new(bound: uint) -> Queue<T> {
+        let n1 = Node::new();
+        let n2 = Node::new();
+        unsafe { (*n1).next.store(n2, Relaxed) }
+        Queue {
+            tail: n2,
+            tail_prev: AtomicPtr::new(n1),
+            head: n2,
+            first: n1,
+            tail_copy: n1,
+            cache_bound: bound,
+            cache_additions: AtomicUint::new(0),
+            cache_subtractions: AtomicUint::new(0),
+        }
     }
-}
 
-impl<T: Send, P: Send> State<T, P> {
-    // remember that there is only one thread executing `push` (and only one
-    // thread executing `pop`)
-    unsafe fn push(&mut self, t: T) {
-        // Acquire a node (which either uses a cached one or allocates a new
-        // one), and then append this to the 'head' node.
-        let n = self.alloc();
-        assert!((*n).value.is_none());
-        (*n).value = Some(t);
-        (*n).next.store(0 as *mut Node<T>, Relaxed);
-        (*self.head).next.store(n, Release);
-        self.head = n;
+    /// Pushes a new value onto this queue. Note that to use this function
+    /// safely, it must be externally guaranteed that there is only one pusher.
+    pub fn push(&mut self, t: T) {
+        unsafe {
+            // Acquire a node (which either uses a cached one or allocates a new
+            // one), and then append this to the 'head' node.
+            let n = self.alloc();
+            assert!((*n).value.is_none());
+            (*n).value = Some(t);
+            (*n).next.store(0 as *mut Node<T>, Relaxed);
+            (*self.head).next.store(n, Release);
+            self.head = n;
+        }
     }
 
     unsafe fn alloc(&mut self) -> *mut Node<T> {
@@ -208,50 +157,57 @@ impl<T: Send, P: Send> State<T, P> {
         Node::new()
     }
 
-    // remember that there is only one thread executing `pop` (and only one
-    // thread executing `push`)
-    unsafe fn pop(&mut self) -> Option<T> {
-        // The `tail` node is not actually a used node, but rather a
-        // sentinel from where we should start popping from. Hence, look at
-        // tail's next field and see if we can use it. If we do a pop, then
-        // the current tail node is a candidate for going into the cache.
-        let tail = self.tail;
-        let next = (*tail).next.load(Acquire);
-        if next.is_null() { return None }
-        assert!((*next).value.is_some());
-        let ret = (*next).value.take();
-
-        self.tail = next;
-        if self.cache_bound == 0 {
-            self.tail_prev.store(tail, Release);
-        } else {
-            // FIXME: this is dubious with overflow.
-            let additions = self.cache_additions.load(Relaxed);
-            let subtractions = self.cache_subtractions.load(Relaxed);
-            let size = additions - subtractions;
+    /// Attempts to pop a value from this queue. Remember that to use this type
+    /// safely you must ensure that there is only one popper at a time.
+    pub fn pop(&mut self) -> Option<T> {
+        unsafe {
+            // The `tail` node is not actually a used node, but rather a
+            // sentinel from where we should start popping from. Hence, look at
+            // tail's next field and see if we can use it. If we do a pop, then
+            // the current tail node is a candidate for going into the cache.
+            let tail = self.tail;
+            let next = (*tail).next.load(Acquire);
+            if next.is_null() { return None }
+            assert!((*next).value.is_some());
+            let ret = (*next).value.take();
 
-            if size < self.cache_bound {
+            self.tail = next;
+            if self.cache_bound == 0 {
                 self.tail_prev.store(tail, Release);
-                self.cache_additions.store(additions + 1, Relaxed);
             } else {
-                (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
-                // We have successfully erased all references to 'tail', so
-                // now we can safely drop it.
-                let _: ~Node<T> = cast::transmute(tail);
+                // FIXME: this is dubious with overflow.
+                let additions = self.cache_additions.load(Relaxed);
+                let subtractions = self.cache_subtractions.load(Relaxed);
+                let size = additions - subtractions;
+
+                if size < self.cache_bound {
+                    self.tail_prev.store(tail, Release);
+                    self.cache_additions.store(additions + 1, Relaxed);
+                } else {
+                    (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
+                    // We have successfully erased all references to 'tail', so
+                    // now we can safely drop it.
+                    let _: ~Node<T> = cast::transmute(tail);
+                }
             }
+            return ret;
         }
-        return ret;
     }
 
-    unsafe fn is_empty(&self) -> bool {
-        let tail = self.tail;
-        let next = (*tail).next.load(Acquire);
-        return next.is_null();
+    /// Tests whether this queue is empty or not. Remember that there can only
+    /// be one tester/popper, and also keep in mind that the answer returned
+    /// from this is likely to change if it is `false`.
+    pub fn is_empty(&self) -> bool {
+        unsafe {
+            let tail = self.tail;
+            let next = (*tail).next.load(Acquire);
+            return next.is_null();
+        }
     }
 }
 
 #[unsafe_destructor]
-impl<T: Send, P: Send> Drop for State<T, P> {
+impl<T: Send> Drop for Queue<T> {
     fn drop(&mut self) {
         unsafe {
             let mut cur = self.first;
@@ -267,44 +223,44 @@ impl<T: Send, P: Send> Drop for State<T, P> {
 #[cfg(test)]
 mod test {
     use prelude::*;
-    use super::queue;
+    use super::Queue;
     use native;
 
     #[test]
     fn smoke() {
-        let (mut c, mut p) = queue(0, ());
-        p.push(1);
-        p.push(2);
-        assert_eq!(c.pop(), Some(1));
-        assert_eq!(c.pop(), Some(2));
-        assert_eq!(c.pop(), None);
-        p.push(3);
-        p.push(4);
-        assert_eq!(c.pop(), Some(3));
-        assert_eq!(c.pop(), Some(4));
-        assert_eq!(c.pop(), None);
+        let mut q = Queue::new(0);
+        q.push(1);
+        q.push(2);
+        assert_eq!(q.pop(), Some(1));
+        assert_eq!(q.pop(), Some(2));
+        assert_eq!(q.pop(), None);
+        q.push(3);
+        q.push(4);
+        assert_eq!(q.pop(), Some(3));
+        assert_eq!(q.pop(), Some(4));
+        assert_eq!(q.pop(), None);
     }
 
     #[test]
     fn drop_full() {
-        let (_, mut p) = queue(0, ());
-        p.push(~1);
-        p.push(~2);
+        let mut q = Queue::new(0);
+        q.push(~1);
+        q.push(~2);
     }
 
     #[test]
     fn smoke_bound() {
-        let (mut c, mut p) = queue(1, ());
-        p.push(1);
-        p.push(2);
-        assert_eq!(c.pop(), Some(1));
-        assert_eq!(c.pop(), Some(2));
-        assert_eq!(c.pop(), None);
-        p.push(3);
-        p.push(4);
-        assert_eq!(c.pop(), Some(3));
-        assert_eq!(c.pop(), Some(4));
-        assert_eq!(c.pop(), None);
+        let mut q = Queue::new(1);
+        q.push(1);
+        q.push(2);
+        assert_eq!(q.pop(), Some(1));
+        assert_eq!(q.pop(), Some(2));
+        assert_eq!(q.pop(), None);
+        q.push(3);
+        q.push(4);
+        assert_eq!(q.pop(), Some(3));
+        assert_eq!(q.pop(), Some(4));
+        assert_eq!(q.pop(), None);
     }
 
     #[test]
@@ -313,13 +269,13 @@ mod test {
         stress_bound(1);
 
         fn stress_bound(bound: uint) {
-            let (c, mut p) = queue(bound, ());
+            let (a, b) = UnsafeArc::new2(Queue::new(bound));
             let (port, chan) = Chan::new();
             native::task::spawn(proc() {
                 let mut c = c;
                 for _ in range(0, 100000) {
                     loop {
-                        match c.pop() {
+                        match unsafe { (*b.get()).pop() } {
                             Some(1) => break,
                             Some(_) => fail!(),
                             None => {}
@@ -329,7 +285,7 @@ mod test {
                 chan.send(());
             });
             for _ in range(0, 100000) {
-                p.push(1);
+                unsafe { (*a.get()).push(1); }
             }
             port.recv();
         }