diff options
Diffstat (limited to 'src/libstd/sync/spsc_queue.rs')
| -rw-r--r-- | src/libstd/sync/spsc_queue.rs | 290 |
1 files changed, 123 insertions, 167 deletions
diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index 35a5846f11a..d1fde759cc1 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -38,7 +38,6 @@ use kinds::Send; use ops::Drop; use option::{Some, None, Option}; use ptr::RawPtr; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; // Node within the linked list queue of messages to send @@ -50,75 +49,25 @@ struct Node<T> { next: AtomicPtr<Node<T>>, // next node in the queue } -// The producer/consumer halves both need access to the `tail` field, and if -// they both have access to that we may as well just give them both access -// to this whole structure. -struct State<T, P> { +/// The single-producer single-consumer queue. This structure is not cloneable, +/// but it can be safely shared in an UnsafeArc if it is guaranteed that there +/// is only one popper and one pusher touching the queue at any one point in +/// time. +pub struct Queue<T> { // consumer fields - tail: *mut Node<T>, // where to pop from - tail_prev: AtomicPtr<Node<T>>, // where to pop from + priv tail: *mut Node<T>, // where to pop from + priv tail_prev: AtomicPtr<Node<T>>, // where to pop from // producer fields - head: *mut Node<T>, // where to push to - first: *mut Node<T>, // where to get new nodes from - tail_copy: *mut Node<T>, // between first/tail + priv head: *mut Node<T>, // where to push to + priv first: *mut Node<T>, // where to get new nodes from + priv tail_copy: *mut Node<T>, // between first/tail // Cache maintenance fields. Additions and subtractions are stored // separately in order to allow them to use nonatomic addition/subtraction. - cache_bound: uint, - cache_additions: AtomicUint, - cache_subtractions: AtomicUint, - - packet: P, -} - -/// Producer half of this queue. This handle is used to push data to the -/// consumer. -pub struct Producer<T, P> { - priv state: UnsafeArc<State<T, P>>, -} - -/// Consumer half of this queue. This handle is used to receive data from the -/// producer. -pub struct Consumer<T, P> { - priv state: UnsafeArc<State<T, P>>, -} - -/// Creates a new queue. The producer returned is connected to the consumer to -/// push all data to the consumer. -/// -/// # Arguments -/// -/// * `bound` - This queue implementation is implemented with a linked list, -/// and this means that a push is always a malloc. In order to -/// amortize this cost, an internal cache of nodes is maintained -/// to prevent a malloc from always being necessary. This bound is -/// the limit on the size of the cache (if desired). If the value -/// is 0, then the cache has no bound. Otherwise, the cache will -/// never grow larger than `bound` (although the queue itself -/// could be much larger. -/// -/// * `p` - This is the user-defined packet of data which will also be shared -/// between the producer and consumer. -pub fn queue<T: Send, P: Send>(bound: uint, - p: P) -> (Consumer<T, P>, Producer<T, P>) -{ - let n1 = Node::new(); - let n2 = Node::new(); - unsafe { (*n1).next.store(n2, Relaxed) } - let state = State { - tail: n2, - tail_prev: AtomicPtr::new(n1), - head: n2, - first: n1, - tail_copy: n1, - cache_bound: bound, - cache_additions: AtomicUint::new(0), - cache_subtractions: AtomicUint::new(0), - packet: p, - }; - let (arc1, arc2) = UnsafeArc::new2(state); - (Consumer { state: arc1 }, Producer { state: arc2 }) + priv cache_bound: uint, + priv cache_additions: AtomicUint, + priv cache_subtractions: AtomicUint, } impl<T: Send> Node<T> { @@ -132,49 +81,49 @@ impl<T: Send> Node<T> { } } -impl<T: Send, P: Send> Producer<T, P> { - /// Pushes data onto the queue - pub fn push(&mut self, t: T) { - unsafe { (*self.state.get()).push(t) } - } - /// Tests whether the queue is empty. Note that if this function returns - /// `false`, the return value is significant, but if the return value is - /// `true` then almost no meaning can be attached to the return value. - pub fn is_empty(&self) -> bool { - unsafe { (*self.state.get()).is_empty() } - } - /// Acquires an unsafe pointer to the underlying user-defined packet. Note - /// that care must be taken to ensure that the queue outlives the usage of - /// the packet (because it is an unsafe pointer). - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -impl<T: Send, P: Send> Consumer<T, P> { - /// Pops some data from this queue, returning `None` when the queue is - /// empty. - pub fn pop(&mut self) -> Option<T> { - unsafe { (*self.state.get()).pop() } - } - /// Same function as the producer's `packet` method. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P +impl<T: Send> Queue<T> { + /// Creates a new queue. The producer returned is connected to the consumer + /// to push all data to the consumer. + /// + /// # Arguments + /// + /// * `bound` - This queue implementation is implemented with a linked + /// list, and this means that a push is always a malloc. In + /// order to amortize this cost, an internal cache of nodes is + /// maintained to prevent a malloc from always being + /// necessary. This bound is the limit on the size of the + /// cache (if desired). If the value is 0, then the cache has + /// no bound. Otherwise, the cache will never grow larger than + /// `bound` (although the queue itself could be much larger. + pub fn new(bound: uint) -> Queue<T> { + let n1 = Node::new(); + let n2 = Node::new(); + unsafe { (*n1).next.store(n2, Relaxed) } + Queue { + tail: n2, + tail_prev: AtomicPtr::new(n1), + head: n2, + first: n1, + tail_copy: n1, + cache_bound: bound, + cache_additions: AtomicUint::new(0), + cache_subtractions: AtomicUint::new(0), + } } -} -impl<T: Send, P: Send> State<T, P> { - // remember that there is only one thread executing `push` (and only one - // thread executing `pop`) - unsafe fn push(&mut self, t: T) { - // Acquire a node (which either uses a cached one or allocates a new - // one), and then append this to the 'head' node. - let n = self.alloc(); - assert!((*n).value.is_none()); - (*n).value = Some(t); - (*n).next.store(0 as *mut Node<T>, Relaxed); - (*self.head).next.store(n, Release); - self.head = n; + /// Pushes a new value onto this queue. Note that to use this function + /// safely, it must be externally guaranteed that there is only one pusher. + pub fn push(&mut self, t: T) { + unsafe { + // Acquire a node (which either uses a cached one or allocates a new + // one), and then append this to the 'head' node. + let n = self.alloc(); + assert!((*n).value.is_none()); + (*n).value = Some(t); + (*n).next.store(0 as *mut Node<T>, Relaxed); + (*self.head).next.store(n, Release); + self.head = n; + } } unsafe fn alloc(&mut self) -> *mut Node<T> { @@ -208,50 +157,57 @@ impl<T: Send, P: Send> State<T, P> { Node::new() } - // remember that there is only one thread executing `pop` (and only one - // thread executing `push`) - unsafe fn pop(&mut self) -> Option<T> { - // The `tail` node is not actually a used node, but rather a - // sentinel from where we should start popping from. Hence, look at - // tail's next field and see if we can use it. If we do a pop, then - // the current tail node is a candidate for going into the cache. - let tail = self.tail; - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - assert!((*next).value.is_some()); - let ret = (*next).value.take(); - - self.tail = next; - if self.cache_bound == 0 { - self.tail_prev.store(tail, Release); - } else { - // FIXME: this is dubious with overflow. - let additions = self.cache_additions.load(Relaxed); - let subtractions = self.cache_subtractions.load(Relaxed); - let size = additions - subtractions; + /// Attempts to pop a value from this queue. Remember that to use this type + /// safely you must ensure that there is only one popper at a time. + pub fn pop(&mut self) -> Option<T> { + unsafe { + // The `tail` node is not actually a used node, but rather a + // sentinel from where we should start popping from. Hence, look at + // tail's next field and see if we can use it. If we do a pop, then + // the current tail node is a candidate for going into the cache. + let tail = self.tail; + let next = (*tail).next.load(Acquire); + if next.is_null() { return None } + assert!((*next).value.is_some()); + let ret = (*next).value.take(); - if size < self.cache_bound { + self.tail = next; + if self.cache_bound == 0 { self.tail_prev.store(tail, Release); - self.cache_additions.store(additions + 1, Relaxed); } else { - (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); - // We have successfully erased all references to 'tail', so - // now we can safely drop it. - let _: ~Node<T> = cast::transmute(tail); + // FIXME: this is dubious with overflow. + let additions = self.cache_additions.load(Relaxed); + let subtractions = self.cache_subtractions.load(Relaxed); + let size = additions - subtractions; + + if size < self.cache_bound { + self.tail_prev.store(tail, Release); + self.cache_additions.store(additions + 1, Relaxed); + } else { + (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); + // We have successfully erased all references to 'tail', so + // now we can safely drop it. + let _: ~Node<T> = cast::transmute(tail); + } } + return ret; } - return ret; } - unsafe fn is_empty(&self) -> bool { - let tail = self.tail; - let next = (*tail).next.load(Acquire); - return next.is_null(); + /// Tests whether this queue is empty or not. Remember that there can only + /// be one tester/popper, and also keep in mind that the answer returned + /// from this is likely to change if it is `false`. + pub fn is_empty(&self) -> bool { + unsafe { + let tail = self.tail; + let next = (*tail).next.load(Acquire); + return next.is_null(); + } } } #[unsafe_destructor] -impl<T: Send, P: Send> Drop for State<T, P> { +impl<T: Send> Drop for Queue<T> { fn drop(&mut self) { unsafe { let mut cur = self.first; @@ -267,44 +223,44 @@ impl<T: Send, P: Send> Drop for State<T, P> { #[cfg(test)] mod test { use prelude::*; - use super::queue; + use super::Queue; use native; #[test] fn smoke() { - let (mut c, mut p) = queue(0, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); + let mut q = Queue::new(0); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); } #[test] fn drop_full() { - let (_, mut p) = queue(0, ()); - p.push(~1); - p.push(~2); + let mut q = Queue::new(0); + q.push(~1); + q.push(~2); } #[test] fn smoke_bound() { - let (mut c, mut p) = queue(1, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); + let mut q = Queue::new(1); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); } #[test] @@ -313,13 +269,13 @@ mod test { stress_bound(1); fn stress_bound(bound: uint) { - let (c, mut p) = queue(bound, ()); + let (a, b) = UnsafeArc::new2(Queue::new(bound)); let (port, chan) = Chan::new(); native::task::spawn(proc() { let mut c = c; for _ in range(0, 100000) { loop { - match c.pop() { + match unsafe { (*b.get()).pop() } { Some(1) => break, Some(_) => fail!(), None => {} @@ -329,7 +285,7 @@ mod test { chan.send(()); }); for _ in range(0, 100000) { - p.push(1); + unsafe { (*a.get()).push(1); } } port.recv(); } |
