diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2014-01-06 15:23:37 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2014-02-11 16:32:00 -0800 |
| commit | 47ef20014c32443b12a122c0371a87f513830807 (patch) | |
| tree | f31f158b6c0c0718a8b54910746eef0f24290aa3 /src/libstd/sync | |
| parent | 0ac6e5afda2a9741d50d6b5c557ee16fee44878f (diff) | |
| download | rust-47ef20014c32443b12a122c0371a87f513830807.tar.gz rust-47ef20014c32443b12a122c0371a87f513830807.zip | |
Shuffle around ownership in concurrent queues
Beforehand, using a concurrent queue always mandated that the "shared state" be stored internally to the queues in order to provide a safe interface. This isn't quite as flexible as one would want in some circumstances, so instead this commit moves the queues to not containing the shared state. The queues no longer have a "default useful safe" interface, but rather a "default safe" interface (minus the useful part). The queues have to be shared manually through an Arc or some other means. This allows them to be a little more flexible at the cost of a usability hindrance. I plan on using this new flexibility to upgrade a channel to a shared channel seamlessly.
Diffstat (limited to 'src/libstd/sync')
| -rw-r--r-- | src/libstd/sync/mpsc_queue.rs | 154 | ||||
| -rw-r--r-- | src/libstd/sync/spsc_queue.rs | 290 |
2 files changed, 177 insertions, 267 deletions
diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index 1ec8ac5d83e..258162069d9 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -39,12 +39,10 @@ // /queues/non-intrusive-mpsc-node-based-queue use cast; -use clone::Clone; use kinds::Send; use ops::Drop; use option::{Option, None, Some}; use ptr::RawPtr; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; /// A result of the `pop` function. @@ -65,40 +63,12 @@ struct Node<T> { value: Option<T>, } -struct State<T, P> { - head: AtomicPtr<Node<T>>, - tail: *mut Node<T>, - packet: P, -} - -/// The consumer half of this concurrent queue. This half is used to receive -/// data from the producers. -pub struct Consumer<T, P> { - priv state: UnsafeArc<State<T, P>>, -} - -/// The production half of the concurrent queue. This handle may be cloned in -/// order to make handles for new producers. -pub struct Producer<T, P> { - priv state: UnsafeArc<State<T, P>>, -} - -impl<T: Send, P: Send> Clone for Producer<T, P> { - fn clone(&self) -> Producer<T, P> { - Producer { state: self.state.clone() } - } -} - -/// Creates a new MPSC queue. The given argument `p` is a user-defined "packet" -/// of information which will be shared by the consumer and the producer which -/// can be re-acquired via the `packet` function. This is helpful when extra -/// state is shared between the producer and consumer, but note that there is no -/// synchronization performed of this data. -pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) { - unsafe { - let (a, b) = UnsafeArc::new2(State::new(p)); - (Consumer { state: a }, Producer { state: b }) - } +/// The multi-producer single-consumer structure. This is not cloneable, but it +/// may be safely shared so long as it is guaranteed that there is only one +/// popper at a time (many pushers are allowed). +pub struct Queue<T> { + priv head: AtomicPtr<Node<T>>, + priv tail: *mut Node<T>, } impl<T> Node<T> { @@ -110,67 +80,26 @@ impl<T> Node<T> { } } -impl<T: Send, P: Send> State<T, P> { - unsafe fn new(p: P) -> State<T, P> { - let stub = Node::new(None); - State { +impl<T: Send> Queue<T> { + /// Creates a new queue that is safe to share among multiple producers and + /// one consumer. + pub fn new() -> Queue<T> { + let stub = unsafe { Node::new(None) }; + Queue { head: AtomicPtr::new(stub), tail: stub, - packet: p, } } - unsafe fn push(&mut self, t: T) { - let n = Node::new(Some(t)); - let prev = self.head.swap(n, AcqRel); - (*prev).next.store(n, Release); - } - - unsafe fn pop(&mut self) -> PopResult<T> { - let tail = self.tail; - let next = (*tail).next.load(Acquire); - - if !next.is_null() { - self.tail = next; - assert!((*tail).value.is_none()); - assert!((*next).value.is_some()); - let ret = (*next).value.take_unwrap(); - let _: ~Node<T> = cast::transmute(tail); - return Data(ret); - } - - if self.head.load(Acquire) == tail {Empty} else {Inconsistent} - } -} - -#[unsafe_destructor] -impl<T: Send, P: Send> Drop for State<T, P> { - fn drop(&mut self) { + /// Pushes a new value onto this queue. + pub fn push(&mut self, t: T) { unsafe { - let mut cur = self.tail; - while !cur.is_null() { - let next = (*cur).next.load(Relaxed); - let _: ~Node<T> = cast::transmute(cur); - cur = next; - } + let n = Node::new(Some(t)); + let prev = self.head.swap(n, AcqRel); + (*prev).next.store(n, Release); } } -} - -impl<T: Send, P: Send> Producer<T, P> { - /// Pushes a new value onto this queue. - pub fn push(&mut self, value: T) { - unsafe { (*self.state.get()).push(value) } - } - /// Gets an unsafe pointer to the user-defined packet shared by the - /// producers and the consumer. Note that care must be taken to ensure that - /// the lifetime of the queue outlives the usage of the returned pointer. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} -impl<T: Send, P: Send> Consumer<T, P> { /// Pops some data from this queue. /// /// Note that the current implementation means that this function cannot @@ -182,8 +111,23 @@ impl<T: Send, P: Send> Consumer<T, P> { /// This inconsistent state means that this queue does indeed have data, but /// it does not currently have access to it at this time. pub fn pop(&mut self) -> PopResult<T> { - unsafe { (*self.state.get()).pop() } + unsafe { + let tail = self.tail; + let next = (*tail).next.load(Acquire); + + if !next.is_null() { + self.tail = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take_unwrap(); + let _: ~Node<T> = cast::transmute(tail); + return Data(ret); + } + + if self.head.load(Acquire) == tail {Empty} else {Inconsistent} + } } + /// Attempts to pop data from this queue, but doesn't attempt too hard. This /// will canonicalize inconsistent states to a `None` value. pub fn casual_pop(&mut self) -> Option<T> { @@ -192,10 +136,19 @@ impl<T: Send, P: Send> Consumer<T, P> { Empty | Inconsistent => None, } } - /// Gets an unsafe pointer to the underlying user-defined packet. See - /// `Producer.packet` for more information. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P +} + +#[unsafe_destructor] +impl<T: Send> Drop for Queue<T> { + fn drop(&mut self) { + unsafe { + let mut cur = self.tail; + while !cur.is_null() { + let next = (*cur).next.load(Relaxed); + let _: ~Node<T> = cast::transmute(cur); + cur = next; + } + } } } @@ -203,12 +156,12 @@ impl<T: Send, P: Send> Consumer<T, P> { mod tests { use prelude::*; - use super::{queue, Data, Empty, Inconsistent}; + use super::{Queue, Data, Empty, Inconsistent}; use native; #[test] fn test_full() { - let (_, mut p) = queue(()); + let mut q = Queue::new(); p.push(~1); p.push(~2); } @@ -217,20 +170,20 @@ mod tests { fn test() { let nthreads = 8u; let nmsgs = 1000u; - let (mut c, p) = queue(()); + let mut q = Queue::new(); match c.pop() { Empty => {} Inconsistent | Data(..) => fail!() } let (port, chan) = SharedChan::new(); + let q = UnsafeArc::new(q); for _ in range(0, nthreads) { - let q = p.clone(); let chan = chan.clone(); + let q = q.clone(); native::task::spawn(proc() { - let mut q = q; for i in range(0, nmsgs) { - q.push(i); + unsafe { (*q.get()).push(i); } } chan.send(()); }); @@ -238,11 +191,12 @@ mod tests { let mut i = 0u; while i < nthreads * nmsgs { - match c.pop() { + match unsafe { (*q.get()).pop() } { Empty | Inconsistent => {}, Data(_) => { i += 1 } } } + drop(chan); for _ in range(0, nthreads) { port.recv(); } diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index 35a5846f11a..d1fde759cc1 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -38,7 +38,6 @@ use kinds::Send; use ops::Drop; use option::{Some, None, Option}; use ptr::RawPtr; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; // Node within the linked list queue of messages to send @@ -50,75 +49,25 @@ struct Node<T> { next: AtomicPtr<Node<T>>, // next node in the queue } -// The producer/consumer halves both need access to the `tail` field, and if -// they both have access to that we may as well just give them both access -// to this whole structure. -struct State<T, P> { +/// The single-producer single-consumer queue. This structure is not cloneable, +/// but it can be safely shared in an UnsafeArc if it is guaranteed that there +/// is only one popper and one pusher touching the queue at any one point in +/// time. +pub struct Queue<T> { // consumer fields - tail: *mut Node<T>, // where to pop from - tail_prev: AtomicPtr<Node<T>>, // where to pop from + priv tail: *mut Node<T>, // where to pop from + priv tail_prev: AtomicPtr<Node<T>>, // where to pop from // producer fields - head: *mut Node<T>, // where to push to - first: *mut Node<T>, // where to get new nodes from - tail_copy: *mut Node<T>, // between first/tail + priv head: *mut Node<T>, // where to push to + priv first: *mut Node<T>, // where to get new nodes from + priv tail_copy: *mut Node<T>, // between first/tail // Cache maintenance fields. Additions and subtractions are stored // separately in order to allow them to use nonatomic addition/subtraction. - cache_bound: uint, - cache_additions: AtomicUint, - cache_subtractions: AtomicUint, - - packet: P, -} - -/// Producer half of this queue. This handle is used to push data to the -/// consumer. -pub struct Producer<T, P> { - priv state: UnsafeArc<State<T, P>>, -} - -/// Consumer half of this queue. This handle is used to receive data from the -/// producer. -pub struct Consumer<T, P> { - priv state: UnsafeArc<State<T, P>>, -} - -/// Creates a new queue. The producer returned is connected to the consumer to -/// push all data to the consumer. -/// -/// # Arguments -/// -/// * `bound` - This queue implementation is implemented with a linked list, -/// and this means that a push is always a malloc. In order to -/// amortize this cost, an internal cache of nodes is maintained -/// to prevent a malloc from always being necessary. This bound is -/// the limit on the size of the cache (if desired). If the value -/// is 0, then the cache has no bound. Otherwise, the cache will -/// never grow larger than `bound` (although the queue itself -/// could be much larger. -/// -/// * `p` - This is the user-defined packet of data which will also be shared -/// between the producer and consumer. -pub fn queue<T: Send, P: Send>(bound: uint, - p: P) -> (Consumer<T, P>, Producer<T, P>) -{ - let n1 = Node::new(); - let n2 = Node::new(); - unsafe { (*n1).next.store(n2, Relaxed) } - let state = State { - tail: n2, - tail_prev: AtomicPtr::new(n1), - head: n2, - first: n1, - tail_copy: n1, - cache_bound: bound, - cache_additions: AtomicUint::new(0), - cache_subtractions: AtomicUint::new(0), - packet: p, - }; - let (arc1, arc2) = UnsafeArc::new2(state); - (Consumer { state: arc1 }, Producer { state: arc2 }) + priv cache_bound: uint, + priv cache_additions: AtomicUint, + priv cache_subtractions: AtomicUint, } impl<T: Send> Node<T> { @@ -132,49 +81,49 @@ impl<T: Send> Node<T> { } } -impl<T: Send, P: Send> Producer<T, P> { - /// Pushes data onto the queue - pub fn push(&mut self, t: T) { - unsafe { (*self.state.get()).push(t) } - } - /// Tests whether the queue is empty. Note that if this function returns - /// `false`, the return value is significant, but if the return value is - /// `true` then almost no meaning can be attached to the return value. - pub fn is_empty(&self) -> bool { - unsafe { (*self.state.get()).is_empty() } - } - /// Acquires an unsafe pointer to the underlying user-defined packet. Note - /// that care must be taken to ensure that the queue outlives the usage of - /// the packet (because it is an unsafe pointer). - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -impl<T: Send, P: Send> Consumer<T, P> { - /// Pops some data from this queue, returning `None` when the queue is - /// empty. - pub fn pop(&mut self) -> Option<T> { - unsafe { (*self.state.get()).pop() } - } - /// Same function as the producer's `packet` method. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P +impl<T: Send> Queue<T> { + /// Creates a new queue. The producer returned is connected to the consumer + /// to push all data to the consumer. + /// + /// # Arguments + /// + /// * `bound` - This queue implementation is implemented with a linked + /// list, and this means that a push is always a malloc. In + /// order to amortize this cost, an internal cache of nodes is + /// maintained to prevent a malloc from always being + /// necessary. This bound is the limit on the size of the + /// cache (if desired). If the value is 0, then the cache has + /// no bound. Otherwise, the cache will never grow larger than + /// `bound` (although the queue itself could be much larger. + pub fn new(bound: uint) -> Queue<T> { + let n1 = Node::new(); + let n2 = Node::new(); + unsafe { (*n1).next.store(n2, Relaxed) } + Queue { + tail: n2, + tail_prev: AtomicPtr::new(n1), + head: n2, + first: n1, + tail_copy: n1, + cache_bound: bound, + cache_additions: AtomicUint::new(0), + cache_subtractions: AtomicUint::new(0), + } } -} -impl<T: Send, P: Send> State<T, P> { - // remember that there is only one thread executing `push` (and only one - // thread executing `pop`) - unsafe fn push(&mut self, t: T) { - // Acquire a node (which either uses a cached one or allocates a new - // one), and then append this to the 'head' node. - let n = self.alloc(); - assert!((*n).value.is_none()); - (*n).value = Some(t); - (*n).next.store(0 as *mut Node<T>, Relaxed); - (*self.head).next.store(n, Release); - self.head = n; + /// Pushes a new value onto this queue. Note that to use this function + /// safely, it must be externally guaranteed that there is only one pusher. + pub fn push(&mut self, t: T) { + unsafe { + // Acquire a node (which either uses a cached one or allocates a new + // one), and then append this to the 'head' node. + let n = self.alloc(); + assert!((*n).value.is_none()); + (*n).value = Some(t); + (*n).next.store(0 as *mut Node<T>, Relaxed); + (*self.head).next.store(n, Release); + self.head = n; + } } unsafe fn alloc(&mut self) -> *mut Node<T> { @@ -208,50 +157,57 @@ impl<T: Send, P: Send> State<T, P> { Node::new() } - // remember that there is only one thread executing `pop` (and only one - // thread executing `push`) - unsafe fn pop(&mut self) -> Option<T> { - // The `tail` node is not actually a used node, but rather a - // sentinel from where we should start popping from. Hence, look at - // tail's next field and see if we can use it. If we do a pop, then - // the current tail node is a candidate for going into the cache. - let tail = self.tail; - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - assert!((*next).value.is_some()); - let ret = (*next).value.take(); - - self.tail = next; - if self.cache_bound == 0 { - self.tail_prev.store(tail, Release); - } else { - // FIXME: this is dubious with overflow. - let additions = self.cache_additions.load(Relaxed); - let subtractions = self.cache_subtractions.load(Relaxed); - let size = additions - subtractions; + /// Attempts to pop a value from this queue. Remember that to use this type + /// safely you must ensure that there is only one popper at a time. + pub fn pop(&mut self) -> Option<T> { + unsafe { + // The `tail` node is not actually a used node, but rather a + // sentinel from where we should start popping from. Hence, look at + // tail's next field and see if we can use it. If we do a pop, then + // the current tail node is a candidate for going into the cache. + let tail = self.tail; + let next = (*tail).next.load(Acquire); + if next.is_null() { return None } + assert!((*next).value.is_some()); + let ret = (*next).value.take(); - if size < self.cache_bound { + self.tail = next; + if self.cache_bound == 0 { self.tail_prev.store(tail, Release); - self.cache_additions.store(additions + 1, Relaxed); } else { - (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); - // We have successfully erased all references to 'tail', so - // now we can safely drop it. - let _: ~Node<T> = cast::transmute(tail); + // FIXME: this is dubious with overflow. + let additions = self.cache_additions.load(Relaxed); + let subtractions = self.cache_subtractions.load(Relaxed); + let size = additions - subtractions; + + if size < self.cache_bound { + self.tail_prev.store(tail, Release); + self.cache_additions.store(additions + 1, Relaxed); + } else { + (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); + // We have successfully erased all references to 'tail', so + // now we can safely drop it. + let _: ~Node<T> = cast::transmute(tail); + } } + return ret; } - return ret; } - unsafe fn is_empty(&self) -> bool { - let tail = self.tail; - let next = (*tail).next.load(Acquire); - return next.is_null(); + /// Tests whether this queue is empty or not. Remember that there can only + /// be one tester/popper, and also keep in mind that the answer returned + /// from this is likely to change if it is `false`. + pub fn is_empty(&self) -> bool { + unsafe { + let tail = self.tail; + let next = (*tail).next.load(Acquire); + return next.is_null(); + } } } #[unsafe_destructor] -impl<T: Send, P: Send> Drop for State<T, P> { +impl<T: Send> Drop for Queue<T> { fn drop(&mut self) { unsafe { let mut cur = self.first; @@ -267,44 +223,44 @@ impl<T: Send, P: Send> Drop for State<T, P> { #[cfg(test)] mod test { use prelude::*; - use super::queue; + use super::Queue; use native; #[test] fn smoke() { - let (mut c, mut p) = queue(0, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); + let mut q = Queue::new(0); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); } #[test] fn drop_full() { - let (_, mut p) = queue(0, ()); - p.push(~1); - p.push(~2); + let mut q = Queue::new(0); + q.push(~1); + q.push(~2); } #[test] fn smoke_bound() { - let (mut c, mut p) = queue(1, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); + let mut q = Queue::new(1); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); } #[test] @@ -313,13 +269,13 @@ mod test { stress_bound(1); fn stress_bound(bound: uint) { - let (c, mut p) = queue(bound, ()); + let (a, b) = UnsafeArc::new2(Queue::new(bound)); let (port, chan) = Chan::new(); native::task::spawn(proc() { let mut c = c; for _ in range(0, 100000) { loop { - match c.pop() { + match unsafe { (*b.get()).pop() } { Some(1) => break, Some(_) => fail!(), None => {} @@ -329,7 +285,7 @@ mod test { chan.send(()); }); for _ in range(0, 100000) { - p.push(1); + unsafe { (*a.get()).push(1); } } port.recv(); } |
