about summary refs log tree commit diff
path: root/src/libstd/sync
diff options
context:
space:
mode:
Diffstat (limited to 'src/libstd/sync')
-rw-r--r--src/libstd/sync/atomics.rs234
-rw-r--r--src/libstd/sync/deque.rs666
-rw-r--r--src/libstd/sync/future.rs209
-rw-r--r--src/libstd/sync/mod.rs16
-rw-r--r--src/libstd/sync/mpmc_bounded_queue.rs220
-rw-r--r--src/libstd/sync/mpsc_queue.rs208
-rw-r--r--src/libstd/sync/spsc_queue.rs300
-rw-r--r--src/libstd/sync/task_pool.rs98
8 files changed, 318 insertions, 1633 deletions
diff --git a/src/libstd/sync/atomics.rs b/src/libstd/sync/atomics.rs
deleted file mode 100644
index b2565a6a449..00000000000
--- a/src/libstd/sync/atomics.rs
+++ /dev/null
@@ -1,234 +0,0 @@
-// Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! Atomic types
-//!
-//! Atomic types provide primitive shared-memory communication between
-//! threads, and are the building blocks of other concurrent
-//! types.
-//!
-//! This module defines atomic versions of a select number of primitive
-//! types, including `AtomicBool`, `AtomicInt`, `AtomicUint`, and `AtomicOption`.
-//! Atomic types present operations that, when used correctly, synchronize
-//! updates between threads.
-//!
-//! Each method takes an `Ordering` which represents the strength of
-//! the memory barrier for that operation. These orderings are the
-//! same as [C++11 atomic orderings][1].
-//!
-//! [1]: http://gcc.gnu.org/wiki/Atomic/GCCMM/AtomicSync
-//!
-//! Atomic variables are safe to share between threads (they implement `Share`)
-//! but they do not themselves provide the mechanism for sharing. The most
-//! common way to share an atomic variable is to put it into an `Arc` (an
-//! atomically-reference-counted shared pointer).
-//!
-//! Most atomic types may be stored in static variables, initialized using
-//! the provided static initializers like `INIT_ATOMIC_BOOL`. Atomic statics
-//! are often used for lazy global initialization.
-//!
-//!
-//! # Examples
-//!
-//! A simple spinlock:
-//!
-//! ```
-//! extern crate sync;
-//!
-//! use sync::Arc;
-//! use std::sync::atomics::{AtomicUint, SeqCst};
-//! use std::task::deschedule;
-//!
-//! fn main() {
-//!     let spinlock = Arc::new(AtomicUint::new(1));
-//!
-//!     let spinlock_clone = spinlock.clone();
-//!     spawn(proc() {
-//!         spinlock_clone.store(0, SeqCst);
-//!     });
-//!
-//!     // Wait for the other task to release the lock
-//!     while spinlock.load(SeqCst) != 0 {
-//!         // Since tasks may not be preemptive (if they are green threads)
-//!         // yield to the scheduler to let the other task run. Low level
-//!         // concurrent code needs to take into account Rust's two threading
-//!         // models.
-//!         deschedule();
-//!     }
-//! }
-//! ```
-//!
-//! Transferring a heap object with `AtomicOption`:
-//!
-//! ```
-//! extern crate sync;
-//!
-//! use sync::Arc;
-//! use std::sync::atomics::{AtomicOption, SeqCst};
-//!
-//! fn main() {
-//!     struct BigObject;
-//!
-//!     let shared_big_object = Arc::new(AtomicOption::empty());
-//!
-//!     let shared_big_object_clone = shared_big_object.clone();
-//!     spawn(proc() {
-//!         let unwrapped_big_object = shared_big_object_clone.take(SeqCst);
-//!         if unwrapped_big_object.is_some() {
-//!             println!("got a big object from another task");
-//!         } else {
-//!             println!("other task hasn't sent big object yet");
-//!         }
-//!     });
-//!
-//!     shared_big_object.swap(box BigObject, SeqCst);
-//! }
-//! ```
-//!
-//! Keep a global count of live tasks:
-//!
-//! ```
-//! use std::sync::atomics::{AtomicUint, SeqCst, INIT_ATOMIC_UINT};
-//!
-//! static mut GLOBAL_TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT;
-//!
-//! unsafe {
-//!     let old_task_count = GLOBAL_TASK_COUNT.fetch_add(1, SeqCst);
-//!     println!("live tasks: {}", old_task_count + 1);
-//! }
-//! ```
-
-use mem;
-use ops::Drop;
-use option::{Option,Some,None};
-use owned::Box;
-
-pub use core::atomics::{AtomicBool, AtomicInt, AtomicUint, AtomicPtr};
-pub use core::atomics::{Ordering, Relaxed, Release, Acquire, AcqRel, SeqCst};
-pub use core::atomics::{INIT_ATOMIC_BOOL, INIT_ATOMIC_INT, INIT_ATOMIC_UINT};
-pub use core::atomics::fence;
-
-/// An atomic, nullable unique pointer
-///
-/// This can be used as the concurrency primitive for operations that transfer
-/// owned heap objects across tasks.
-#[unsafe_no_drop_flag]
-pub struct AtomicOption<T> {
-    p: AtomicUint,
-}
-
-impl<T> AtomicOption<T> {
-    /// Create a new `AtomicOption`
-    pub fn new(p: Box<T>) -> AtomicOption<T> {
-        unsafe { AtomicOption { p: AtomicUint::new(mem::transmute(p)) } }
-    }
-
-    /// Create a new `AtomicOption` that doesn't contain a value
-    pub fn empty() -> AtomicOption<T> { AtomicOption { p: AtomicUint::new(0) } }
-
-    /// Store a value, returning the old value
-    #[inline]
-    pub fn swap(&self, val: Box<T>, order: Ordering) -> Option<Box<T>> {
-        let val = unsafe { mem::transmute(val) };
-
-        match self.p.swap(val, order) {
-            0 => None,
-            n => Some(unsafe { mem::transmute(n) }),
-        }
-    }
-
-    /// Remove the value, leaving the `AtomicOption` empty.
-    #[inline]
-    pub fn take(&self, order: Ordering) -> Option<Box<T>> {
-        unsafe { self.swap(mem::transmute(0), order) }
-    }
-
-    /// Replace an empty value with a non-empty value.
-    ///
-    /// Succeeds if the option is `None` and returns `None` if so. If
-    /// the option was already `Some`, returns `Some` of the rejected
-    /// value.
-    #[inline]
-    pub fn fill(&self, val: Box<T>, order: Ordering) -> Option<Box<T>> {
-        unsafe {
-            let val = mem::transmute(val);
-            let expected = mem::transmute(0);
-            let oldval = self.p.compare_and_swap(expected, val, order);
-            if oldval == expected {
-                None
-            } else {
-                Some(mem::transmute(val))
-            }
-        }
-    }
-
-    /// Returns `true` if the `AtomicOption` is empty.
-    ///
-    /// Be careful: The caller must have some external method of ensuring the
-    /// result does not get invalidated by another task after this returns.
-    #[inline]
-    pub fn is_empty(&self, order: Ordering) -> bool {
-        self.p.load(order) as uint == 0
-    }
-}
-
-#[unsafe_destructor]
-impl<T> Drop for AtomicOption<T> {
-    fn drop(&mut self) {
-        let _ = self.take(SeqCst);
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use option::*;
-    use super::*;
-
-    #[test]
-    fn option_empty() {
-        let option: AtomicOption<()> = AtomicOption::empty();
-        assert!(option.is_empty(SeqCst));
-    }
-
-    #[test]
-    fn option_swap() {
-        let p = AtomicOption::new(box 1);
-        let a = box 2;
-
-        let b = p.swap(a, SeqCst);
-
-        assert!(b == Some(box 1));
-        assert!(p.take(SeqCst) == Some(box 2));
-    }
-
-    #[test]
-    fn option_take() {
-        let p = AtomicOption::new(box 1);
-
-        assert!(p.take(SeqCst) == Some(box 1));
-        assert!(p.take(SeqCst) == None);
-
-        let p2 = box 2;
-        p.swap(p2, SeqCst);
-
-        assert!(p.take(SeqCst) == Some(box 2));
-    }
-
-    #[test]
-    fn option_fill() {
-        let p = AtomicOption::new(box 1);
-        assert!(p.fill(box 2, SeqCst).is_some()); // should fail; shouldn't leak!
-        assert!(p.take(SeqCst) == Some(box 1));
-
-        assert!(p.fill(box 2, SeqCst).is_none()); // shouldn't fail
-        assert!(p.take(SeqCst) == Some(box 2));
-    }
-}
-
diff --git a/src/libstd/sync/deque.rs b/src/libstd/sync/deque.rs
deleted file mode 100644
index 39e420685ab..00000000000
--- a/src/libstd/sync/deque.rs
+++ /dev/null
@@ -1,666 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! A (mostly) lock-free concurrent work-stealing deque
-//!
-//! This module contains an implementation of the Chase-Lev work stealing deque
-//! described in "Dynamic Circular Work-Stealing Deque". The implementation is
-//! heavily based on the pseudocode found in the paper.
-//!
-//! This implementation does not want to have the restriction of a garbage
-//! collector for reclamation of buffers, and instead it uses a shared pool of
-//! buffers. This shared pool is required for correctness in this
-//! implementation.
-//!
-//! The only lock-synchronized portions of this deque are the buffer allocation
-//! and deallocation portions. Otherwise all operations are lock-free.
-//!
-//! # Example
-//!
-//!     use std::rt::deque::BufferPool;
-//!
-//!     let mut pool = BufferPool::new();
-//!     let (mut worker, mut stealer) = pool.deque();
-//!
-//!     // Only the worker may push/pop
-//!     worker.push(1);
-//!     worker.pop();
-//!
-//!     // Stealers take data from the other end of the deque
-//!     worker.push(1);
-//!     stealer.steal();
-//!
-//!     // Stealers can be cloned to have many stealers stealing in parallel
-//!     worker.push(1);
-//!     let mut stealer2 = stealer.clone();
-//!     stealer2.steal();
-
-// NB: the "buffer pool" strategy is not done for speed, but rather for
-//     correctness. For more info, see the comment on `swap_buffer`
-
-// FIXME: all atomic operations in this module use a SeqCst ordering. That is
-//      probably overkill
-
-use alloc::arc::Arc;
-
-use clone::Clone;
-use iter::{range, Iterator};
-use kinds::Send;
-use kinds::marker;
-use mem::{forget, min_align_of, size_of, transmute, overwrite};
-use ops::Drop;
-use option::{Option, Some, None};
-use owned::Box;
-use ptr::RawPtr;
-use ptr;
-use rt::heap::{allocate, deallocate};
-use slice::ImmutableVector;
-use sync::atomics::{AtomicInt, AtomicPtr, SeqCst};
-use rt::exclusive::Exclusive;
-use vec::Vec;
-
-// Once the queue is less than 1/K full, then it will be downsized. Note that
-// the deque requires that this number be less than 2.
-static K: int = 4;
-
-// Minimum number of bits that a buffer size should be. No buffer will resize to
-// under this value, and all deques will initially contain a buffer of this
-// size.
-//
-// The size in question is 1 << MIN_BITS
-static MIN_BITS: int = 7;
-
-struct Deque<T> {
-    bottom: AtomicInt,
-    top: AtomicInt,
-    array: AtomicPtr<Buffer<T>>,
-    pool: BufferPool<T>,
-}
-
-/// Worker half of the work-stealing deque. This worker has exclusive access to
-/// one side of the deque, and uses `push` and `pop` method to manipulate it.
-///
-/// There may only be one worker per deque.
-pub struct Worker<T> {
-    deque: Arc<Deque<T>>,
-    noshare: marker::NoShare,
-}
-
-/// The stealing half of the work-stealing deque. Stealers have access to the
-/// opposite end of the deque from the worker, and they only have access to the
-/// `steal` method.
-pub struct Stealer<T> {
-    deque: Arc<Deque<T>>,
-    noshare: marker::NoShare,
-}
-
-/// When stealing some data, this is an enumeration of the possible outcomes.
-#[deriving(PartialEq, Show)]
-pub enum Stolen<T> {
-    /// The deque was empty at the time of stealing
-    Empty,
-    /// The stealer lost the race for stealing data, and a retry may return more
-    /// data.
-    Abort,
-    /// The stealer has successfully stolen some data.
-    Data(T),
-}
-
-/// The allocation pool for buffers used by work-stealing deques. Right now this
-/// structure is used for reclamation of memory after it is no longer in use by
-/// deques.
-///
-/// This data structure is protected by a mutex, but it is rarely used. Deques
-/// will only use this structure when allocating a new buffer or deallocating a
-/// previous one.
-pub struct BufferPool<T> {
-    pool: Arc<Exclusive<Vec<Box<Buffer<T>>>>>,
-}
-
-/// An internal buffer used by the chase-lev deque. This structure is actually
-/// implemented as a circular buffer, and is used as the intermediate storage of
-/// the data in the deque.
-///
-/// This type is implemented with *T instead of Vec<T> for two reasons:
-///
-///   1. There is nothing safe about using this buffer. This easily allows the
-///      same value to be read twice in to rust, and there is nothing to
-///      prevent this. The usage by the deque must ensure that one of the
-///      values is forgotten. Furthermore, we only ever want to manually run
-///      destructors for values in this buffer (on drop) because the bounds
-///      are defined by the deque it's owned by.
-///
-///   2. We can certainly avoid bounds checks using *T instead of Vec<T>, although
-///      LLVM is probably pretty good at doing this already.
-struct Buffer<T> {
-    storage: *T,
-    log_size: int,
-}
-
-impl<T: Send> BufferPool<T> {
-    /// Allocates a new buffer pool which in turn can be used to allocate new
-    /// deques.
-    pub fn new() -> BufferPool<T> {
-        BufferPool { pool: Arc::new(Exclusive::new(vec!())) }
-    }
-
-    /// Allocates a new work-stealing deque which will send/receiving memory to
-    /// and from this buffer pool.
-    pub fn deque(&self) -> (Worker<T>, Stealer<T>) {
-        let a = Arc::new(Deque::new(self.clone()));
-        let b = a.clone();
-        (Worker { deque: a, noshare: marker::NoShare },
-         Stealer { deque: b, noshare: marker::NoShare })
-    }
-
-    fn alloc(&self, bits: int) -> Box<Buffer<T>> {
-        unsafe {
-            let mut pool = self.pool.lock();
-            match pool.iter().position(|x| x.size() >= (1 << bits)) {
-                Some(i) => pool.remove(i).unwrap(),
-                None => box Buffer::new(bits)
-            }
-        }
-    }
-
-    fn free(&self, buf: Box<Buffer<T>>) {
-        unsafe {
-            let mut pool = self.pool.lock();
-            match pool.iter().position(|v| v.size() > buf.size()) {
-                Some(i) => pool.insert(i, buf),
-                None => pool.push(buf),
-            }
-        }
-    }
-}
-
-impl<T: Send> Clone for BufferPool<T> {
-    fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } }
-}
-
-impl<T: Send> Worker<T> {
-    /// Pushes data onto the front of this work queue.
-    pub fn push(&self, t: T) {
-        unsafe { self.deque.push(t) }
-    }
-    /// Pops data off the front of the work queue, returning `None` on an empty
-    /// queue.
-    pub fn pop(&self) -> Option<T> {
-        unsafe { self.deque.pop() }
-    }
-
-    /// Gets access to the buffer pool that this worker is attached to. This can
-    /// be used to create more deques which share the same buffer pool as this
-    /// deque.
-    pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
-        &self.deque.pool
-    }
-}
-
-impl<T: Send> Stealer<T> {
-    /// Steals work off the end of the queue (opposite of the worker's end)
-    pub fn steal(&self) -> Stolen<T> {
-        unsafe { self.deque.steal() }
-    }
-
-    /// Gets access to the buffer pool that this stealer is attached to. This
-    /// can be used to create more deques which share the same buffer pool as
-    /// this deque.
-    pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
-        &self.deque.pool
-    }
-}
-
-impl<T: Send> Clone for Stealer<T> {
-    fn clone(&self) -> Stealer<T> {
-        Stealer { deque: self.deque.clone(), noshare: marker::NoShare }
-    }
-}
-
-// Almost all of this code can be found directly in the paper so I'm not
-// personally going to heavily comment what's going on here.
-
-impl<T: Send> Deque<T> {
-    fn new(pool: BufferPool<T>) -> Deque<T> {
-        let buf = pool.alloc(MIN_BITS);
-        Deque {
-            bottom: AtomicInt::new(0),
-            top: AtomicInt::new(0),
-            array: AtomicPtr::new(unsafe { transmute(buf) }),
-            pool: pool,
-        }
-    }
-
-    unsafe fn push(&self, data: T) {
-        let mut b = self.bottom.load(SeqCst);
-        let t = self.top.load(SeqCst);
-        let mut a = self.array.load(SeqCst);
-        let size = b - t;
-        if size >= (*a).size() - 1 {
-            // You won't find this code in the chase-lev deque paper. This is
-            // alluded to in a small footnote, however. We always free a buffer
-            // when growing in order to prevent leaks.
-            a = self.swap_buffer(b, a, (*a).resize(b, t, 1));
-            b = self.bottom.load(SeqCst);
-        }
-        (*a).put(b, data);
-        self.bottom.store(b + 1, SeqCst);
-    }
-
-    unsafe fn pop(&self) -> Option<T> {
-        let b = self.bottom.load(SeqCst);
-        let a = self.array.load(SeqCst);
-        let b = b - 1;
-        self.bottom.store(b, SeqCst);
-        let t = self.top.load(SeqCst);
-        let size = b - t;
-        if size < 0 {
-            self.bottom.store(t, SeqCst);
-            return None;
-        }
-        let data = (*a).get(b);
-        if size > 0 {
-            self.maybe_shrink(b, t);
-            return Some(data);
-        }
-        if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
-            self.bottom.store(t + 1, SeqCst);
-            return Some(data);
-        } else {
-            self.bottom.store(t + 1, SeqCst);
-            forget(data); // someone else stole this value
-            return None;
-        }
-    }
-
-    unsafe fn steal(&self) -> Stolen<T> {
-        let t = self.top.load(SeqCst);
-        let old = self.array.load(SeqCst);
-        let b = self.bottom.load(SeqCst);
-        let a = self.array.load(SeqCst);
-        let size = b - t;
-        if size <= 0 { return Empty }
-        if size % (*a).size() == 0 {
-            if a == old && t == self.top.load(SeqCst) {
-                return Empty
-            }
-            return Abort
-        }
-        let data = (*a).get(t);
-        if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
-            Data(data)
-        } else {
-            forget(data); // someone else stole this value
-            Abort
-        }
-    }
-
-    unsafe fn maybe_shrink(&self, b: int, t: int) {
-        let a = self.array.load(SeqCst);
-        if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
-            self.swap_buffer(b, a, (*a).resize(b, t, -1));
-        }
-    }
-
-    // Helper routine not mentioned in the paper which is used in growing and
-    // shrinking buffers to swap in a new buffer into place. As a bit of a
-    // recap, the whole point that we need a buffer pool rather than just
-    // calling malloc/free directly is that stealers can continue using buffers
-    // after this method has called 'free' on it. The continued usage is simply
-    // a read followed by a forget, but we must make sure that the memory can
-    // continue to be read after we flag this buffer for reclamation.
-    unsafe fn swap_buffer(&self, b: int, old: *mut Buffer<T>,
-                          buf: Buffer<T>) -> *mut Buffer<T> {
-        let newbuf: *mut Buffer<T> = transmute(box buf);
-        self.array.store(newbuf, SeqCst);
-        let ss = (*newbuf).size();
-        self.bottom.store(b + ss, SeqCst);
-        let t = self.top.load(SeqCst);
-        if self.top.compare_and_swap(t, t + ss, SeqCst) != t {
-            self.bottom.store(b, SeqCst);
-        }
-        self.pool.free(transmute(old));
-        return newbuf;
-    }
-}
-
-
-#[unsafe_destructor]
-impl<T: Send> Drop for Deque<T> {
-    fn drop(&mut self) {
-        let t = self.top.load(SeqCst);
-        let b = self.bottom.load(SeqCst);
-        let a = self.array.load(SeqCst);
-        // Free whatever is leftover in the dequeue, and then move the buffer
-        // back into the pool.
-        for i in range(t, b) {
-            let _: T = unsafe { (*a).get(i) };
-        }
-        self.pool.free(unsafe { transmute(a) });
-    }
-}
-
-#[inline]
-fn buffer_alloc_size<T>(log_size: int) -> uint {
-    (1 << log_size) * size_of::<T>()
-}
-
-impl<T: Send> Buffer<T> {
-    unsafe fn new(log_size: int) -> Buffer<T> {
-        let size = buffer_alloc_size::<T>(log_size);
-        let buffer = allocate(size, min_align_of::<T>());
-        Buffer {
-            storage: buffer as *T,
-            log_size: log_size,
-        }
-    }
-
-    fn size(&self) -> int { 1 << self.log_size }
-
-    // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly
-    fn mask(&self) -> int { (1 << self.log_size) - 1 }
-
-    unsafe fn elem(&self, i: int) -> *T { self.storage.offset(i & self.mask()) }
-
-    // This does not protect against loading duplicate values of the same cell,
-    // nor does this clear out the contents contained within. Hence, this is a
-    // very unsafe method which the caller needs to treat specially in case a
-    // race is lost.
-    unsafe fn get(&self, i: int) -> T {
-        ptr::read(self.elem(i))
-    }
-
-    // Unsafe because this unsafely overwrites possibly uninitialized or
-    // initialized data.
-    unsafe fn put(&self, i: int, t: T) {
-        overwrite(self.elem(i) as *mut T, t);
-    }
-
-    // Again, unsafe because this has incredibly dubious ownership violations.
-    // It is assumed that this buffer is immediately dropped.
-    unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
-        let buf = Buffer::new(self.log_size + delta);
-        for i in range(t, b) {
-            buf.put(i, self.get(i));
-        }
-        return buf;
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for Buffer<T> {
-    fn drop(&mut self) {
-        // It is assumed that all buffers are empty on drop.
-        let size = buffer_alloc_size::<T>(self.log_size);
-        unsafe { deallocate(self.storage as *mut u8, size, min_align_of::<T>()) }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use prelude::*;
-    use super::{Data, BufferPool, Abort, Empty, Worker, Stealer};
-
-    use mem;
-    use owned::Box;
-    use rt::thread::Thread;
-    use rand;
-    use rand::Rng;
-    use sync::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
-                        AtomicUint, INIT_ATOMIC_UINT};
-    use vec;
-
-    #[test]
-    fn smoke() {
-        let pool = BufferPool::new();
-        let (w, s) = pool.deque();
-        assert_eq!(w.pop(), None);
-        assert_eq!(s.steal(), Empty);
-        w.push(1);
-        assert_eq!(w.pop(), Some(1));
-        w.push(1);
-        assert_eq!(s.steal(), Data(1));
-        w.push(1);
-        assert_eq!(s.clone().steal(), Data(1));
-    }
-
-    #[test]
-    fn stealpush() {
-        static AMT: int = 100000;
-        let pool = BufferPool::<int>::new();
-        let (w, s) = pool.deque();
-        let t = Thread::start(proc() {
-            let mut left = AMT;
-            while left > 0 {
-                match s.steal() {
-                    Data(i) => {
-                        assert_eq!(i, 1);
-                        left -= 1;
-                    }
-                    Abort | Empty => {}
-                }
-            }
-        });
-
-        for _ in range(0, AMT) {
-            w.push(1);
-        }
-
-        t.join();
-    }
-
-    #[test]
-    fn stealpush_large() {
-        static AMT: int = 100000;
-        let pool = BufferPool::<(int, int)>::new();
-        let (w, s) = pool.deque();
-        let t = Thread::start(proc() {
-            let mut left = AMT;
-            while left > 0 {
-                match s.steal() {
-                    Data((1, 10)) => { left -= 1; }
-                    Data(..) => fail!(),
-                    Abort | Empty => {}
-                }
-            }
-        });
-
-        for _ in range(0, AMT) {
-            w.push((1, 10));
-        }
-
-        t.join();
-    }
-
-    fn stampede(w: Worker<Box<int>>, s: Stealer<Box<int>>,
-                nthreads: int, amt: uint) {
-        for _ in range(0, amt) {
-            w.push(box 20);
-        }
-        let mut remaining = AtomicUint::new(amt);
-        let unsafe_remaining: *mut AtomicUint = &mut remaining;
-
-        let threads = range(0, nthreads).map(|_| {
-            let s = s.clone();
-            Thread::start(proc() {
-                unsafe {
-                    while (*unsafe_remaining).load(SeqCst) > 0 {
-                        match s.steal() {
-                            Data(box 20) => {
-                                (*unsafe_remaining).fetch_sub(1, SeqCst);
-                            }
-                            Data(..) => fail!(),
-                            Abort | Empty => {}
-                        }
-                    }
-                }
-            })
-        }).collect::<Vec<Thread<()>>>();
-
-        while remaining.load(SeqCst) > 0 {
-            match w.pop() {
-                Some(box 20) => { remaining.fetch_sub(1, SeqCst); }
-                Some(..) => fail!(),
-                None => {}
-            }
-        }
-
-        for thread in threads.move_iter() {
-            thread.join();
-        }
-    }
-
-    #[test]
-    fn run_stampede() {
-        let pool = BufferPool::<Box<int>>::new();
-        let (w, s) = pool.deque();
-        stampede(w, s, 8, 10000);
-    }
-
-    #[test]
-    fn many_stampede() {
-        static AMT: uint = 4;
-        let pool = BufferPool::<Box<int>>::new();
-        let threads = range(0, AMT).map(|_| {
-            let (w, s) = pool.deque();
-            Thread::start(proc() {
-                stampede(w, s, 4, 10000);
-            })
-        }).collect::<Vec<Thread<()>>>();
-
-        for thread in threads.move_iter() {
-            thread.join();
-        }
-    }
-
-    #[test]
-    fn stress() {
-        static AMT: int = 100000;
-        static NTHREADS: int = 8;
-        static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
-        static mut HITS: AtomicUint = INIT_ATOMIC_UINT;
-        let pool = BufferPool::<int>::new();
-        let (w, s) = pool.deque();
-
-        let threads = range(0, NTHREADS).map(|_| {
-            let s = s.clone();
-            Thread::start(proc() {
-                unsafe {
-                    loop {
-                        match s.steal() {
-                            Data(2) => { HITS.fetch_add(1, SeqCst); }
-                            Data(..) => fail!(),
-                            _ if DONE.load(SeqCst) => break,
-                            _ => {}
-                        }
-                    }
-                }
-            })
-        }).collect::<Vec<Thread<()>>>();
-
-        let mut rng = rand::task_rng();
-        let mut expected = 0;
-        while expected < AMT {
-            if rng.gen_range(0, 3) == 2 {
-                match w.pop() {
-                    None => {}
-                    Some(2) => unsafe { HITS.fetch_add(1, SeqCst); },
-                    Some(_) => fail!(),
-                }
-            } else {
-                expected += 1;
-                w.push(2);
-            }
-        }
-
-        unsafe {
-            while HITS.load(SeqCst) < AMT as uint {
-                match w.pop() {
-                    None => {}
-                    Some(2) => { HITS.fetch_add(1, SeqCst); },
-                    Some(_) => fail!(),
-                }
-            }
-            DONE.store(true, SeqCst);
-        }
-
-        for thread in threads.move_iter() {
-            thread.join();
-        }
-
-        assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint);
-    }
-
-    #[test]
-    #[ignore(cfg(windows))] // apparently windows scheduling is weird?
-    fn no_starvation() {
-        static AMT: int = 10000;
-        static NTHREADS: int = 4;
-        static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
-        let pool = BufferPool::<(int, uint)>::new();
-        let (w, s) = pool.deque();
-
-        let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
-            let s = s.clone();
-            let unique_box = box AtomicUint::new(0);
-            let thread_box = unsafe {
-                *mem::transmute::<&Box<AtomicUint>, **mut AtomicUint>(&unique_box)
-            };
-            (Thread::start(proc() {
-                unsafe {
-                    loop {
-                        match s.steal() {
-                            Data((1, 2)) => {
-                                (*thread_box).fetch_add(1, SeqCst);
-                            }
-                            Data(..) => fail!(),
-                            _ if DONE.load(SeqCst) => break,
-                            _ => {}
-                        }
-                    }
-                }
-            }), unique_box)
-        }));
-
-        let mut rng = rand::task_rng();
-        let mut myhit = false;
-        let mut iter = 0;
-        'outer: loop {
-            for _ in range(0, rng.gen_range(0, AMT)) {
-                if !myhit && rng.gen_range(0, 3) == 2 {
-                    match w.pop() {
-                        None => {}
-                        Some((1, 2)) => myhit = true,
-                        Some(_) => fail!(),
-                    }
-                } else {
-                    w.push((1, 2));
-                }
-            }
-            iter += 1;
-
-            debug!("loop iteration {}", iter);
-            for (i, slot) in hits.iter().enumerate() {
-                let amt = slot.load(SeqCst);
-                debug!("thread {}: {}", i, amt);
-                if amt == 0 { continue 'outer; }
-            }
-            if myhit {
-                break
-            }
-        }
-
-        unsafe { DONE.store(true, SeqCst); }
-
-        for thread in threads.move_iter() {
-            thread.join();
-        }
-    }
-}
diff --git a/src/libstd/sync/future.rs b/src/libstd/sync/future.rs
new file mode 100644
index 00000000000..bc748324fcd
--- /dev/null
+++ b/src/libstd/sync/future.rs
@@ -0,0 +1,209 @@
+// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/*!
+ * A type representing values that may be computed concurrently and
+ * operations for working with them.
+ *
+ * # Example
+ *
+ * ```rust
+ * use std::sync::Future;
+ * # fn fib(n: uint) -> uint {42};
+ * # fn make_a_sandwich() {};
+ * let mut delayed_fib = Future::spawn(proc() { fib(5000) });
+ * make_a_sandwich();
+ * println!("fib(5000) = {}", delayed_fib.get())
+ * ```
+ */
+
+#![allow(missing_doc)]
+
+use core::prelude::*;
+use core::mem::replace;
+
+use comm::{Receiver, channel};
+use task::spawn;
+
+/// A type encapsulating the result of a computation which may not be complete
+pub struct Future<A> {
+    state: FutureState<A>,
+}
+
+enum FutureState<A> {
+    Pending(proc():Send -> A),
+    Evaluating,
+    Forced(A)
+}
+
+/// Methods on the `future` type
+impl<A:Clone> Future<A> {
+    pub fn get(&mut self) -> A {
+        //! Get the value of the future.
+        (*(self.get_ref())).clone()
+    }
+}
+
+impl<A> Future<A> {
+    /// Gets the value from this future, forcing evaluation.
+    pub fn unwrap(mut self) -> A {
+        self.get_ref();
+        let state = replace(&mut self.state, Evaluating);
+        match state {
+            Forced(v) => v,
+            _ => fail!( "Logic error." ),
+        }
+    }
+
+    pub fn get_ref<'a>(&'a mut self) -> &'a A {
+        /*!
+        * Executes the future's closure and then returns a reference
+        * to the result.  The reference lasts as long as
+        * the future.
+        */
+        match self.state {
+            Forced(ref v) => return v,
+            Evaluating => fail!("Recursive forcing of future!"),
+            Pending(_) => {
+                match replace(&mut self.state, Evaluating) {
+                    Forced(_) | Evaluating => fail!("Logic error."),
+                    Pending(f) => {
+                        self.state = Forced(f());
+                        self.get_ref()
+                    }
+                }
+            }
+        }
+    }
+
+    pub fn from_value(val: A) -> Future<A> {
+        /*!
+         * Create a future from a value.
+         *
+         * The value is immediately available and calling `get` later will
+         * not block.
+         */
+
+        Future {state: Forced(val)}
+    }
+
+    pub fn from_fn(f: proc():Send -> A) -> Future<A> {
+        /*!
+         * Create a future from a function.
+         *
+         * The first time that the value is requested it will be retrieved by
+         * calling the function.  Note that this function is a local
+         * function. It is not spawned into another task.
+         */
+
+        Future {state: Pending(f)}
+    }
+}
+
+impl<A:Send> Future<A> {
+    pub fn from_receiver(rx: Receiver<A>) -> Future<A> {
+        /*!
+         * Create a future from a port
+         *
+         * The first time that the value is requested the task will block
+         * waiting for the result to be received on the port.
+         */
+
+        Future::from_fn(proc() {
+            rx.recv()
+        })
+    }
+
+    pub fn spawn(blk: proc():Send -> A) -> Future<A> {
+        /*!
+         * Create a future from a unique closure.
+         *
+         * The closure will be run in a new task and its result used as the
+         * value of the future.
+         */
+
+        let (tx, rx) = channel();
+
+        spawn(proc() {
+            tx.send(blk());
+        });
+
+        Future::from_receiver(rx)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use prelude::*;
+    use sync::Future;
+    use task;
+
+    #[test]
+    fn test_from_value() {
+        let mut f = Future::from_value("snail".to_string());
+        assert_eq!(f.get(), "snail".to_string());
+    }
+
+    #[test]
+    fn test_from_receiver() {
+        let (tx, rx) = channel();
+        tx.send("whale".to_string());
+        let mut f = Future::from_receiver(rx);
+        assert_eq!(f.get(), "whale".to_string());
+    }
+
+    #[test]
+    fn test_from_fn() {
+        let mut f = Future::from_fn(proc() "brail".to_string());
+        assert_eq!(f.get(), "brail".to_string());
+    }
+
+    #[test]
+    fn test_interface_get() {
+        let mut f = Future::from_value("fail".to_string());
+        assert_eq!(f.get(), "fail".to_string());
+    }
+
+    #[test]
+    fn test_interface_unwrap() {
+        let f = Future::from_value("fail".to_string());
+        assert_eq!(f.unwrap(), "fail".to_string());
+    }
+
+    #[test]
+    fn test_get_ref_method() {
+        let mut f = Future::from_value(22);
+        assert_eq!(*f.get_ref(), 22);
+    }
+
+    #[test]
+    fn test_spawn() {
+        let mut f = Future::spawn(proc() "bale".to_string());
+        assert_eq!(f.get(), "bale".to_string());
+    }
+
+    #[test]
+    #[should_fail]
+    fn test_futurefail() {
+        let mut f = Future::spawn(proc() fail!());
+        let _x: String = f.get();
+    }
+
+    #[test]
+    fn test_sendable_future() {
+        let expected = "schlorf";
+        let f = Future::spawn(proc() { expected });
+        task::spawn(proc() {
+            let mut f = f;
+            let actual = f.get();
+            assert_eq!(actual, expected);
+        });
+    }
+}
diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs
index b2cf427edc8..5f45ce25502 100644
--- a/src/libstd/sync/mod.rs
+++ b/src/libstd/sync/mod.rs
@@ -15,8 +15,14 @@
 //! and/or blocking at all, but rather provide the necessary tools to build
 //! other types of concurrent primitives.
 
-pub mod atomics;
-pub mod deque;
-pub mod mpmc_bounded_queue;
-pub mod mpsc_queue;
-pub mod spsc_queue;
+pub use core_sync::{atomics, deque, mpmc_bounded_queue, mpsc_queue, spsc_queue};
+pub use core_sync::{Arc, Weak, Mutex, MutexGuard, Condvar, Barrier};
+pub use core_sync::{RWLock, RWLockReadGuard, RWLockWriteGuard};
+pub use core_sync::{Semaphore, SemaphoreGuard};
+pub use core_sync::one::{Once, ONCE_INIT};
+
+pub use self::future::Future;
+pub use self::task_pool::TaskPool;
+
+mod future;
+mod task_pool;
diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs
deleted file mode 100644
index ffad9c1c583..00000000000
--- a/src/libstd/sync/mpmc_bounded_queue.rs
+++ /dev/null
@@ -1,220 +0,0 @@
-/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- *    1. Redistributions of source code must retain the above copyright notice,
- *       this list of conditions and the following disclaimer.
- *
- *    2. Redistributions in binary form must reproduce the above copyright
- *       notice, this list of conditions and the following disclaimer in the
- *       documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
- * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
- * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
- * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
- * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
- * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and documentation are
- * those of the authors and should not be interpreted as representing official
- * policies, either expressed or implied, of Dmitry Vyukov.
- */
-
-#![allow(missing_doc, dead_code)]
-
-// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
-
-use alloc::arc::Arc;
-
-use clone::Clone;
-use kinds::Send;
-use num::next_power_of_two;
-use option::{Option, Some, None};
-use sync::atomics::{AtomicUint,Relaxed,Release,Acquire};
-use vec::Vec;
-use ty::Unsafe;
-
-struct Node<T> {
-    sequence: AtomicUint,
-    value: Option<T>,
-}
-
-struct State<T> {
-    pad0: [u8, ..64],
-    buffer: Vec<Unsafe<Node<T>>>,
-    mask: uint,
-    pad1: [u8, ..64],
-    enqueue_pos: AtomicUint,
-    pad2: [u8, ..64],
-    dequeue_pos: AtomicUint,
-    pad3: [u8, ..64],
-}
-
-pub struct Queue<T> {
-    state: Arc<State<T>>,
-}
-
-impl<T: Send> State<T> {
-    fn with_capacity(capacity: uint) -> State<T> {
-        let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
-            if capacity < 2 {
-                2u
-            } else {
-                // use next power of 2 as capacity
-                next_power_of_two(capacity)
-            }
-        } else {
-            capacity
-        };
-        let buffer = Vec::from_fn(capacity, |i| {
-            Unsafe::new(Node { sequence:AtomicUint::new(i), value: None })
-        });
-        State{
-            pad0: [0, ..64],
-            buffer: buffer,
-            mask: capacity-1,
-            pad1: [0, ..64],
-            enqueue_pos: AtomicUint::new(0),
-            pad2: [0, ..64],
-            dequeue_pos: AtomicUint::new(0),
-            pad3: [0, ..64],
-        }
-    }
-
-    fn push(&self, value: T) -> bool {
-        let mask = self.mask;
-        let mut pos = self.enqueue_pos.load(Relaxed);
-        loop {
-            let node = self.buffer.get(pos & mask);
-            let seq = unsafe { (*node.get()).sequence.load(Acquire) };
-            let diff: int = seq as int - pos as int;
-
-            if diff == 0 {
-                let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
-                if enqueue_pos == pos {
-                    unsafe {
-                        (*node.get()).value = Some(value);
-                        (*node.get()).sequence.store(pos+1, Release);
-                    }
-                    break
-                } else {
-                    pos = enqueue_pos;
-                }
-            } else if diff < 0 {
-                return false
-            } else {
-                pos = self.enqueue_pos.load(Relaxed);
-            }
-        }
-        true
-    }
-
-    fn pop(&self) -> Option<T> {
-        let mask = self.mask;
-        let mut pos = self.dequeue_pos.load(Relaxed);
-        loop {
-            let node = self.buffer.get(pos & mask);
-            let seq = unsafe { (*node.get()).sequence.load(Acquire) };
-            let diff: int = seq as int - (pos + 1) as int;
-            if diff == 0 {
-                let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
-                if dequeue_pos == pos {
-                    unsafe {
-                        let value = (*node.get()).value.take();
-                        (*node.get()).sequence.store(pos + mask + 1, Release);
-                        return value
-                    }
-                } else {
-                    pos = dequeue_pos;
-                }
-            } else if diff < 0 {
-                return None
-            } else {
-                pos = self.dequeue_pos.load(Relaxed);
-            }
-        }
-    }
-}
-
-impl<T: Send> Queue<T> {
-    pub fn with_capacity(capacity: uint) -> Queue<T> {
-        Queue{
-            state: Arc::new(State::with_capacity(capacity))
-        }
-    }
-
-    pub fn push(&self, value: T) -> bool {
-        self.state.push(value)
-    }
-
-    pub fn pop(&self) -> Option<T> {
-        self.state.pop()
-    }
-}
-
-impl<T: Send> Clone for Queue<T> {
-    fn clone(&self) -> Queue<T> {
-        Queue { state: self.state.clone() }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use prelude::*;
-    use super::Queue;
-    use native;
-
-    #[test]
-    fn test() {
-        let nthreads = 8u;
-        let nmsgs = 1000u;
-        let q = Queue::with_capacity(nthreads*nmsgs);
-        assert_eq!(None, q.pop());
-        let (tx, rx) = channel();
-
-        for _ in range(0, nthreads) {
-            let q = q.clone();
-            let tx = tx.clone();
-            native::task::spawn(proc() {
-                let q = q;
-                for i in range(0, nmsgs) {
-                    assert!(q.push(i));
-                }
-                tx.send(());
-            });
-        }
-
-        let mut completion_rxs = vec![];
-        for _ in range(0, nthreads) {
-            let (tx, rx) = channel();
-            completion_rxs.push(rx);
-            let q = q.clone();
-            native::task::spawn(proc() {
-                let q = q;
-                let mut i = 0u;
-                loop {
-                    match q.pop() {
-                        None => {},
-                        Some(_) => {
-                            i += 1;
-                            if i == nmsgs { break }
-                        }
-                    }
-                }
-                tx.send(i);
-            });
-        }
-
-        for rx in completion_rxs.mut_iter() {
-            assert_eq!(nmsgs, rx.recv());
-        }
-        for _ in range(0, nthreads) {
-            rx.recv();
-        }
-    }
-}
diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs
deleted file mode 100644
index 4db24e82d37..00000000000
--- a/src/libstd/sync/mpsc_queue.rs
+++ /dev/null
@@ -1,208 +0,0 @@
-/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- *    1. Redistributions of source code must retain the above copyright notice,
- *       this list of conditions and the following disclaimer.
- *
- *    2. Redistributions in binary form must reproduce the above copyright
- *       notice, this list of conditions and the following disclaimer in the
- *       documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
- * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
- * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
- * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
- * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
- * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and documentation are
- * those of the authors and should not be interpreted as representing official
- * policies, either expressed or implied, of Dmitry Vyukov.
- */
-
-//! A mostly lock-free multi-producer, single consumer queue.
-//!
-//! This module contains an implementation of a concurrent MPSC queue. This
-//! queue can be used to share data between tasks, and is also used as the
-//! building block of channels in rust.
-//!
-//! Note that the current implementation of this queue has a caveat of the `pop`
-//! method, and see the method for more information about it. Due to this
-//! caveat, this queue may not be appropriate for all use-cases.
-
-// http://www.1024cores.net/home/lock-free-algorithms
-//                         /queues/non-intrusive-mpsc-node-based-queue
-
-use kinds::Send;
-use mem;
-use ops::Drop;
-use option::{Option, None, Some};
-use owned::Box;
-use ptr::RawPtr;
-use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
-use ty::Unsafe;
-
-/// A result of the `pop` function.
-pub enum PopResult<T> {
-    /// Some data has been popped
-    Data(T),
-    /// The queue is empty
-    Empty,
-    /// The queue is in an inconsistent state. Popping data should succeed, but
-    /// some pushers have yet to make enough progress in order allow a pop to
-    /// succeed. It is recommended that a pop() occur "in the near future" in
-    /// order to see if the sender has made progress or not
-    Inconsistent,
-}
-
-struct Node<T> {
-    next: AtomicPtr<Node<T>>,
-    value: Option<T>,
-}
-
-/// The multi-producer single-consumer structure. This is not cloneable, but it
-/// may be safely shared so long as it is guaranteed that there is only one
-/// popper at a time (many pushers are allowed).
-pub struct Queue<T> {
-    head: AtomicPtr<Node<T>>,
-    tail: Unsafe<*mut Node<T>>,
-}
-
-impl<T> Node<T> {
-    unsafe fn new(v: Option<T>) -> *mut Node<T> {
-        mem::transmute(box Node {
-            next: AtomicPtr::new(0 as *mut Node<T>),
-            value: v,
-        })
-    }
-}
-
-impl<T: Send> Queue<T> {
-    /// Creates a new queue that is safe to share among multiple producers and
-    /// one consumer.
-    pub fn new() -> Queue<T> {
-        let stub = unsafe { Node::new(None) };
-        Queue {
-            head: AtomicPtr::new(stub),
-            tail: Unsafe::new(stub),
-        }
-    }
-
-    /// Pushes a new value onto this queue.
-    pub fn push(&self, t: T) {
-        unsafe {
-            let n = Node::new(Some(t));
-            let prev = self.head.swap(n, AcqRel);
-            (*prev).next.store(n, Release);
-        }
-    }
-
-    /// Pops some data from this queue.
-    ///
-    /// Note that the current implementation means that this function cannot
-    /// return `Option<T>`. It is possible for this queue to be in an
-    /// inconsistent state where many pushes have succeeded and completely
-    /// finished, but pops cannot return `Some(t)`. This inconsistent state
-    /// happens when a pusher is pre-empted at an inopportune moment.
-    ///
-    /// This inconsistent state means that this queue does indeed have data, but
-    /// it does not currently have access to it at this time.
-    pub fn pop(&self) -> PopResult<T> {
-        unsafe {
-            let tail = *self.tail.get();
-            let next = (*tail).next.load(Acquire);
-
-            if !next.is_null() {
-                *self.tail.get() = next;
-                assert!((*tail).value.is_none());
-                assert!((*next).value.is_some());
-                let ret = (*next).value.take_unwrap();
-                let _: Box<Node<T>> = mem::transmute(tail);
-                return Data(ret);
-            }
-
-            if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
-        }
-    }
-
-    /// Attempts to pop data from this queue, but doesn't attempt too hard. This
-    /// will canonicalize inconsistent states to a `None` value.
-    pub fn casual_pop(&self) -> Option<T> {
-        match self.pop() {
-            Data(t) => Some(t),
-            Empty | Inconsistent => None,
-        }
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for Queue<T> {
-    fn drop(&mut self) {
-        unsafe {
-            let mut cur = *self.tail.get();
-            while !cur.is_null() {
-                let next = (*cur).next.load(Relaxed);
-                let _: Box<Node<T>> = mem::transmute(cur);
-                cur = next;
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use prelude::*;
-
-    use alloc::arc::Arc;
-
-    use native;
-    use super::{Queue, Data, Empty, Inconsistent};
-
-    #[test]
-    fn test_full() {
-        let q = Queue::new();
-        q.push(box 1);
-        q.push(box 2);
-    }
-
-    #[test]
-    fn test() {
-        let nthreads = 8u;
-        let nmsgs = 1000u;
-        let q = Queue::new();
-        match q.pop() {
-            Empty => {}
-            Inconsistent | Data(..) => fail!()
-        }
-        let (tx, rx) = channel();
-        let q = Arc::new(q);
-
-        for _ in range(0, nthreads) {
-            let tx = tx.clone();
-            let q = q.clone();
-            native::task::spawn(proc() {
-                for i in range(0, nmsgs) {
-                    q.push(i);
-                }
-                tx.send(());
-            });
-        }
-
-        let mut i = 0u;
-        while i < nthreads * nmsgs {
-            match q.pop() {
-                Empty | Inconsistent => {},
-                Data(_) => { i += 1 }
-            }
-        }
-        drop(tx);
-        for _ in range(0, nthreads) {
-            rx.recv();
-        }
-    }
-}
diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs
deleted file mode 100644
index fb515c9db6e..00000000000
--- a/src/libstd/sync/spsc_queue.rs
+++ /dev/null
@@ -1,300 +0,0 @@
-/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- *    1. Redistributions of source code must retain the above copyright notice,
- *       this list of conditions and the following disclaimer.
- *
- *    2. Redistributions in binary form must reproduce the above copyright
- *       notice, this list of conditions and the following disclaimer in the
- *       documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
- * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
- * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
- * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
- * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
- * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and documentation are
- * those of the authors and should not be interpreted as representing official
- * policies, either expressed or implied, of Dmitry Vyukov.
- */
-
-// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
-
-//! A single-producer single-consumer concurrent queue
-//!
-//! This module contains the implementation of an SPSC queue which can be used
-//! concurrently between two tasks. This data structure is safe to use and
-//! enforces the semantics that there is one pusher and one popper.
-
-use kinds::Send;
-use mem;
-use ops::Drop;
-use option::{Some, None, Option};
-use owned::Box;
-use ptr::RawPtr;
-use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
-use ty::Unsafe;
-
-// Node within the linked list queue of messages to send
-struct Node<T> {
-    // FIXME: this could be an uninitialized T if we're careful enough, and
-    //      that would reduce memory usage (and be a bit faster).
-    //      is it worth it?
-    value: Option<T>,           // nullable for re-use of nodes
-    next: AtomicPtr<Node<T>>,   // next node in the queue
-}
-
-/// The single-producer single-consumer queue. This structure is not cloneable,
-/// but it can be safely shared in an Arc if it is guaranteed that there
-/// is only one popper and one pusher touching the queue at any one point in
-/// time.
-pub struct Queue<T> {
-    // consumer fields
-    tail: Unsafe<*mut Node<T>>, // where to pop from
-    tail_prev: AtomicPtr<Node<T>>, // where to pop from
-
-    // producer fields
-    head: Unsafe<*mut Node<T>>,      // where to push to
-    first: Unsafe<*mut Node<T>>,     // where to get new nodes from
-    tail_copy: Unsafe<*mut Node<T>>, // between first/tail
-
-    // Cache maintenance fields. Additions and subtractions are stored
-    // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: uint,
-    cache_additions: AtomicUint,
-    cache_subtractions: AtomicUint,
-}
-
-impl<T: Send> Node<T> {
-    fn new() -> *mut Node<T> {
-        unsafe {
-            mem::transmute(box Node {
-                value: None,
-                next: AtomicPtr::new(0 as *mut Node<T>),
-            })
-        }
-    }
-}
-
-impl<T: Send> Queue<T> {
-    /// Creates a new queue. The producer returned is connected to the consumer
-    /// to push all data to the consumer.
-    ///
-    /// # Arguments
-    ///
-    ///   * `bound` - This queue implementation is implemented with a linked
-    ///               list, and this means that a push is always a malloc. In
-    ///               order to amortize this cost, an internal cache of nodes is
-    ///               maintained to prevent a malloc from always being
-    ///               necessary. This bound is the limit on the size of the
-    ///               cache (if desired). If the value is 0, then the cache has
-    ///               no bound. Otherwise, the cache will never grow larger than
-    ///               `bound` (although the queue itself could be much larger.
-    pub fn new(bound: uint) -> Queue<T> {
-        let n1 = Node::new();
-        let n2 = Node::new();
-        unsafe { (*n1).next.store(n2, Relaxed) }
-        Queue {
-            tail: Unsafe::new(n2),
-            tail_prev: AtomicPtr::new(n1),
-            head: Unsafe::new(n2),
-            first: Unsafe::new(n1),
-            tail_copy: Unsafe::new(n1),
-            cache_bound: bound,
-            cache_additions: AtomicUint::new(0),
-            cache_subtractions: AtomicUint::new(0),
-        }
-    }
-
-    /// Pushes a new value onto this queue. Note that to use this function
-    /// safely, it must be externally guaranteed that there is only one pusher.
-    pub fn push(&self, t: T) {
-        unsafe {
-            // Acquire a node (which either uses a cached one or allocates a new
-            // one), and then append this to the 'head' node.
-            let n = self.alloc();
-            assert!((*n).value.is_none());
-            (*n).value = Some(t);
-            (*n).next.store(0 as *mut Node<T>, Relaxed);
-            (**self.head.get()).next.store(n, Release);
-            *self.head.get() = n;
-        }
-    }
-
-    unsafe fn alloc(&self) -> *mut Node<T> {
-        // First try to see if we can consume the 'first' node for our uses.
-        // We try to avoid as many atomic instructions as possible here, so
-        // the addition to cache_subtractions is not atomic (plus we're the
-        // only one subtracting from the cache).
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Relaxed);
-                self.cache_subtractions.store(b + 1, Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Relaxed);
-            return ret;
-        }
-        // If the above fails, then update our copy of the tail and try
-        // again.
-        *self.tail_copy.get() = self.tail_prev.load(Acquire);
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Relaxed);
-                self.cache_subtractions.store(b + 1, Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Relaxed);
-            return ret;
-        }
-        // If all of that fails, then we have to allocate a new node
-        // (there's nothing in the node cache).
-        Node::new()
-    }
-
-    /// Attempts to pop a value from this queue. Remember that to use this type
-    /// safely you must ensure that there is only one popper at a time.
-    pub fn pop(&self) -> Option<T> {
-        unsafe {
-            // The `tail` node is not actually a used node, but rather a
-            // sentinel from where we should start popping from. Hence, look at
-            // tail's next field and see if we can use it. If we do a pop, then
-            // the current tail node is a candidate for going into the cache.
-            let tail = *self.tail.get();
-            let next = (*tail).next.load(Acquire);
-            if next.is_null() { return None }
-            assert!((*next).value.is_some());
-            let ret = (*next).value.take();
-
-            *self.tail.get() = next;
-            if self.cache_bound == 0 {
-                self.tail_prev.store(tail, Release);
-            } else {
-                // FIXME: this is dubious with overflow.
-                let additions = self.cache_additions.load(Relaxed);
-                let subtractions = self.cache_subtractions.load(Relaxed);
-                let size = additions - subtractions;
-
-                if size < self.cache_bound {
-                    self.tail_prev.store(tail, Release);
-                    self.cache_additions.store(additions + 1, Relaxed);
-                } else {
-                    (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
-                    // We have successfully erased all references to 'tail', so
-                    // now we can safely drop it.
-                    let _: Box<Node<T>> = mem::transmute(tail);
-                }
-            }
-            return ret;
-        }
-    }
-
-    /// Attempts to peek at the head of the queue, returning `None` if the queue
-    /// has no data currently
-    pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
-        // This is essentially the same as above with all the popping bits
-        // stripped out.
-        unsafe {
-            let tail = *self.tail.get();
-            let next = (*tail).next.load(Acquire);
-            if next.is_null() { return None }
-            return (*next).value.as_mut();
-        }
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send> Drop for Queue<T> {
-    fn drop(&mut self) {
-        unsafe {
-            let mut cur = *self.first.get();
-            while !cur.is_null() {
-                let next = (*cur).next.load(Relaxed);
-                let _n: Box<Node<T>> = mem::transmute(cur);
-                cur = next;
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use prelude::*;
-
-    use alloc::arc::Arc;
-    use native;
-
-    use super::Queue;
-
-    #[test]
-    fn smoke() {
-        let q = Queue::new(0);
-        q.push(1);
-        q.push(2);
-        assert_eq!(q.pop(), Some(1));
-        assert_eq!(q.pop(), Some(2));
-        assert_eq!(q.pop(), None);
-        q.push(3);
-        q.push(4);
-        assert_eq!(q.pop(), Some(3));
-        assert_eq!(q.pop(), Some(4));
-        assert_eq!(q.pop(), None);
-    }
-
-    #[test]
-    fn drop_full() {
-        let q = Queue::new(0);
-        q.push(box 1);
-        q.push(box 2);
-    }
-
-    #[test]
-    fn smoke_bound() {
-        let q = Queue::new(1);
-        q.push(1);
-        q.push(2);
-        assert_eq!(q.pop(), Some(1));
-        assert_eq!(q.pop(), Some(2));
-        assert_eq!(q.pop(), None);
-        q.push(3);
-        q.push(4);
-        assert_eq!(q.pop(), Some(3));
-        assert_eq!(q.pop(), Some(4));
-        assert_eq!(q.pop(), None);
-    }
-
-    #[test]
-    fn stress() {
-        stress_bound(0);
-        stress_bound(1);
-
-        fn stress_bound(bound: uint) {
-            let a = Arc::new(Queue::new(bound));
-            let b = a.clone();
-            let (tx, rx) = channel();
-            native::task::spawn(proc() {
-                for _ in range(0, 100000) {
-                    loop {
-                        match b.pop() {
-                            Some(1) => break,
-                            Some(_) => fail!(),
-                            None => {}
-                        }
-                    }
-                }
-                tx.send(());
-            });
-            for _ in range(0, 100000) {
-                a.push(1);
-            }
-            rx.recv();
-        }
-    }
-}
diff --git a/src/libstd/sync/task_pool.rs b/src/libstd/sync/task_pool.rs
new file mode 100644
index 00000000000..7667badf0e7
--- /dev/null
+++ b/src/libstd/sync/task_pool.rs
@@ -0,0 +1,98 @@
+// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+#![allow(missing_doc)]
+
+/// A task pool abstraction. Useful for achieving predictable CPU
+/// parallelism.
+
+use core::prelude::*;
+
+use task;
+use task::spawn;
+use vec::Vec;
+use comm::{channel, Sender};
+
+enum Msg<T> {
+    Execute(proc(&T):Send),
+    Quit
+}
+
+pub struct TaskPool<T> {
+    channels: Vec<Sender<Msg<T>>>,
+    next_index: uint,
+}
+
+#[unsafe_destructor]
+impl<T> Drop for TaskPool<T> {
+    fn drop(&mut self) {
+        for channel in self.channels.mut_iter() {
+            channel.send(Quit);
+        }
+    }
+}
+
+impl<T> TaskPool<T> {
+    /// Spawns a new task pool with `n_tasks` tasks. If the `sched_mode`
+    /// is None, the tasks run on this scheduler; otherwise, they run on a
+    /// new scheduler with the given mode. The provided `init_fn_factory`
+    /// returns a function which, given the index of the task, should return
+    /// local data to be kept around in that task.
+    pub fn new(n_tasks: uint,
+               init_fn_factory: || -> proc(uint):Send -> T)
+               -> TaskPool<T> {
+        assert!(n_tasks >= 1);
+
+        let channels = Vec::from_fn(n_tasks, |i| {
+            let (tx, rx) = channel::<Msg<T>>();
+            let init_fn = init_fn_factory();
+
+            let task_body = proc() {
+                let local_data = init_fn(i);
+                loop {
+                    match rx.recv() {
+                        Execute(f) => f(&local_data),
+                        Quit => break
+                    }
+                }
+            };
+
+            // Run on this scheduler.
+            task::spawn(task_body);
+
+            tx
+        });
+
+        return TaskPool {
+            channels: channels,
+            next_index: 0,
+        };
+    }
+
+    /// Executes the function `f` on a task in the pool. The function
+    /// receives a reference to the local data returned by the `init_fn`.
+    pub fn execute(&mut self, f: proc(&T):Send) {
+        self.channels.get(self.next_index).send(Execute(f));
+        self.next_index += 1;
+        if self.next_index == self.channels.len() { self.next_index = 0; }
+    }
+}
+
+#[test]
+fn test_task_pool() {
+    let f: || -> proc(uint):Send -> uint = || {
+        let g: proc(uint):Send -> uint = proc(i) i;
+        g
+    };
+    let mut pool = TaskPool::new(4, f);
+    for _ in range(0, 8) {
+        pool.execute(proc(i) println!("Hello from thread {}!", *i));
+    }
+}