about summary refs log tree commit diff
path: root/src/libstd/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/sync')
-rw-r--r--src/libstd/sync/arc.rs153
-rw-r--r--src/libstd/sync/atomics.rs603
-rw-r--r--src/libstd/sync/deque.rs661
-rw-r--r--src/libstd/sync/mod.rs23
-rw-r--r--src/libstd/sync/mpmc_bounded_queue.rs211
-rw-r--r--src/libstd/sync/mpsc_queue.rs245
-rw-r--r--src/libstd/sync/spsc_queue.rs334
7 files changed, 2230 insertions, 0 deletions
diff --git a/src/libstd/sync/arc.rs b/src/libstd/sync/arc.rs
new file mode 100644
index 00000000000..7632ec6cf29
--- /dev/null
+++ b/src/libstd/sync/arc.rs
@@ -0,0 +1,153 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Atomically reference counted data
+//!
+//! This module contains the implementation of an atomically reference counted
+//! pointer for the purpose of sharing data between tasks. This is obviously a
+//! very unsafe primitive to use, but it has its use cases when implementing
+//! concurrent data structures and similar tasks.
+//!
+//! Great care must be taken to ensure that data races do not arise through the
+//! usage of `UnsafeArc`, and this often requires some form of external
+//! synchronization. The only guarantee provided to you by this class is that
+//! the underlying data will remain valid (not freed) so long as the reference
+//! count is greater than zero.
+
+use cast;
+use clone::Clone;
+use kinds::Send;
+use ops::Drop;
+use ptr::RawPtr;
+use sync::atomics::{AtomicUint, SeqCst, Relaxed, Acquire};
+use vec;
+
+/// An atomically reference counted pointer.
+///
+/// Enforces no shared-memory safety.
+#[unsafe_no_drop_flag]
+pub struct UnsafeArc<T> {
+    priv data: *mut ArcData<T>,
+}
+
+struct ArcData<T> {
+    count: AtomicUint,
+    data: T,
+}
+
+unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut ArcData<T> {
+    let data = ~ArcData { count: AtomicUint::new(refcount), data: data };
+    cast::transmute(data)
+}
+
+impl<T: Send> UnsafeArc<T> {
+    /// Creates a new `UnsafeArc` which wraps the given data.
+    pub fn new(data: T) -> UnsafeArc<T> {
+        unsafe { UnsafeArc { data: new_inner(data, 1) } }
+    }
+
+    /// As new(), but returns an extra pre-cloned handle.
+    pub fn new2(data: T) -> (UnsafeArc<T>, UnsafeArc<T>) {
+        unsafe {
+            let ptr = new_inner(data, 2);
+            (UnsafeArc { data: ptr }, UnsafeArc { data: ptr })
+        }
+    }
+
+    /// As new(), but returns a vector of as many pre-cloned handles as
+    /// requested.
+    pub fn newN(data: T, num_handles: uint) -> ~[UnsafeArc<T>] {
+        unsafe {
+            if num_handles == 0 {
+                ~[] // need to free data here
+            } else {
+                let ptr = new_inner(data, num_handles);
+                vec::from_fn(num_handles, |_| UnsafeArc { data: ptr })
+            }
+        }
+    }
+
+    /// Gets a pointer to the inner shared data. Note that care must be taken to
+    /// ensure that the outer `UnsafeArc` does not fall out of scope while this
+    /// pointer is in use, otherwise it could possibly contain a use-after-free.
+    #[inline]
+    pub fn get(&self) -> *mut T {
+        unsafe {
+            assert!((*self.data).count.load(Relaxed) > 0);
+            return &mut (*self.data).data as *mut T;
+        }
+    }
+
+    /// Gets an immutable pointer to the inner shared data. This has the same
+    /// caveats as the `get` method.
+    #[inline]
+    pub fn get_immut(&self) -> *T {
+        unsafe {
+            assert!((*self.data).count.load(Relaxed) > 0);
+            return &(*self.data).data as *T;
+        }
+    }
+}
+
+impl<T: Send> Clone for UnsafeArc<T> {
+    fn clone(&self) -> UnsafeArc<T> {
+        unsafe {
+            // This barrier might be unnecessary, but I'm not sure...
+            let old_count = (*self.data).count.fetch_add(1, Acquire);
+            assert!(old_count >= 1);
+            return UnsafeArc { data: self.data };
+        }
+    }
+}
+
+#[unsafe_destructor]
+impl<T> Drop for UnsafeArc<T>{
+    fn drop(&mut self) {
+        unsafe {
+            // Happens when destructing an unwrapper's handle and from
+            // `#[unsafe_no_drop_flag]`
+            if self.data.is_null() {
+                return
+            }
+            // Must be acquire+release, not just release, to make sure this
+            // doesn't get reordered to after the unwrapper pointer load.
+            let old_count = (*self.data).count.fetch_sub(1, SeqCst);
+            assert!(old_count >= 1);
+            if old_count == 1 {
+                let _: ~ArcData<T> = cast::transmute(self.data);
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use prelude::*;
+    use super::UnsafeArc;
+    use task;
+    use mem::size_of;
+
+    #[test]
+    fn test_size() {
+        assert_eq!(size_of::<UnsafeArc<[int, ..10]>>(), size_of::<*[int, ..10]>());
+    }
+
+    #[test]
+    fn arclike_newN() {
+        // Tests that the many-refcounts-at-once constructors don't leak.
+        let _ = UnsafeArc::new2(~~"hello");
+        let x = UnsafeArc::newN(~~"hello", 0);
+        assert_eq!(x.len(), 0)
+        let x = UnsafeArc::newN(~~"hello", 1);
+        assert_eq!(x.len(), 1)
+        let x = UnsafeArc::newN(~~"hello", 10);
+        assert_eq!(x.len(), 10)
+    }
+}
diff --git a/src/libstd/sync/atomics.rs b/src/libstd/sync/atomics.rs
new file mode 100644
index 00000000000..bc9d99c0f37
--- /dev/null
+++ b/src/libstd/sync/atomics.rs
@@ -0,0 +1,603 @@
+// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/*!
+ * Atomic types
+ *
+ * Basic atomic types supporting atomic operations. Each method takes an
+ * `Ordering` which represents the strength of the memory barrier for that
+ * operation. These orderings are the same as C++11 atomic orderings
+ * [http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync]
+ *
+ * All atomic types are a single word in size.
+ */
+
+#[allow(missing_doc)];
+
+use unstable::intrinsics;
+use cast;
+use option::{Option,Some,None};
+use libc::c_void;
+use ops::Drop;
+use util::NonCopyable;
+
+/**
+ * A simple atomic flag, that can be set and cleared. The most basic atomic type.
+ */
+pub struct AtomicFlag {
+    priv v: int,
+    priv nocopy: NonCopyable
+}
+
+/**
+ * An atomic boolean type.
+ */
+pub struct AtomicBool {
+    priv v: uint,
+    priv nocopy: NonCopyable
+}
+
+/**
+ * A signed atomic integer type, supporting basic atomic arithmetic operations
+ */
+pub struct AtomicInt {
+    priv v: int,
+    priv nocopy: NonCopyable
+}
+
+/**
+ * An unsigned atomic integer type, supporting basic atomic arithmetic operations
+ */
+pub struct AtomicUint {
+    priv v: uint,
+    priv nocopy: NonCopyable
+}
+
+/**
+ * An unsafe atomic pointer. Only supports basic atomic operations
+ */
+pub struct AtomicPtr<T> {
+    priv p: *mut T,
+    priv nocopy: NonCopyable
+}
+
+/**
+ * An owned atomic pointer. Ensures that only a single reference to the data is held at any time.
+ */
+#[unsafe_no_drop_flag]
+pub struct AtomicOption<T> {
+    priv p: *mut c_void
+}
+
+pub enum Ordering {
+    Relaxed,
+    Release,
+    Acquire,
+    AcqRel,
+    SeqCst
+}
+
+pub static INIT_ATOMIC_FLAG : AtomicFlag = AtomicFlag { v: 0, nocopy: NonCopyable };
+pub static INIT_ATOMIC_BOOL : AtomicBool = AtomicBool { v: 0, nocopy: NonCopyable };
+pub static INIT_ATOMIC_INT  : AtomicInt  = AtomicInt  { v: 0, nocopy: NonCopyable };
+pub static INIT_ATOMIC_UINT : AtomicUint = AtomicUint { v: 0, nocopy: NonCopyable };
+
+impl AtomicFlag {
+
+    pub fn new() -> AtomicFlag {
+        AtomicFlag { v: 0, nocopy: NonCopyable }
+    }
+
+    /**
+     * Clears the atomic flag
+     */
+    #[inline]
+    pub fn clear(&mut self, order: Ordering) {
+        unsafe {atomic_store(&mut self.v, 0, order)}
+    }
+
+    /**
+     * Sets the flag if it was previously unset, returns the previous value of the
+     * flag.
+     */
+    #[inline]
+    pub fn test_and_set(&mut self, order: Ordering) -> bool {
+        unsafe { atomic_compare_and_swap(&mut self.v, 0, 1, order) > 0 }
+    }
+}
+
+impl AtomicBool {
+    pub fn new(v: bool) -> AtomicBool {
+        AtomicBool { v: if v { 1 } else { 0 }, nocopy: NonCopyable }
+    }
+
+    #[inline]
+    pub fn load(&self, order: Ordering) -> bool {
+        unsafe { atomic_load(&self.v, order) > 0 }
+    }
+
+    #[inline]
+    pub fn store(&mut self, val: bool, order: Ordering) {
+        let val = if val { 1 } else { 0 };
+
+        unsafe { atomic_store(&mut self.v, val, order); }
+    }
+
+    #[inline]
+    pub fn swap(&mut self, val: bool, order: Ordering) -> bool {
+        let val = if val { 1 } else { 0 };
+
+        unsafe { atomic_swap(&mut self.v, val, order) > 0 }
+    }
+
+    #[inline]
+    pub fn compare_and_swap(&mut self, old: bool, new: bool, order: Ordering) -> bool {
+        let old = if old { 1 } else { 0 };
+        let new = if new { 1 } else { 0 };
+
+        unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) > 0 }
+    }
+
+    /// Returns the old value
+    #[inline]
+    pub fn fetch_and(&mut self, val: bool, order: Ordering) -> bool {
+        let val = if val { 1 } else { 0 };
+
+        unsafe { atomic_and(&mut self.v, val, order) > 0 }
+    }
+
+    /// Returns the old value
+    #[inline]
+    pub fn fetch_nand(&mut self, val: bool, order: Ordering) -> bool {
+        let val = if val { 1 } else { 0 };
+
+        unsafe { atomic_nand(&mut self.v, val, order) > 0 }
+    }
+
+    /// Returns the old value
+    #[inline]
+    pub fn fetch_or(&mut self, val: bool, order: Ordering) -> bool {
+        let val = if val { 1 } else { 0 };
+
+        unsafe { atomic_or(&mut self.v, val, order) > 0 }
+    }
+
+    /// Returns the old value
+    #[inline]
+    pub fn fetch_xor(&mut self, val: bool, order: Ordering) -> bool {
+        let val = if val { 1 } else { 0 };
+
+        unsafe { atomic_xor(&mut self.v, val, order) > 0 }
+    }
+}
+
+impl AtomicInt {
+    pub fn new(v: int) -> AtomicInt {
+        AtomicInt { v:v, nocopy: NonCopyable }
+    }
+
+    #[inline]
+    pub fn load(&self, order: Ordering) -> int {
+        unsafe { atomic_load(&self.v, order) }
+    }
+
+    #[inline]
+    pub fn store(&mut self, val: int, order: Ordering) {
+        unsafe { atomic_store(&mut self.v, val, order); }
+    }
+
+    #[inline]
+    pub fn swap(&mut self, val: int, order: Ordering) -> int {
+        unsafe { atomic_swap(&mut self.v, val, order) }
+    }
+
+    #[inline]
+    pub fn compare_and_swap(&mut self, old: int, new: int, order: Ordering) -> int {
+        unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) }
+    }
+
+    /// Returns the old value (like __sync_fetch_and_add).
+    #[inline]
+    pub fn fetch_add(&mut self, val: int, order: Ordering) -> int {
+        unsafe { atomic_add(&mut self.v, val, order) }
+    }
+
+    /// Returns the old value (like __sync_fetch_and_sub).
+    #[inline]
+    pub fn fetch_sub(&mut self, val: int, order: Ordering) -> int {
+        unsafe { atomic_sub(&mut self.v, val, order) }
+    }
+}
+
+impl AtomicUint {
+    pub fn new(v: uint) -> AtomicUint {
+        AtomicUint { v:v, nocopy: NonCopyable }
+    }
+
+    #[inline]
+    pub fn load(&self, order: Ordering) -> uint {
+        unsafe { atomic_load(&self.v, order) }
+    }
+
+    #[inline]
+    pub fn store(&mut self, val: uint, order: Ordering) {
+        unsafe { atomic_store(&mut self.v, val, order); }
+    }
+
+    #[inline]
+    pub fn swap(&mut self, val: uint, order: Ordering) -> uint {
+        unsafe { atomic_swap(&mut self.v, val, order) }
+    }
+
+    #[inline]
+    pub fn compare_and_swap(&mut self, old: uint, new: uint, order: Ordering) -> uint {
+        unsafe { atomic_compare_and_swap(&mut self.v, old, new, order) }
+    }
+
+    /// Returns the old value (like __sync_fetch_and_add).
+    #[inline]
+    pub fn fetch_add(&mut self, val: uint, order: Ordering) -> uint {
+        unsafe { atomic_add(&mut self.v, val, order) }
+    }
+
+    /// Returns the old value (like __sync_fetch_and_sub).
+    #[inline]
+    pub fn fetch_sub(&mut self, val: uint, order: Ordering) -> uint {
+        unsafe { atomic_sub(&mut self.v, val, order) }
+    }
+}
+
+impl<T> AtomicPtr<T> {
+    pub fn new(p: *mut T) -> AtomicPtr<T> {
+        AtomicPtr { p:p, nocopy: NonCopyable }
+    }
+
+    #[inline]
+    pub fn load(&self, order: Ordering) -> *mut T {
+        unsafe { atomic_load(&self.p, order) }
+    }
+
+    #[inline]
+    pub fn store(&mut self, ptr: *mut T, order: Ordering) {
+        unsafe { atomic_store(&mut self.p, ptr, order); }
+    }
+
+    #[inline]
+    pub fn swap(&mut self, ptr: *mut T, order: Ordering) -> *mut T {
+        unsafe { atomic_swap(&mut self.p, ptr, order) }
+    }
+
+    #[inline]
+    pub fn compare_and_swap(&mut self, old: *mut T, new: *mut T, order: Ordering) -> *mut T {
+        unsafe { atomic_compare_and_swap(&mut self.p, old, new, order) }
+    }
+}
+
+impl<T> AtomicOption<T> {
+    pub fn new(p: ~T) -> AtomicOption<T> {
+        unsafe {
+            AtomicOption {
+                p: cast::transmute(p)
+            }
+        }
+    }
+
+    pub fn empty() -> AtomicOption<T> {
+        unsafe {
+            AtomicOption {
+                p: cast::transmute(0)
+            }
+        }
+    }
+
+    #[inline]
+    pub fn swap(&mut self, val: ~T, order: Ordering) -> Option<~T> {
+        unsafe {
+            let val = cast::transmute(val);
+
+            let p = atomic_swap(&mut self.p, val, order);
+            let pv : &uint = cast::transmute(&p);
+
+            if *pv == 0 {
+                None
+            } else {
+                Some(cast::transmute(p))
+            }
+        }
+    }
+
+    #[inline]
+    pub fn take(&mut self, order: Ordering) -> Option<~T> {
+        unsafe {
+            self.swap(cast::transmute(0), order)
+        }
+    }
+
+    /// A compare-and-swap. Succeeds if the option is 'None' and returns 'None'
+    /// if so. If the option was already 'Some', returns 'Some' of the rejected
+    /// value.
+    #[inline]
+    pub fn fill(&mut self, val: ~T, order: Ordering) -> Option<~T> {
+        unsafe {
+            let val = cast::transmute(val);
+            let expected = cast::transmute(0);
+            let oldval = atomic_compare_and_swap(&mut self.p, expected, val, order);
+            if oldval == expected {
+                None
+            } else {
+                Some(cast::transmute(val))
+            }
+        }
+    }
+
+    /// Be careful: The caller must have some external method of ensuring the
+    /// result does not get invalidated by another task after this returns.
+    #[inline]
+    pub fn is_empty(&mut self, order: Ordering) -> bool {
+        unsafe { atomic_load(&self.p, order) == cast::transmute(0) }
+    }
+}
+
+#[unsafe_destructor]
+impl<T> Drop for AtomicOption<T> {
+    fn drop(&mut self) {
+        let _ = self.take(SeqCst);
+    }
+}
+
+#[inline]
+pub unsafe fn atomic_store<T>(dst: &mut T, val: T, order:Ordering) {
+    let dst = cast::transmute(dst);
+    let val = cast::transmute(val);
+
+    match order {
+        Release => intrinsics::atomic_store_rel(dst, val),
+        Relaxed => intrinsics::atomic_store_relaxed(dst, val),
+        _       => intrinsics::atomic_store(dst, val)
+    }
+}
+
+#[inline]
+pub unsafe fn atomic_load<T>(dst: &T, order:Ordering) -> T {
+    let dst = cast::transmute(dst);
+
+    cast::transmute(match order {
+        Acquire => intrinsics::atomic_load_acq(dst),
+        Relaxed => intrinsics::atomic_load_relaxed(dst),
+        _       => intrinsics::atomic_load(dst)
+    })
+}
+
+#[inline]
+pub unsafe fn atomic_swap<T>(dst: &mut T, val: T, order: Ordering) -> T {
+    let dst = cast::transmute(dst);
+    let val = cast::transmute(val);
+
+    cast::transmute(match order {
+        Acquire => intrinsics::atomic_xchg_acq(dst, val),
+        Release => intrinsics::atomic_xchg_rel(dst, val),
+        AcqRel  => intrinsics::atomic_xchg_acqrel(dst, val),
+        Relaxed => intrinsics::atomic_xchg_relaxed(dst, val),
+        _       => intrinsics::atomic_xchg(dst, val)
+    })
+}
+
+/// Returns the old value (like __sync_fetch_and_add).
+#[inline]
+pub unsafe fn atomic_add<T>(dst: &mut T, val: T, order: Ordering) -> T {
+    let dst = cast::transmute(dst);
+    let val = cast::transmute(val);
+
+    cast::transmute(match order {
+        Acquire => intrinsics::atomic_xadd_acq(dst, val),
+        Release => intrinsics::atomic_xadd_rel(dst, val),
+        AcqRel  => intrinsics::atomic_xadd_acqrel(dst, val),
+        Relaxed => intrinsics::atomic_xadd_relaxed(dst, val),
+        _       => intrinsics::atomic_xadd(dst, val)
+    })
+}
+
+/// Returns the old value (like __sync_fetch_and_sub).
+#[inline]
+pub unsafe fn atomic_sub<T>(dst: &mut T, val: T, order: Ordering) -> T {
+    let dst = cast::transmute(dst);
+    let val = cast::transmute(val);
+
+    cast::transmute(match order {
+        Acquire => intrinsics::atomic_xsub_acq(dst, val),
+        Release => intrinsics::atomic_xsub_rel(dst, val),
+        AcqRel  => intrinsics::atomic_xsub_acqrel(dst, val),
+        Relaxed => intrinsics::atomic_xsub_relaxed(dst, val),
+        _       => intrinsics::atomic_xsub(dst, val)
+    })
+}
+
+#[inline]
+pub unsafe fn atomic_compare_and_swap<T>(dst:&mut T, old:T, new:T, order: Ordering) -> T {
+    let dst = cast::transmute(dst);
+    let old = cast::transmute(old);
+    let new = cast::transmute(new);
+
+    cast::transmute(match order {
+        Acquire => intrinsics::atomic_cxchg_acq(dst, old, new),
+        Release => intrinsics::atomic_cxchg_rel(dst, old, new),
+        AcqRel  => intrinsics::atomic_cxchg_acqrel(dst, old, new),
+        Relaxed => intrinsics::atomic_cxchg_relaxed(dst, old, new),
+        _       => intrinsics::atomic_cxchg(dst, old, new),
+    })
+}
+
+#[inline]
+pub unsafe fn atomic_and<T>(dst: &mut T, val: T, order: Ordering) -> T {
+    let dst = cast::transmute(dst);
+    let val = cast::transmute(val);
+
+    cast::transmute(match order {
+        Acquire => intrinsics::atomic_and_acq(dst, val),
+        Release => intrinsics::atomic_and_rel(dst, val),
+        AcqRel  => intrinsics::atomic_and_acqrel(dst, val),
+        Relaxed => intrinsics::atomic_and_relaxed(dst, val),
+        _       => intrinsics::atomic_and(dst, val)
+    })
+}
+
+
+#[inline]
+pub unsafe fn atomic_nand<T>(dst: &mut T, val: T, order: Ordering) -> T {
+    let dst = cast::transmute(dst);
+    let val = cast::transmute(val);
+
+    cast::transmute(match order {
+        Acquire => intrinsics::atomic_nand_acq(dst, val),
+        Release => intrinsics::atomic_nand_rel(dst, val),
+        AcqRel  => intrinsics::atomic_nand_acqrel(dst, val),
+        Relaxed => intrinsics::atomic_nand_relaxed(dst, val),
+        _       => intrinsics::atomic_nand(dst, val)
+    })
+}
+
+
+#[inline]
+pub unsafe fn atomic_or<T>(dst: &mut T, val: T, order: Ordering) -> T {
+    let dst = cast::transmute(dst);
+    let val = cast::transmute(val);
+
+    cast::transmute(match order {
+        Acquire => intrinsics::atomic_or_acq(dst, val),
+        Release => intrinsics::atomic_or_rel(dst, val),
+        AcqRel  => intrinsics::atomic_or_acqrel(dst, val),
+        Relaxed => intrinsics::atomic_or_relaxed(dst, val),
+        _       => intrinsics::atomic_or(dst, val)
+    })
+}
+
+
+#[inline]
+pub unsafe fn atomic_xor<T>(dst: &mut T, val: T, order: Ordering) -> T {
+    let dst = cast::transmute(dst);
+    let val = cast::transmute(val);
+
+    cast::transmute(match order {
+        Acquire => intrinsics::atomic_xor_acq(dst, val),
+        Release => intrinsics::atomic_xor_rel(dst, val),
+        AcqRel  => intrinsics::atomic_xor_acqrel(dst, val),
+        Relaxed => intrinsics::atomic_xor_relaxed(dst, val),
+        _       => intrinsics::atomic_xor(dst, val)
+    })
+}
+
+
+/**
+ * An atomic fence.
+ *
+ * A fence 'A' which has `Release` ordering semantics, synchronizes with a
+ * fence 'B' with (at least) `Acquire` semantics, if and only if there exists
+ * atomic operations X and Y, both operating on some atomic object 'M' such
+ * that A is sequenced before X, Y is synchronized before B, and Y observes
+ * the change to M. This provides a happens-before dependence between A and B.
+ *
+ * Atomic operations with `Release` or `Acquire` semantics can also synchronize
+ * with a fence.
+ *
+ * A fence which has `SeqCst` ordering, in addition to having both `Acquire` and
+ * `Release` semantics, participates in the global program order of the other
+ * `SeqCst` operations and/or fences.
+ *
+ * Accepts `Acquire`, `Release`, `AcqRel` and `SeqCst` orderings.
+ */
+#[inline]
+pub fn fence(order: Ordering) {
+    unsafe {
+        match order {
+            Acquire => intrinsics::atomic_fence_acq(),
+            Release => intrinsics::atomic_fence_rel(),
+            AcqRel  => intrinsics::atomic_fence_rel(),
+            _       => intrinsics::atomic_fence(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use option::*;
+    use super::*;
+
+    #[test]
+    fn flag() {
+        let mut flg = AtomicFlag::new();
+        assert!(!flg.test_and_set(SeqCst));
+        assert!(flg.test_and_set(SeqCst));
+
+        flg.clear(SeqCst);
+        assert!(!flg.test_and_set(SeqCst));
+    }
+
+    #[test]
+    fn option_empty() {
+        let mut option: AtomicOption<()> = AtomicOption::empty();
+        assert!(option.is_empty(SeqCst));
+    }
+
+    #[test]
+    fn option_swap() {
+        let mut p = AtomicOption::new(~1);
+        let a = ~2;
+
+        let b = p.swap(a, SeqCst);
+
+        assert_eq!(b, Some(~1));
+        assert_eq!(p.take(SeqCst), Some(~2));
+    }
+
+    #[test]
+    fn option_take() {
+        let mut p = AtomicOption::new(~1);
+
+        assert_eq!(p.take(SeqCst), Some(~1));
+        assert_eq!(p.take(SeqCst), None);
+
+        let p2 = ~2;
+        p.swap(p2, SeqCst);
+
+        assert_eq!(p.take(SeqCst), Some(~2));
+    }
+
+    #[test]
+    fn option_fill() {
+        let mut p = AtomicOption::new(~1);
+        assert!(p.fill(~2, SeqCst).is_some()); // should fail; shouldn't leak!
+        assert_eq!(p.take(SeqCst), Some(~1));
+
+        assert!(p.fill(~2, SeqCst).is_none()); // shouldn't fail
+        assert_eq!(p.take(SeqCst), Some(~2));
+    }
+
+    #[test]
+    fn bool_and() {
+        let mut a = AtomicBool::new(true);
+        assert_eq!(a.fetch_and(false, SeqCst),true);
+        assert_eq!(a.load(SeqCst),false);
+    }
+
+    static mut S_FLAG : AtomicFlag = INIT_ATOMIC_FLAG;
+    static mut S_BOOL : AtomicBool = INIT_ATOMIC_BOOL;
+    static mut S_INT  : AtomicInt  = INIT_ATOMIC_INT;
+    static mut S_UINT : AtomicUint = INIT_ATOMIC_UINT;
+
+    #[test]
+    fn static_init() {
+        unsafe {
+            assert!(!S_FLAG.test_and_set(SeqCst));
+            assert!(!S_BOOL.load(SeqCst));
+            assert!(S_INT.load(SeqCst) == 0);
+            assert!(S_UINT.load(SeqCst) == 0);
+        }
+    }
+}
diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs
new file mode 100644
index 00000000000..4d0efcd6ee1
--- /dev/null
+++ b/src/libstd/sync/deque.rs
@@ -0,0 +1,661 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! A (mostly) lock-free concurrent work-stealing deque
+//!
+//! This module contains an implementation of the Chase-Lev work stealing deque
+//! described in "Dynamic Circular Work-Stealing Deque". The implementation is
+//! heavily based on the pseudocode found in the paper.
+//!
+//! This implementation does not want to have the restriction of a garbage
+//! collector for reclamation of buffers, and instead it uses a shared pool of
+//! buffers. This shared pool is required for correctness in this
+//! implementation.
+//!
+//! The only lock-synchronized portions of this deque are the buffer allocation
+//! and deallocation portions. Otherwise all operations are lock-free.
+//!
+//! # Example
+//!
+//!     use std::rt::deque::BufferPool;
+//!
+//!     let mut pool = BufferPool::new();
+//!     let (mut worker, mut stealer) = pool.deque();
+//!
+//!     // Only the worker may push/pop
+//!     worker.push(1);
+//!     worker.pop();
+//!
+//!     // Stealers take data from the other end of the deque
+//!     worker.push(1);
+//!     stealer.steal();
+//!
+//!     // Stealers can be cloned to have many stealers stealing in parallel
+//!     worker.push(1);
+//!     let mut stealer2 = stealer.clone();
+//!     stealer2.steal();
+
+// NB: the "buffer pool" strategy is not done for speed, but rather for
+//     correctness. For more info, see the comment on `swap_buffer`
+
+// XXX: all atomic operations in this module use a SeqCst ordering. That is
+//      probably overkill
+
+use cast;
+use clone::Clone;
+use iter::{range, Iterator};
+use kinds::Send;
+use libc;
+use mem;
+use ops::Drop;
+use option::{Option, Some, None};
+use ptr;
+use ptr::RawPtr;
+use sync::arc::UnsafeArc;
+use sync::atomics::{AtomicInt, AtomicPtr, SeqCst};
+use unstable::sync::Exclusive;
+use vec::{OwnedVector, ImmutableVector};
+
+// Once the queue is less than 1/K full, then it will be downsized. Note that
+// the deque requires that this number be less than 2.
+static K: int = 4;
+
+// Minimum number of bits that a buffer size should be. No buffer will resize to
+// under this value, and all deques will initially contain a buffer of this
+// size.
+//
+// The size in question is 1 << MIN_BITS
+static MIN_BITS: int = 7;
+
+struct Deque<T> {
+    bottom: AtomicInt,
+    top: AtomicInt,
+    array: AtomicPtr<Buffer<T>>,
+    pool: BufferPool<T>,
+}
+
+/// Worker half of the work-stealing deque. This worker has exclusive access to
+/// one side of the deque, and uses `push` and `pop` method to manipulate it.
+///
+/// There may only be one worker per deque.
+pub struct Worker<T> {
+    priv deque: UnsafeArc<Deque<T>>,
+}
+
+/// The stealing half of the work-stealing deque. Stealers have access to the
+/// opposite end of the deque from the worker, and they only have access to the
+/// `steal` method.
+pub struct Stealer<T> {
+    priv deque: UnsafeArc<Deque<T>>,
+}
+
+/// When stealing some data, this is an enumeration of the possible outcomes.
+#[deriving(Eq)]
+pub enum Stolen<T> {
+    /// The deque was empty at the time of stealing
+    Empty,
+    /// The stealer lost the race for stealing data, and a retry may return more
+    /// data.
+    Abort,
+    /// The stealer has successfully stolen some data.
+    Data(T),
+}
+
+/// The allocation pool for buffers used by work-stealing deques. Right now this
+/// structure is used for reclamation of memory after it is no longer in use by
+/// deques.
+///
+/// This data structure is protected by a mutex, but it is rarely used. Deques
+/// will only use this structure when allocating a new buffer or deallocating a
+/// previous one.
+pub struct BufferPool<T> {
+    priv pool: Exclusive<~[~Buffer<T>]>,
+}
+
+/// An internal buffer used by the chase-lev deque. This structure is actually
+/// implemented as a circular buffer, and is used as the intermediate storage of
+/// the data in the deque.
+///
+/// This type is implemented with *T instead of ~[T] for two reasons:
+///
+///   1. There is nothing safe about using this buffer. This easily allows the
+///      same value to be read twice in to rust, and there is nothing to
+///      prevent this. The usage by the deque must ensure that one of the
+///      values is forgotten. Furthermore, we only ever want to manually run
+///      destructors for values in this buffer (on drop) because the bounds
+///      are defined by the deque it's owned by.
+///
+///   2. We can certainly avoid bounds checks using *T instead of ~[T], although
+///      LLVM is probably pretty good at doing this already.
+struct Buffer<T> {
+    storage: *T,
+    log_size: int,
+}
+
+impl<T: Send> BufferPool<T> {
+    /// Allocates a new buffer pool which in turn can be used to allocate new
+    /// deques.
+    pub fn new() -> BufferPool<T> {
+        BufferPool { pool: Exclusive::new(~[]) }
+    }
+
+    /// Allocates a new work-stealing deque which will send/receiving memory to
+    /// and from this buffer pool.
+    pub fn deque(&mut self) -> (Worker<T>, Stealer<T>) {
+        let (a, b) = UnsafeArc::new2(Deque::new(self.clone()));
+        (Worker { deque: a }, Stealer { deque: b })
+    }
+
+    fn alloc(&mut self, bits: int) -> ~Buffer<T> {
+        unsafe {
+            self.pool.with(|pool| {
+                match pool.iter().position(|x| x.size() >= (1 << bits)) {
+                    Some(i) => pool.remove(i),
+                    None => ~Buffer::new(bits)
+                }
+            })
+        }
+    }
+
+    fn free(&mut self, buf: ~Buffer<T>) {
+        unsafe {
+            let mut buf = Some(buf);
+            self.pool.with(|pool| {
+                let buf = buf.take_unwrap();
+                match pool.iter().position(|v| v.size() > buf.size()) {
+                    Some(i) => pool.insert(i, buf),
+                    None => pool.push(buf),
+                }
+            })
+        }
+    }
+}
+
+impl<T: Send> Clone for BufferPool<T> {
+    fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } }
+}
+
+impl<T: Send> Worker<T> {
+    /// Pushes data onto the front of this work queue.
+    pub fn push(&mut self, t: T) {
+        unsafe { (*self.deque.get()).push(t) }
+    }
+    /// Pops data off the front of the work queue, returning `None` on an empty
+    /// queue.
+    pub fn pop(&mut self) -> Option<T> {
+        unsafe { (*self.deque.get()).pop() }
+    }
+
+    /// Gets access to the buffer pool that this worker is attached to. This can
+    /// be used to create more deques which share the same buffer pool as this
+    /// deque.
+    pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
+        unsafe { &mut (*self.deque.get()).pool }
+    }
+}
+
+impl<T: Send> Stealer<T> {
+    /// Steals work off the end of the queue (opposite of the worker's end)
+    pub fn steal(&mut self) -> Stolen<T> {
+        unsafe { (*self.deque.get()).steal() }
+    }
+
+    /// Gets access to the buffer pool that this stealer is attached to. This
+    /// can be used to create more deques which share the same buffer pool as
+    /// this deque.
+    pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
+        unsafe { &mut (*self.deque.get()).pool }
+    }
+}
+
+impl<T: Send> Clone for Stealer<T> {
+    fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } }
+}
+
+// Almost all of this code can be found directly in the paper so I'm not
+// personally going to heavily comment what's going on here.
+
+impl<T: Send> Deque<T> {
+    // Creates a deque seeded with a minimally-sized buffer from the shared
+    // pool; the boxed buffer's ownership is transferred into the `array`
+    // atomic pointer.
+    fn new(mut pool: BufferPool<T>) -> Deque<T> {
+        let buf = pool.alloc(MIN_BITS);
+        Deque {
+            bottom: AtomicInt::new(0),
+            top: AtomicInt::new(0),
+            array: AtomicPtr::new(unsafe { cast::transmute(buf) }),
+            pool: pool,
+        }
+    }
+
+    // Worker-only: appends `data` at the bottom of the deque, growing the
+    // backing buffer first if it is full.
+    unsafe fn push(&mut self, data: T) {
+        let mut b = self.bottom.load(SeqCst);
+        let t = self.top.load(SeqCst);
+        let mut a = self.array.load(SeqCst);
+        let size = b - t;
+        if size >= (*a).size() - 1 {
+            // You won't find this code in the chase-lev deque paper. This is
+            // alluded to in a small footnote, however. We always free a buffer
+            // when growing in order to prevent leaks.
+            a = self.swap_buffer(b, a, (*a).resize(b, t, 1));
+            b = self.bottom.load(SeqCst);
+        }
+        (*a).put(b, data);
+        self.bottom.store(b + 1, SeqCst);
+    }
+
+    // Worker-only: takes the most recently pushed element. Only when exactly
+    // one element remains does this race with stealers.
+    unsafe fn pop(&mut self) -> Option<T> {
+        let b = self.bottom.load(SeqCst);
+        let a = self.array.load(SeqCst);
+        let b = b - 1;
+        self.bottom.store(b, SeqCst);
+        let t = self.top.load(SeqCst);
+        let size = b - t;
+        if size < 0 {
+            // Deque was empty; restore `bottom` before returning.
+            self.bottom.store(t, SeqCst);
+            return None;
+        }
+        let data = (*a).get(b);
+        if size > 0 {
+            // More than one element left, so no stealer can contend for this
+            // particular slot; opportunistically shrink the buffer.
+            self.maybe_shrink(b, t);
+            return Some(data);
+        }
+        // Exactly one element left: arbitrate with stealers via CAS on `top`.
+        if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
+            self.bottom.store(t + 1, SeqCst);
+            return Some(data);
+        } else {
+            self.bottom.store(t + 1, SeqCst);
+            cast::forget(data); // someone else stole this value
+            return None;
+        }
+    }
+
+    // Stealer operation: attempts to remove the oldest element (at `top`).
+    unsafe fn steal(&mut self) -> Stolen<T> {
+        let t = self.top.load(SeqCst);
+        let old = self.array.load(SeqCst);
+        let b = self.bottom.load(SeqCst);
+        let a = self.array.load(SeqCst);
+        let size = b - t;
+        if size <= 0 { return Empty }
+        if size % (*a).size() == 0 {
+            // NOTE(review): this appears to detect a torn top/bottom reading
+            // across a concurrent buffer swap; only report Empty when neither
+            // the array nor `top` moved underneath us, otherwise retry.
+            if a == old && t == self.top.load(SeqCst) {
+                return Empty
+            }
+            return Abort
+        }
+        let data = (*a).get(t);
+        if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
+            Data(data)
+        } else {
+            cast::forget(data); // someone else stole this value
+            Abort
+        }
+    }
+
+    // Shrinks the backing buffer when the deque occupies less than a 1/K
+    // fraction of it, and only while more than `1 << MIN_BITS` elements
+    // remain.
+    unsafe fn maybe_shrink(&mut self, b: int, t: int) {
+        let a = self.array.load(SeqCst);
+        if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
+            self.swap_buffer(b, a, (*a).resize(b, t, -1));
+        }
+    }
+
+    // Helper routine not mentioned in the paper which is used in growing and
+    // shrinking buffers to swap in a new buffer into place. As a bit of a
+    // recap, the whole point that we need a buffer pool rather than just
+    // calling malloc/free directly is that stealers can continue using buffers
+    // after this method has called 'free' on it. The continued usage is simply
+    // a read followed by a forget, but we must make sure that the memory can
+    // continue to be read after we flag this buffer for reclamation.
+    unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer<T>,
+                          buf: Buffer<T>) -> *mut Buffer<T> {
+        let newbuf: *mut Buffer<T> = cast::transmute(~buf);
+        self.array.store(newbuf, SeqCst);
+        let ss = (*newbuf).size();
+        // Shift `bottom` (and, via CAS, `top`) forward by the new buffer's
+        // size; if the CAS on `top` fails, undo the `bottom` adjustment.
+        self.bottom.store(b + ss, SeqCst);
+        let t = self.top.load(SeqCst);
+        if self.top.compare_and_swap(t, t + ss, SeqCst) != t {
+            self.bottom.store(b, SeqCst);
+        }
+        self.pool.free(cast::transmute(old));
+        return newbuf;
+    }
+}
+
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Deque<T> {
+    fn drop(&mut self) {
+        let t = self.top.load(SeqCst);
+        let b = self.bottom.load(SeqCst);
+        let a = self.array.load(SeqCst);
+        // Free whatever is leftover in the dequeue, and then move the buffer
+        // back into the pool.
+        for i in range(t, b) {
+            // Reading each value out and letting it fall out of scope runs
+            // its destructor; the raw buffer never drops elements itself.
+            let _: T = unsafe { (*a).get(i) };
+        }
+        self.pool.free(unsafe { cast::transmute(a) });
+    }
+}
+
+impl<T: Send> Buffer<T> {
+    // Allocates raw, uninitialized storage for `1 << log_size` elements
+    // straight from malloc; slots are only valid once written via `put`.
+    unsafe fn new(log_size: int) -> Buffer<T> {
+        let size = (1 << log_size) * mem::size_of::<T>();
+        let buffer = libc::malloc(size as libc::size_t);
+        assert!(!buffer.is_null());
+        Buffer {
+            storage: buffer as *T,
+            log_size: log_size,
+        }
+    }
+
+    // Number of elements this buffer can hold (always a power of two).
+    fn size(&self) -> int { 1 << self.log_size }
+
+    // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly
+    fn mask(&self) -> int { (1 << self.log_size) - 1 }
+
+    // This does not protect against loading duplicate values of the same cell,
+    // nor does this clear out the contents contained within. Hence, this is a
+    // very unsafe method which the caller needs to treat specially in case a
+    // race is lost.
+    unsafe fn get(&self, i: int) -> T {
+        ptr::read_ptr(self.storage.offset(i & self.mask()))
+    }
+
+    // Unsafe because this unsafely overwrites possibly uninitialized or
+    // initialized data.
+    unsafe fn put(&mut self, i: int, t: T) {
+        // Bitwise copy into the slot, then forget the source so its
+        // destructor never runs (ownership now lives in the buffer).
+        let ptr = self.storage.offset(i & self.mask());
+        ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1);
+        cast::forget(t);
+    }
+
+    // Again, unsafe because this has incredibly dubious ownership violations.
+    // It is assumed that this buffer is immediately dropped.
+    unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
+        // Copies the live elements `t..b` into a buffer whose log-size
+        // differs by `delta` (+1 grows, -1 shrinks).
+        let mut buf = Buffer::new(self.log_size + delta);
+        for i in range(t, b) {
+            buf.put(i, self.get(i));
+        }
+        return buf;
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Buffer<T> {
+    fn drop(&mut self) {
+        // It is assumed that all buffers are empty on drop.
+        // Only the raw storage is released; element destructors are never run
+        // here (see `Deque`'s destructor, which drains leftovers first).
+        unsafe { libc::free(self.storage as *libc::c_void) }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use prelude::*;
+    use super::{Data, BufferPool, Abort, Empty, Worker, Stealer};
+
+    use cast;
+    use rt::thread::Thread;
+    use rand;
+    use rand::Rng;
+    use sync::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
+                        AtomicUint, INIT_ATOMIC_UINT};
+    use vec;
+
+    // Single-threaded sanity check of push/pop/steal and stealer cloning.
+    #[test]
+    fn smoke() {
+        let mut pool = BufferPool::new();
+        let (mut w, mut s) = pool.deque();
+        assert_eq!(w.pop(), None);
+        assert_eq!(s.steal(), Empty);
+        w.push(1);
+        assert_eq!(w.pop(), Some(1));
+        w.push(1);
+        assert_eq!(s.steal(), Data(1));
+        w.push(1);
+        assert_eq!(s.clone().steal(), Data(1));
+    }
+
+    // One worker pushes AMT values while a single stealer thread consumes
+    // every one of them.
+    #[test]
+    fn stealpush() {
+        static AMT: int = 100000;
+        let mut pool = BufferPool::<int>::new();
+        let (mut w, s) = pool.deque();
+        let t = do Thread::start {
+            let mut s = s;
+            let mut left = AMT;
+            while left > 0 {
+                match s.steal() {
+                    Data(i) => {
+                        assert_eq!(i, 1);
+                        left -= 1;
+                    }
+                    Abort | Empty => {}
+                }
+            }
+        };
+
+        for _ in range(0, AMT) {
+            w.push(1);
+        }
+
+        t.join();
+    }
+
+    // Same as `stealpush`, but with a multi-word payload to exercise
+    // non-word-sized copies through the buffer.
+    #[test]
+    fn stealpush_large() {
+        static AMT: int = 100000;
+        let mut pool = BufferPool::<(int, int)>::new();
+        let (mut w, s) = pool.deque();
+        let t = do Thread::start {
+            let mut s = s;
+            let mut left = AMT;
+            while left > 0 {
+                match s.steal() {
+                    Data((1, 10)) => { left -= 1; }
+                    Data(..) => fail!(),
+                    Abort | Empty => {}
+                }
+            }
+        };
+
+        for _ in range(0, AMT) {
+            w.push((1, 10));
+        }
+
+        t.join();
+    }
+
+    // Helper: preloads `amt` boxed values, then drains them concurrently via
+    // `nthreads` stealer threads plus the worker itself, joining everyone at
+    // the end.
+    fn stampede(mut w: Worker<~int>, s: Stealer<~int>,
+                nthreads: int, amt: uint) {
+        for _ in range(0, amt) {
+            w.push(~20);
+        }
+        // Shared countdown of values left to consume; a raw pointer is handed
+        // to the stealer threads since the counter outlives them (joined
+        // below).
+        let mut remaining = AtomicUint::new(amt);
+        let unsafe_remaining: *mut AtomicUint = &mut remaining;
+
+        let threads = range(0, nthreads).map(|_| {
+            let s = s.clone();
+            do Thread::start {
+                unsafe {
+                    let mut s = s;
+                    while (*unsafe_remaining).load(SeqCst) > 0 {
+                        match s.steal() {
+                            Data(~20) => {
+                                (*unsafe_remaining).fetch_sub(1, SeqCst);
+                            }
+                            Data(..) => fail!(),
+                            Abort | Empty => {}
+                        }
+                    }
+                }
+            }
+        }).to_owned_vec();
+
+        while remaining.load(SeqCst) > 0 {
+            match w.pop() {
+                Some(~20) => { remaining.fetch_sub(1, SeqCst); }
+                Some(..) => fail!(),
+                None => {}
+            }
+        }
+
+        for thread in threads.move_iter() {
+            thread.join();
+        }
+    }
+
+    // Single stampede with 8 stealer threads.
+    #[test]
+    fn run_stampede() {
+        let mut pool = BufferPool::<~int>::new();
+        let (w, s) = pool.deque();
+        stampede(w, s, 8, 10000);
+    }
+
+    // Several stampedes run concurrently, all drawing deques from the same
+    // shared buffer pool.
+    #[test]
+    fn many_stampede() {
+        static AMT: uint = 4;
+        let mut pool = BufferPool::<~int>::new();
+        let threads = range(0, AMT).map(|_| {
+            let (w, s) = pool.deque();
+            do Thread::start {
+                stampede(w, s, 4, 10000);
+            }
+        }).to_owned_vec();
+
+        for thread in threads.move_iter() {
+            thread.join();
+        }
+    }
+
+    // Randomized interleaving of worker pushes/pops against NTHREADS stealers
+    // until AMT values have been consumed in total; the final tally must
+    // match the number of values pushed.
+    #[test]
+    fn stress() {
+        static AMT: int = 100000;
+        static NTHREADS: int = 8;
+        static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
+        static mut HITS: AtomicUint = INIT_ATOMIC_UINT;
+        let mut pool = BufferPool::<int>::new();
+        let (mut w, s) = pool.deque();
+
+        let threads = range(0, NTHREADS).map(|_| {
+            let s = s.clone();
+            do Thread::start {
+                unsafe {
+                    let mut s = s;
+                    loop {
+                        match s.steal() {
+                            Data(2) => { HITS.fetch_add(1, SeqCst); }
+                            Data(..) => fail!(),
+                            _ if DONE.load(SeqCst) => break,
+                            _ => {}
+                        }
+                    }
+                }
+            }
+        }).to_owned_vec();
+
+        let mut rng = rand::task_rng();
+        let mut expected = 0;
+        while expected < AMT {
+            // Roughly one pop for every two pushes, chosen at random.
+            if rng.gen_range(0, 3) == 2 {
+                match w.pop() {
+                    None => {}
+                    Some(2) => unsafe { HITS.fetch_add(1, SeqCst); },
+                    Some(_) => fail!(),
+                }
+            } else {
+                expected += 1;
+                w.push(2);
+            }
+        }
+
+        unsafe {
+            while HITS.load(SeqCst) < AMT as uint {
+                match w.pop() {
+                    None => {}
+                    Some(2) => { HITS.fetch_add(1, SeqCst); },
+                    Some(_) => fail!(),
+                }
+            }
+            DONE.store(true, SeqCst);
+        }
+
+        for thread in threads.move_iter() {
+            thread.join();
+        }
+
+        assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint);
+    }
+
+    // Keeps pushing until every stealer thread has received at least one
+    // element (per-thread counters) and the worker has popped one itself —
+    // i.e. no participant is starved indefinitely.
+    #[test]
+    #[ignore(cfg(windows))] // apparently windows scheduling is weird?
+    fn no_starvation() {
+        static AMT: int = 10000;
+        static NTHREADS: int = 4;
+        static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
+        let mut pool = BufferPool::<(int, uint)>::new();
+        let (mut w, s) = pool.deque();
+
+        let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
+            let s = s.clone();
+            // Each thread gets its own hit counter; a raw alias is smuggled
+            // into the thread while the owning box stays in `hits` so the
+            // main loop can read it.
+            let unique_box = ~AtomicUint::new(0);
+            let thread_box = unsafe {
+                *cast::transmute::<&~AtomicUint,**mut AtomicUint>(&unique_box)
+            };
+            (do Thread::start {
+                unsafe {
+                    let mut s = s;
+                    loop {
+                        match s.steal() {
+                            Data((1, 2)) => {
+                                (*thread_box).fetch_add(1, SeqCst);
+                            }
+                            Data(..) => fail!(),
+                            _ if DONE.load(SeqCst) => break,
+                            _ => {}
+                        }
+                    }
+                }
+            }, unique_box)
+        }));
+
+        let mut rng = rand::task_rng();
+        let mut myhit = false;
+        let mut iter = 0;
+        'outer: loop {
+            for _ in range(0, rng.gen_range(0, AMT)) {
+                if !myhit && rng.gen_range(0, 3) == 2 {
+                    match w.pop() {
+                        None => {}
+                        Some((1, 2)) => myhit = true,
+                        Some(_) => fail!(),
+                    }
+                } else {
+                    w.push((1, 2));
+                }
+            }
+            iter += 1;
+
+            debug!("loop iteration {}", iter);
+            for (i, slot) in hits.iter().enumerate() {
+                let amt = slot.load(SeqCst);
+                debug!("thread {}: {}", i, amt);
+                if amt == 0 { continue 'outer; }
+            }
+            if myhit {
+                break
+            }
+        }
+
+        unsafe { DONE.store(true, SeqCst); }
+
+        for thread in threads.move_iter() {
+            thread.join();
+        }
+    }
+}
+
diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs
new file mode 100644
index 00000000000..3213c538152
--- /dev/null
+++ b/src/libstd/sync/mod.rs
@@ -0,0 +1,23 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Useful synchronization primitives
+//!
+//! This module contains useful safe and unsafe synchronization primitives.
+//! Most of the primitives in this module do not provide any sort of locking
+//! and/or blocking at all, but rather provide the necessary tools to build
+//! other types of concurrent primitives.
+
+pub mod arc;                 // atomically reference counted pointers
+pub mod atomics;             // atomic types and memory orderings
+pub mod deque;               // work-stealing deque (one worker, many stealers)
+pub mod mpmc_bounded_queue;  // bounded multi-producer multi-consumer queue
+pub mod mpsc_queue;          // multi-producer single-consumer queue
+pub mod spsc_queue;          // single-producer single-consumer queue
diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs
new file mode 100644
index 00000000000..b623976306d
--- /dev/null
+++ b/src/libstd/sync/mpmc_bounded_queue.rs
@@ -0,0 +1,211 @@
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *    1. Redistributions of source code must retain the above copyright notice,
+ *       this list of conditions and the following disclaimer.
+ *
+ *    2. Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+ * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are
+ * those of the authors and should not be interpreted as representing official
+ * policies, either expressed or implied, of Dmitry Vyukov.
+ */
+
+#[allow(missing_doc, dead_code)];
+
+// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
+
+use clone::Clone;
+use kinds::Send;
+use num::{Exponential,Algebraic,Round};
+use option::{Option, Some, None};
+use sync::arc::UnsafeArc;
+use sync::atomics::{AtomicUint,Relaxed,Release,Acquire};
+use vec;
+
+// One slot of the ring buffer.
+struct Node<T> {
+    // Lap marker coordinating producers and consumers: equals the slot's
+    // position when writable, position+1 once a value has been published
+    // (see `State::push`/`State::pop`).
+    sequence: AtomicUint,
+    value: Option<T>,
+}
+
+// Shared queue state. The `padN` byte arrays keep the hot atomic counters
+// apart in memory, presumably to avoid false sharing between cache lines.
+struct State<T> {
+    pad0: [u8, ..64],
+    buffer: ~[Node<T>],
+    // capacity - 1; capacity is a power of two, so this doubles as an index
+    // mask (`pos & mask`).
+    mask: uint,
+    pad1: [u8, ..64],
+    enqueue_pos: AtomicUint,
+    pad2: [u8, ..64],
+    dequeue_pos: AtomicUint,
+    pad3: [u8, ..64],
+}
+
+/// A bounded multi-producer multi-consumer queue; handles are cloned to
+/// create additional producers/consumers sharing the same ring buffer.
+pub struct Queue<T> {
+    priv state: UnsafeArc<State<T>>,
+}
+
+impl<T: Send> State<T> {
+    // Creates queue state with room for `capacity` elements, rounding the
+    // capacity up to a power of two (minimum 2) so that `pos & mask` can be
+    // used for slot indexing.
+    fn with_capacity(capacity: uint) -> State<T> {
+        let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
+            if capacity < 2 {
+                2u
+            } else {
+                // use next power of 2 as capacity
+                2f64.pow(&((capacity as f64).log2().ceil())) as uint
+            }
+        } else {
+            capacity
+        };
+        // Each node's sequence number starts equal to its index, marking
+        // every slot as free for the producer that first reaches it.
+        let buffer = vec::from_fn(capacity, |i:uint| {
+            Node{sequence:AtomicUint::new(i),value:None}
+        });
+        State{
+            pad0: [0, ..64],
+            buffer: buffer,
+            mask: capacity-1,
+            pad1: [0, ..64],
+            enqueue_pos: AtomicUint::new(0),
+            pad2: [0, ..64],
+            dequeue_pos: AtomicUint::new(0),
+            pad3: [0, ..64],
+        }
+    }
+
+    // Attempts to enqueue `value`, returning `false` when the queue is full.
+    fn push(&mut self, value: T) -> bool {
+        let mask = self.mask;
+        let mut pos = self.enqueue_pos.load(Relaxed);
+        loop {
+            let node = &mut self.buffer[pos & mask];
+            let seq = node.sequence.load(Acquire);
+            let diff: int = seq as int - pos as int;
+
+            if diff == 0 {
+                // Slot is free for this lap; race other producers to claim it.
+                let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
+                if enqueue_pos == pos {
+                    node.value = Some(value);
+                    node.sequence.store(pos+1, Release);
+                    break
+                } else {
+                    pos = enqueue_pos;
+                }
+            } else if diff < 0 {
+                // Slot has not yet been consumed from the previous lap: full.
+                return false
+            } else {
+                // Another producer got ahead of us; reload and retry.
+                pos = self.enqueue_pos.load(Relaxed);
+            }
+        }
+        true
+    }
+
+    // Attempts to dequeue an element, returning `None` when the queue is
+    // empty.
+    fn pop(&mut self) -> Option<T> {
+        let mask = self.mask;
+        let mut pos = self.dequeue_pos.load(Relaxed);
+        loop {
+            let node = &mut self.buffer[pos & mask];
+            let seq = node.sequence.load(Acquire);
+            let diff: int = seq as int - (pos + 1) as int;
+            if diff == 0 {
+                // Slot holds published data for this lap; race other
+                // consumers to claim it.
+                let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
+                if dequeue_pos == pos {
+                    let value = node.value.take();
+                    // Mark the slot writable for the producer one lap ahead.
+                    node.sequence.store(pos + mask + 1, Release);
+                    return value
+                } else {
+                    pos = dequeue_pos;
+                }
+            } else if diff < 0 {
+                return None
+            } else {
+                pos = self.dequeue_pos.load(Relaxed);
+            }
+        }
+    }
+}
+
+impl<T: Send> Queue<T> {
+    /// Creates a queue with room for at least `capacity` elements; the
+    /// capacity is rounded up to a power of two (minimum 2).
+    pub fn with_capacity(capacity: uint) -> Queue<T> {
+        Queue{
+            state: UnsafeArc::new(State::with_capacity(capacity))
+        }
+    }
+
+    /// Attempts to push a value, returning `false` if the queue is full.
+    pub fn push(&mut self, value: T) -> bool {
+        unsafe { (*self.state.get()).push(value) }
+    }
+
+    /// Attempts to pop a value, returning `None` if the queue is empty.
+    pub fn pop(&mut self) -> Option<T> {
+        unsafe { (*self.state.get()).pop() }
+    }
+}
+
+impl<T: Send> Clone for Queue<T> {
+    /// Creates another handle (producer or consumer) to the same shared
+    /// queue state.
+    fn clone(&self) -> Queue<T> {
+        Queue { state: self.state.clone() }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use prelude::*;
+    use option::*;
+    use task;
+    use super::Queue;
+
+    // 8 producer tasks and 8 consumer tasks hammer one bounded queue; each
+    // consumer must observe exactly `nmsgs` messages.
+    #[test]
+    fn test() {
+        let nthreads = 8u;
+        let nmsgs = 1000u;
+        let mut q = Queue::with_capacity(nthreads*nmsgs);
+        assert_eq!(None, q.pop());
+
+        for _ in range(0, nthreads) {
+            let q = q.clone();
+            do task::spawn_sched(task::SingleThreaded) {
+                let mut q = q;
+                for i in range(0, nmsgs) {
+                    assert!(q.push(i));
+                }
+            }
+        }
+
+        let mut completion_ports = ~[];
+        for _ in range(0, nthreads) {
+            let (completion_port, completion_chan) = Chan::new();
+            completion_ports.push(completion_port);
+            let q = q.clone();
+            do task::spawn_sched(task::SingleThreaded) {
+                let mut q = q;
+                let mut i = 0u;
+                loop {
+                    match q.pop() {
+                        None => {},
+                        Some(_) => {
+                            i += 1;
+                            if i == nmsgs { break }
+                        }
+                    }
+                }
+                completion_chan.send(i);
+            }
+        }
+
+        for completion_port in completion_ports.mut_iter() {
+            assert_eq!(nmsgs, completion_port.recv());
+        }
+    }
+}
diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs
new file mode 100644
index 00000000000..89e56e3fa67
--- /dev/null
+++ b/src/libstd/sync/mpsc_queue.rs
@@ -0,0 +1,245 @@
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *    1. Redistributions of source code must retain the above copyright notice,
+ *       this list of conditions and the following disclaimer.
+ *
+ *    2. Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+ * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are
+ * those of the authors and should not be interpreted as representing official
+ * policies, either expressed or implied, of Dmitry Vyukov.
+ */
+
+//! A mostly lock-free multi-producer, single consumer queue.
+//!
+//! This module contains an implementation of a concurrent MPSC queue. This
+//! queue can be used to share data between tasks, and is also used as the
+//! building block of channels in rust.
+//!
+//! Note that the current implementation of this queue has a caveat with the
+//! `pop` method (see that method's documentation for details). Due to this
+//! caveat, this queue may not be appropriate for all use-cases.
+
+// http://www.1024cores.net/home/lock-free-algorithms
+//                         /queues/non-intrusive-mpsc-node-based-queue
+
+use cast;
+use clone::Clone;
+use kinds::Send;
+use ops::Drop;
+use option::{Option, None, Some};
+use ptr::RawPtr;
+use sync::arc::UnsafeArc;
+use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
+
+/// A result of the `pop` function.
+pub enum PopResult<T> {
+    /// Some data has been popped
+    Data(T),
+    /// The queue is empty
+    Empty,
+    /// The queue is in an inconsistent state. Popping data should succeed, but
+    /// some pushers have yet to make enough progress in order to allow a pop
+    /// to succeed. It is recommended that a pop() occur "in the near future"
+    /// in order to see if the sender has made progress or not
+    Inconsistent,
+}
+
+// A singly-linked queue node. `next` is null until a subsequent push links
+// in a successor.
+struct Node<T> {
+    next: AtomicPtr<Node<T>>,
+    // `None` only for the stub node installed by `State::new`; every pushed
+    // node carries `Some`.
+    value: Option<T>,
+}
+
+// Shared queue state: producers atomically swap `head` forward while the
+// single consumer chases it from `tail`. `packet` is arbitrary user data
+// shared (unsynchronized) between the two sides — see `queue`.
+struct State<T, P> {
+    head: AtomicPtr<Node<T>>,
+    tail: *mut Node<T>,
+    packet: P,
+}
+
+/// The consumer half of this concurrent queue. This half is used to receive
+/// data from the producers.
+pub struct Consumer<T, P> {
+    // NOTE: unlike `Producer`, this type deliberately has no `Clone` impl —
+    // there may only ever be one consumer.
+    priv state: UnsafeArc<State<T, P>>,
+}
+
+/// The production half of the concurrent queue. This handle may be cloned in
+/// order to make handles for new producers.
+pub struct Producer<T, P> {
+    // Shared with the consumer and all other producers.
+    priv state: UnsafeArc<State<T, P>>,
+}
+
+impl<T: Send, P: Send> Clone for Producer<T, P> {
+    /// Creates another handle for pushing onto the same queue.
+    fn clone(&self) -> Producer<T, P> { Producer { state: self.state.clone() } }
+}
+
+/// Creates a new MPSC queue. The given argument `p` is a user-defined "packet"
+/// of information which will be shared by the consumer and the producer which
+/// can be re-acquired via the `packet` function. This is helpful when extra
+/// state is shared between the producer and consumer, but note that there is
+/// no synchronization performed on this data.
+pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) {
+    unsafe {
+        let (a, b) = UnsafeArc::new2(State::new(p));
+        (Consumer { state: a }, Producer { state: b })
+    }
+}
+
+impl<T> Node<T> {
+    // Heap-allocates a node and leaks it into a raw pointer; ownership is
+    // reclaimed by transmuting back to `~Node<T>` when the node is popped or
+    // the queue is dropped.
+    unsafe fn new(v: Option<T>) -> *mut Node<T> {
+        cast::transmute(~Node {
+            next: AtomicPtr::new(0 as *mut Node<T>),
+            value: v,
+        })
+    }
+}
+
+impl<T: Send, P: Send> State<T, P> {
+    // Both head and tail initially point at a shared value-less stub node.
+    unsafe fn new(p: P) -> State<T, P> {
+        let stub = Node::new(None);
+        State {
+            head: AtomicPtr::new(stub),
+            tail: stub,
+            packet: p,
+        }
+    }
+
+    unsafe fn push(&mut self, t: T) {
+        let n = Node::new(Some(t));
+        // Swap ourselves in as the new head, then link the previous head to
+        // us. A producer pre-empted between these two steps leaves the queue
+        // temporarily "inconsistent" (see `pop`).
+        let prev = self.head.swap(n, AcqRel);
+        (*prev).next.store(n, Release);
+    }
+
+    unsafe fn pop(&mut self) -> PopResult<T> {
+        let tail = self.tail;
+        let next = (*tail).next.load(Acquire);
+
+        if !next.is_null() {
+            // Advance the tail, take the value out of the new tail, and free
+            // the old (now value-less) tail node.
+            self.tail = next;
+            assert!((*tail).value.is_none());
+            assert!((*next).value.is_some());
+            let ret = (*next).value.take_unwrap();
+            let _: ~Node<T> = cast::transmute(tail);
+            return Data(ret);
+        }
+
+        // No linked successor: either the queue is truly empty, or some push
+        // has swapped `head` but not yet stored its `next` link.
+        if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send, P: Send> Drop for State<T, P> {
+    fn drop(&mut self) {
+        unsafe {
+            // Walk the remaining list starting at the tail stub, reclaiming
+            // each heap-allocated node (dropping any contained values).
+            let mut cur = self.tail;
+            while !cur.is_null() {
+                let next = (*cur).next.load(Relaxed);
+                let _: ~Node<T> = cast::transmute(cur);
+                cur = next;
+            }
+        }
+    }
+}
+
+impl<T: Send, P: Send> Producer<T, P> {
+    /// Pushes a new value onto this queue.
+    pub fn push(&mut self, value: T) {
+        // Safe to call from many producers concurrently; the shared state
+        // serializes them with an atomic swap on `head`.
+        unsafe { (*self.state.get()).push(value) }
+    }
+    /// Gets an unsafe pointer to the user-defined packet shared by the
+    /// producers and the consumer. Note that care must be taken to ensure that
+    /// the lifetime of the queue outlives the usage of the returned pointer.
+    pub unsafe fn packet(&self) -> *mut P {
+        &mut (*self.state.get()).packet as *mut P
+    }
+}
+
+impl<T: Send, P: Send> Consumer<T, P> {
+    /// Pops some data from this queue.
+    ///
+    /// Note that the current implementation means that this function cannot
+    /// return `Option<T>`. It is possible for this queue to be in an
+    /// inconsistent state where many pushes have succeeded and completely
+    /// finished, but pops cannot return `Some(t)`. This inconsistent state
+    /// happens when a pusher is pre-empted at an inopportune moment.
+    ///
+    /// This inconsistent state means that this queue does indeed have data, but
+    /// it does not currently have access to it at this time.
+    pub fn pop(&mut self) -> PopResult<T> {
+        unsafe { (*self.state.get()).pop() }
+    }
+    /// Attempts to pop data from this queue, but doesn't attempt too hard. This
+    /// will canonicalize inconsistent states to a `None` value.
+    pub fn casual_pop(&mut self) -> Option<T> {
+        match self.pop() {
+            Data(t) => Some(t),
+            Empty | Inconsistent => None,
+        }
+    }
+    /// Gets an unsafe pointer to the underlying user-defined packet. See
+    /// `Producer.packet` for more information.
+    pub unsafe fn packet(&self) -> *mut P {
+        &mut (*self.state.get()).packet as *mut P
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use prelude::*;
+
+    use task;
+    use super::{queue, Data, Empty, Inconsistent};
+
+    // Dropping a non-empty queue must reclaim the pushed boxes (exercises
+    // `Drop for State`) without crashing.
+    #[test]
+    fn test_full() {
+        let (_, mut p) = queue(());
+        p.push(~1);
+        p.push(~2);
+    }
+
+    // 8 producer tasks push into one queue; the single consumer must
+    // eventually receive all nthreads * nmsgs messages.
+    #[test]
+    fn test() {
+        let nthreads = 8u;
+        let nmsgs = 1000u;
+        let (mut c, p) = queue(());
+        match c.pop() {
+            Empty => {}
+            Inconsistent | Data(..) => fail!()
+        }
+
+        for _ in range(0, nthreads) {
+            let q = p.clone();
+            do task::spawn_sched(task::SingleThreaded) {
+                let mut q = q;
+                for i in range(0, nmsgs) {
+                    q.push(i);
+                }
+            }
+        }
+
+        let mut i = 0u;
+        while i < nthreads * nmsgs {
+            match c.pop() {
+                Empty | Inconsistent => {},
+                Data(_) => { i += 1 }
+            }
+        }
+    }
+}
+
diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs
new file mode 100644
index 00000000000..c4abba04659
--- /dev/null
+++ b/src/libstd/sync/spsc_queue.rs
@@ -0,0 +1,334 @@
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *    1. Redistributions of source code must retain the above copyright notice,
+ *       this list of conditions and the following disclaimer.
+ *
+ *    2. Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+ * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are
+ * those of the authors and should not be interpreted as representing official
+ * policies, either expressed or implied, of Dmitry Vyukov.
+ */
+
+// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
+
+//! A single-producer single-consumer concurrent queue
+//!
+//! This module contains the implementation of an SPSC queue which can be used
+//! concurrently between two tasks. This data structure is safe to use and
+//! enforces the semantics that there is one pusher and one popper.
+
+use cast;
+use kinds::Send;
+use ops::Drop;
+use option::{Some, None, Option};
+use ptr::RawPtr;
+use sync::arc::UnsafeArc;
+use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
+
+// Node within the linked list queue of messages to send
+struct Node<T> {
+    // XXX: this could be an uninitialized T if we're careful enough, and
+    //      that would reduce memory usage (and be a bit faster).
+    //      is it worth it?
+    value: Option<T>,           // nullable for re-use of nodes
+    next: AtomicPtr<Node<T>>,   // next node in the queue
+}
+
+// The producer/consumer halves both need access to the `tail` field, and if
+// they both have access to that we may as well just give them both access
+// to this whole structure.
+struct State<T, P> {
+    // consumer fields
+    tail: *mut Node<T>, // sentinel node; pops read from `tail.next`
+    tail_prev: AtomicPtr<Node<T>>, // last node handed back to the producer
+                                   // for recycling (stored with Release in
+                                   // `pop`, loaded with Acquire in `alloc`)
+
+    // producer fields
+    head: *mut Node<T>,      // where to push to (last node in the list)
+    first: *mut Node<T>,     // where to get new nodes from (cache head)
+    tail_copy: *mut Node<T>, // producer's snapshot of `tail_prev`; the
+                             // nodes between `first` and here are reusable
+
+    // Cache maintenance fields. Additions and subtractions are stored
+    // separately in order to allow them to use nonatomic addition/subtraction.
+    cache_bound: uint, // maximum cached nodes; 0 means unbounded
+    cache_additions: AtomicUint, // total nodes ever placed into the cache
+    cache_subtractions: AtomicUint, // total nodes ever taken back out
+
+    packet: P, // user-defined data shared between both halves
+}
+
+/// Producer half of this queue. This handle is used to push data to the
+/// consumer. The queue's safety relies on there being exactly one producer
+/// (this type is not clonable).
+pub struct Producer<T, P> {
+    priv state: UnsafeArc<State<T, P>>,
+}
+
+/// Consumer half of this queue. This handle is used to receive data from the
+/// producer. The queue's safety relies on there being exactly one consumer
+/// (this type is not clonable).
+pub struct Consumer<T, P> {
+    priv state: UnsafeArc<State<T, P>>,
+}
+
+/// Creates a new queue, returning the consumer and producer halves. All data
+/// pushed through the producer comes out of the consumer in FIFO order.
+///
+/// # Arguments
+///
+///   * `bound` - Because this queue is a linked list, every push normally
+///               performs an allocation. To amortize that cost, popped nodes
+///               are kept in an internal cache for reuse. This bound limits
+///               the size of that cache: a value of 0 leaves the cache
+///               unbounded, while any other value caps the cache at `bound`
+///               nodes (the queue itself may still grow much larger).
+///
+///   * `p` - This is the user-defined packet of data which will also be shared
+///           between the producer and consumer.
+pub fn queue<T: Send, P: Send>(bound: uint,
+                               p: P) -> (Consumer<T, P>, Producer<T, P>)
+{
+    // Seed the list with two nodes: `stub` heads the producer's node cache,
+    // and `sentinel` is the empty node that both ends of the queue start on.
+    let stub = Node::new();
+    let sentinel = Node::new();
+    unsafe { (*stub).next.store(sentinel, Relaxed) }
+    let state = State {
+        tail: sentinel,
+        tail_prev: AtomicPtr::new(stub),
+        head: sentinel,
+        first: stub,
+        tail_copy: stub,
+        cache_bound: bound,
+        cache_additions: AtomicUint::new(0),
+        cache_subtractions: AtomicUint::new(0),
+        packet: p,
+    };
+    let (consumer_arc, producer_arc) = UnsafeArc::new2(state);
+    (Consumer { state: consumer_arc }, Producer { state: producer_arc })
+}
+
+impl<T: Send> Node<T> {
+    /// Allocates a fresh, empty node with no successor and returns an owned
+    /// raw pointer to it; ownership is thereafter managed manually by the
+    /// queue (and reclaimed by transmuting back to a box).
+    fn new() -> *mut Node<T> {
+        let node = ~Node {
+            value: None,
+            next: AtomicPtr::new(0 as *mut Node<T>),
+        };
+        unsafe { cast::transmute(node) }
+    }
+}
+
+impl<T: Send, P: Send> Producer<T, P> {
+    /// Pushes data onto the queue. Only the single producer thread may call
+    /// this.
+    pub fn push(&mut self, t: T) {
+        unsafe {
+            let state = self.state.get();
+            (*state).push(t)
+        }
+    }
+    /// Tests whether the queue is empty. A `false` answer is reliable, but a
+    /// `true` answer may already be stale by the time it is observed, so
+    /// little meaning can be attached to it.
+    pub fn is_empty(&self) -> bool {
+        unsafe {
+            let state = self.state.get();
+            (*state).is_empty()
+        }
+    }
+    /// Acquires an unsafe pointer to the underlying user-defined packet. The
+    /// caller must ensure that the queue outlives all uses of the pointer.
+    pub unsafe fn packet(&self) -> *mut P {
+        let state = self.state.get();
+        &mut (*state).packet as *mut P
+    }
+}
+
+impl<T: Send, P: Send> Consumer<T, P> {
+    /// Pops the next value from the queue, or `None` when nothing is
+    /// currently available. Only the single consumer thread may call this.
+    pub fn pop(&mut self) -> Option<T> {
+        unsafe {
+            let state = self.state.get();
+            (*state).pop()
+        }
+    }
+    /// Same function as the producer's `packet` method.
+    pub unsafe fn packet(&self) -> *mut P {
+        let state = self.state.get();
+        &mut (*state).packet as *mut P
+    }
+}
+
+impl<T: Send, P: Send> State<T, P> {
+    // remember that there is only one thread executing `push` (and only one
+    // thread executing `pop`)
+    unsafe fn push(&mut self, t: T) {
+        // Acquire a node (which either uses a cached one or allocates a new
+        // one), and then append this to the 'head' node.
+        let n = self.alloc();
+        assert!((*n).value.is_none());
+        (*n).value = Some(t);
+        // Terminate the list at the new node before publishing it; the
+        // Release store on `head.next` is what makes the freshly written
+        // value visible to the consumer's Acquire load in `pop`.
+        (*n).next.store(0 as *mut Node<T>, Relaxed);
+        (*self.head).next.store(n, Release);
+        self.head = n;
+    }
+
+    // Obtains a node for the next push, preferring recycled nodes over a
+    // fresh allocation. Called only from the producer thread.
+    unsafe fn alloc(&mut self) -> *mut Node<T> {
+        // First try to see if we can consume the 'first' node for our uses.
+        // We try to avoid as many atomic instructions as possible here, so
+        // the addition to cache_subtractions is not atomic (plus we're the
+        // only one subtracting from the cache).
+        if self.first != self.tail_copy {
+            if self.cache_bound > 0 {
+                let b = self.cache_subtractions.load(Relaxed);
+                self.cache_subtractions.store(b + 1, Relaxed);
+            }
+            let ret = self.first;
+            self.first = (*ret).next.load(Relaxed);
+            return ret;
+        }
+        // If the above fails, then update our copy of the tail and try
+        // again. The Acquire load pairs with the consumer's Release store of
+        // `tail_prev` in `pop`, making the recycled nodes safely visible.
+        self.tail_copy = self.tail_prev.load(Acquire);
+        if self.first != self.tail_copy {
+            if self.cache_bound > 0 {
+                let b = self.cache_subtractions.load(Relaxed);
+                self.cache_subtractions.store(b + 1, Relaxed);
+            }
+            let ret = self.first;
+            self.first = (*ret).next.load(Relaxed);
+            return ret;
+        }
+        // If all of that fails, then we have to allocate a new node
+        // (there's nothing in the node cache).
+        Node::new()
+    }
+
+    // remember that there is only one thread executing `pop` (and only one
+    // thread executing `push`)
+    unsafe fn pop(&mut self) -> Option<T> {
+        // The `tail` node is not actually a used node, but rather a
+        // sentinel from where we should start popping from. Hence, look at
+        // tail's next field and see if we can use it. If we do a pop, then
+        // the current tail node is a candidate for going into the cache.
+        let tail = self.tail;
+        // Acquire pairs with the producer's Release store in `push`.
+        let next = (*tail).next.load(Acquire);
+        if next.is_null() { return None }
+        assert!((*next).value.is_some());
+        let ret = (*next).value.take();
+
+        self.tail = next;
+        if self.cache_bound == 0 {
+            // Unbounded cache: always hand the old sentinel back to the
+            // producer for reuse.
+            self.tail_prev.store(tail, Release);
+        } else {
+            // XXX: this is dubious with overflow.
+            let additions = self.cache_additions.load(Relaxed);
+            let subtractions = self.cache_subtractions.load(Relaxed);
+            let size = additions - subtractions;
+
+            if size < self.cache_bound {
+                // Room in the cache: publish the old sentinel for reuse and
+                // record the addition (we're the only thread adding).
+                self.tail_prev.store(tail, Release);
+                self.cache_additions.store(additions + 1, Relaxed);
+            } else {
+                // Cache is full: splice `tail` out of the recycle list by
+                // pointing its predecessor at `next` instead.
+                (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
+                // We have successfully erased all references to 'tail', so
+                // now we can safely drop it.
+                let _: ~Node<T> = cast::transmute(tail);
+            }
+        }
+        return ret;
+    }
+
+    // Reports whether the sentinel has a successor; called via the producer
+    // handle, so a `true` (empty) answer can be stale immediately.
+    unsafe fn is_empty(&self) -> bool {
+        let tail = self.tail;
+        let next = (*tail).next.load(Acquire);
+        return next.is_null();
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send, P: Send> Drop for State<T, P> {
+    /// Reclaims every node still linked into the list — the cache, the
+    /// sentinel, and any messages that were pushed but never popped.
+    fn drop(&mut self) {
+        unsafe {
+            let mut node = self.first;
+            while !node.is_null() {
+                let succ = (*node).next.load(Relaxed);
+                // Re-box the raw pointer so the node (and any value still
+                // inside it) is freed when the box goes out of scope.
+                let _owned: ~Node<T> = cast::transmute(node);
+                node = succ;
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use prelude::*;
+    use super::queue;
+    use task;
+
+    #[test]
+    fn smoke() {
+        // Interleave pushes and pops and check that FIFO order holds.
+        let (mut consumer, mut producer) = queue(0, ());
+        producer.push(1);
+        producer.push(2);
+        assert_eq!(consumer.pop(), Some(1));
+        assert_eq!(consumer.pop(), Some(2));
+        assert_eq!(consumer.pop(), None);
+        producer.push(3);
+        producer.push(4);
+        assert_eq!(consumer.pop(), Some(3));
+        assert_eq!(consumer.pop(), Some(4));
+        assert_eq!(consumer.pop(), None);
+    }
+
+    #[test]
+    fn drop_full() {
+        // Dropping a queue that still holds owned data must reclaim it.
+        let (_, mut producer) = queue(0, ());
+        producer.push(~1);
+        producer.push(~2);
+    }
+
+    #[test]
+    fn smoke_bound() {
+        // Same as `smoke`, but with a size-1 node cache so the bounded
+        // recycling paths in `pop`/`alloc` get exercised.
+        let (mut consumer, mut producer) = queue(1, ());
+        producer.push(1);
+        producer.push(2);
+        assert_eq!(consumer.pop(), Some(1));
+        assert_eq!(consumer.pop(), Some(2));
+        assert_eq!(consumer.pop(), None);
+        producer.push(3);
+        producer.push(4);
+        assert_eq!(consumer.pop(), Some(3));
+        assert_eq!(consumer.pop(), Some(4));
+        assert_eq!(consumer.pop(), None);
+    }
+
+    #[test]
+    fn stress() {
+        // Run the two halves on separate schedulers, with both an unbounded
+        // and a bounded node cache.
+        stress_bound(0);
+        stress_bound(1);
+
+        fn stress_bound(bound: uint) {
+            let (consumer, mut producer) = queue(bound, ());
+            do task::spawn_sched(task::SingleThreaded) {
+                let mut consumer = consumer;
+                for _ in range(0, 100000) {
+                    // Spin until the next message arrives; every message
+                    // pushed is a 1, so anything else signals corruption.
+                    loop {
+                        match consumer.pop() {
+                            Some(1) => break,
+                            Some(_) => fail!(),
+                            None => {}
+                        }
+                    }
+                }
+            }
+            for _ in range(0, 100000) {
+                producer.push(1);
+            }
+        }
+    }
+}