diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-12-05 17:56:17 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-12-16 17:47:11 -0800 |
| commit | bfa9064ba2687eb1d95708f72f41ddd9729a6ba1 (patch) | |
| tree | b10aeff181eff3a8654df495d2ad8826490f6533 /src/libstd/rt/mpsc_queue.rs | |
| parent | 000cda611f8224ac780fa37432f869f425cd2bb7 (diff) | |
| download | rust-bfa9064ba2687eb1d95708f72f41ddd9729a6ba1.tar.gz rust-bfa9064ba2687eb1d95708f72f41ddd9729a6ba1.zip | |
Rewrite std::comm
* Streams are now ~3x faster than before (fewer allocations and more optimized)
* Based on a single-producer single-consumer lock-free queue that doesn't
always have to allocate on every send.
* Blocking via mutexes/cond vars outside the runtime
* Streams work in/out of the runtime seamlessly
* Select now works in/out of the runtime seamlessly
* Streams will now fail!() on send() if the other end has hung up
* try_send() will not fail
* PortOne/ChanOne removed
* SharedPort removed
* MegaPipe removed
* Generic select removed (only one kind of port now)
* API redesign
* try_recv == never block
* recv_opt == block, don't fail
* iter() == Iterator<T> for Port<T>
* removed peek
* Type::new
* Removed rt::comm
Diffstat (limited to 'src/libstd/rt/mpsc_queue.rs')
| -rw-r--r-- | src/libstd/rt/mpsc_queue.rs | 230 |
1 files changed, 120 insertions, 110 deletions
diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs index 4f39a1df4fa..d575028af70 100644 --- a/src/libstd/rt/mpsc_queue.rs +++ b/src/libstd/rt/mpsc_queue.rs @@ -1,5 +1,4 @@ -/* Multi-producer/single-consumer queue - * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * @@ -27,163 +26,177 @@ */ //! A mostly lock-free multi-producer, single consumer queue. -// http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue -use unstable::sync::UnsafeArc; -use unstable::atomics::{AtomicPtr,Relaxed,Release,Acquire}; -use ptr::{mut_null, to_mut_unsafe_ptr}; +// http://www.1024cores.net/home/lock-free-algorithms +// /queues/non-intrusive-mpsc-node-based-queue + use cast; -use option::*; use clone::Clone; use kinds::Send; +use ops::Drop; +use option::{Option, None, Some}; +use unstable::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; +use unstable::sync::UnsafeArc; + +pub enum PopResult<T> { + /// Some data has been popped + Data(T), + /// The queue is empty + Empty, + /// The queue is in an inconsistent state. Popping data should succeed, but + /// some pushers have yet to make enough progress in order allow a pop to + /// succeed. It is recommended that a pop() occur "in the near future" in + /// order to see if the sender has made progress or not + Inconsistent, +} struct Node<T> { next: AtomicPtr<Node<T>>, value: Option<T>, } -impl<T> Node<T> { - fn empty() -> Node<T> { - Node{next: AtomicPtr::new(mut_null()), value: None} - } - - fn with_value(value: T) -> Node<T> { - Node{next: AtomicPtr::new(mut_null()), value: Some(value)} - } -} - -struct State<T> { - pad0: [u8, ..64], +struct State<T, P> { head: AtomicPtr<Node<T>>, - pad1: [u8, ..64], - stub: Node<T>, - pad2: [u8, ..64], tail: *mut Node<T>, - pad3: [u8, ..64], + packet: P, } -struct Queue<T> { - priv state: UnsafeArc<State<T>>, +pub struct Consumer<T, P> { + priv state: UnsafeArc<State<T, P>>, } -impl<T: Send> Clone for Queue<T> { - fn clone(&self) -> Queue<T> { - Queue { - state: self.state.clone() - } - } +pub struct Producer<T, P> { + priv state: UnsafeArc<State<T, P>>, } -impl<T: Send> State<T> { - pub fn new() -> State<T> { - State{ - pad0: [0, ..64], - head: AtomicPtr::new(mut_null()), - pad1: [0, ..64], - stub: Node::<T>::empty(), - pad2: [0, ..64], - tail: mut_null(), - pad3: [0, ..64], - } +impl<T: Send, P: Send> Clone for Producer<T, P> { + fn clone(&self) -> Producer<T, P> { + Producer { state: self.state.clone() } } +} - fn init(&mut self) { - let stub = self.get_stub_unsafe(); - self.head.store(stub, Relaxed); - self.tail = stub; +pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) { + unsafe { + let (a, b) = UnsafeArc::new2(State::new(p)); + (Consumer { state: a }, Producer { state: b }) } +} - fn get_stub_unsafe(&mut self) -> *mut Node<T> { - to_mut_unsafe_ptr(&mut self.stub) +impl<T> Node<T> { + unsafe fn new(v: Option<T>) -> *mut Node<T> { + cast::transmute(~Node { + next: AtomicPtr::new(0 as *mut Node<T>), + value: v, + }) } +} - fn push(&mut self, value: T) { - unsafe { - let node = cast::transmute(~Node::with_value(value)); - self.push_node(node); +impl<T: Send, P: Send> State<T, P> { + pub unsafe fn new(p: P) -> State<T, P> { + let stub = Node::new(None); + State { + head: AtomicPtr::new(stub), + tail: stub, + packet: p, } } - fn push_node(&mut self, node: *mut Node<T>) { - unsafe { - (*node).next.store(mut_null(), Release); - let prev = self.head.swap(node, Relaxed); - (*prev).next.store(node, Release); - } + unsafe fn push(&mut self, t: T) { + let n = Node::new(Some(t)); + let prev = self.head.swap(n, AcqRel); + (*prev).next.store(n, Release); } - fn pop(&mut self) -> Option<T> { - unsafe { - let mut tail = self.tail; - let mut next = (*tail).next.load(Acquire); - let stub = self.get_stub_unsafe(); - if tail == stub { - if mut_null() == next { - return None - } - self.tail = next; - tail = next; - next = (*next).next.load(Acquire); - } - if next != mut_null() { - let tail: ~Node<T> = cast::transmute(tail); - self.tail = next; - return tail.value - } - let head = self.head.load(Relaxed); - if tail != head { - return None - } - self.push_node(stub); - next = (*tail).next.load(Acquire); - if next != mut_null() { - let tail: ~Node<T> = cast::transmute(tail); - self.tail = next; - return tail.value - } + unsafe fn pop(&mut self) -> PopResult<T> { + let tail = self.tail; + let next = (*tail).next.load(Acquire); + + if !next.is_null() { + self.tail = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take_unwrap(); + let _: ~Node<T> = cast::transmute(tail); + return Data(ret); } - None + + if self.head.load(Acquire) == tail {Empty} else {Inconsistent} + } + + unsafe fn is_empty(&mut self) -> bool { + return (*self.tail).next.load(Acquire).is_null(); } } -impl<T: Send> Queue<T> { - pub fn new() -> Queue<T> { +#[unsafe_destructor] +impl<T: Send, P: Send> Drop for State<T, P> { + fn drop(&mut self) { unsafe { - let q = Queue{state: UnsafeArc::new(State::new())}; - (*q.state.get()).init(); - q + let mut cur = self.tail; + while !cur.is_null() { + let next = (*cur).next.load(Relaxed); + let _: ~Node<T> = cast::transmute(cur); + cur = next; + } } } +} +impl<T: Send, P: Send> Producer<T, P> { pub fn push(&mut self, value: T) { unsafe { (*self.state.get()).push(value) } } + pub fn is_empty(&self) -> bool { + unsafe{ (*self.state.get()).is_empty() } + } + pub unsafe fn packet(&self) -> *mut P { + &mut (*self.state.get()).packet as *mut P + } +} - pub fn pop(&mut self) -> Option<T> { - unsafe{ (*self.state.get()).pop() } +impl<T: Send, P: Send> Consumer<T, P> { + pub fn pop(&mut self) -> PopResult<T> { + unsafe { (*self.state.get()).pop() } + } + pub fn casual_pop(&mut self) -> Option<T> { + match self.pop() { + Data(t) => Some(t), + Empty | Inconsistent => None, + } + } + pub unsafe fn packet(&self) -> *mut P { + &mut (*self.state.get()).packet as *mut P } } #[cfg(test)] mod tests { use prelude::*; - use option::*; + use task; - use comm; - use super::Queue; + use super::{queue, Data, Empty, Inconsistent}; + + #[test] + fn test_full() { + let (_, mut p) = queue(()); + p.push(~1); + p.push(~2); + } #[test] fn test() { let nthreads = 8u; let nmsgs = 1000u; - let mut q = Queue::new(); - assert_eq!(None, q.pop()); + let (mut c, p) = queue(()); + match c.pop() { + Empty => {} + Inconsistent | Data(..) => fail!() + } for _ in range(0, nthreads) { - let (port, chan) = comm::stream(); - chan.send(q.clone()); + let q = p.clone(); do task::spawn_sched(task::SingleThreaded) { - let mut q = port.recv(); + let mut q = q; for i in range(0, nmsgs) { q.push(i); } @@ -191,13 +204,10 @@ mod tests { } let mut i = 0u; - loop { - match q.pop() { - None => {}, - Some(_) => { - i += 1; - if i == nthreads*nmsgs { break } - } + while i < nthreads * nmsgs { + match c.pop() { + Empty | Inconsistent => {}, + Data(_) => { i += 1 } } } } |
