about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/librustuv/net.rs13
-rw-r--r--src/libstd/rt/deque.rs658
-rw-r--r--src/libstd/rt/mod.rs23
-rw-r--r--src/libstd/rt/sched.rs56
-rw-r--r--src/libstd/rt/test.rs34
-rw-r--r--src/libstd/rt/work_queue.rs75
-rw-r--r--src/libstd/task/spawn.rs5
7 files changed, 723 insertions, 141 deletions
diff --git a/src/librustuv/net.rs b/src/librustuv/net.rs
index 3a485bf2ea9..a7feb6db923 100644
--- a/src/librustuv/net.rs
+++ b/src/librustuv/net.rs
@@ -1076,21 +1076,22 @@ mod test {
         use std::rt::task::Task;
         use std::rt::task::UnwindResult;
         use std::rt::thread::Thread;
-        use std::rt::work_queue::WorkQueue;
+        use std::rt::deque::BufferPool;
         use std::unstable::run_in_bare_thread;
         use uvio::UvEventLoop;
 
         do run_in_bare_thread {
             let sleepers = SleeperList::new();
-            let work_queue1 = WorkQueue::new();
-            let work_queue2 = WorkQueue::new();
-            let queues = ~[work_queue1.clone(), work_queue2.clone()];
+            let mut pool = BufferPool::init();
+            let (worker1, stealer1) = pool.deque();
+            let (worker2, stealer2) = pool.deque();
+            let queues = ~[stealer1, stealer2];
 
             let loop1 = ~UvEventLoop::new() as ~EventLoop;
-            let mut sched1 = ~Scheduler::new(loop1, work_queue1, queues.clone(),
+            let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(),
                                              sleepers.clone());
             let loop2 = ~UvEventLoop::new() as ~EventLoop;
-            let mut sched2 = ~Scheduler::new(loop2, work_queue2, queues.clone(),
+            let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(),
                                              sleepers.clone());
 
             let handle1 = Cell::new(sched1.make_handle());
diff --git a/src/libstd/rt/deque.rs b/src/libstd/rt/deque.rs
new file mode 100644
index 00000000000..94d4523b2e2
--- /dev/null
+++ b/src/libstd/rt/deque.rs
@@ -0,0 +1,658 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! A (mostly) lock-free concurrent work-stealing deque
+//!
+//! This module contains an implementation of the Chase-Lev work stealing deque
+//! described in "Dynamic Circular Work-Stealing Deque". The implementation is
+//! heavily based on the pseudocode found in the paper.
+//!
+//! This implementation does not want to have the restriction of a garbage
+//! collector for reclamation of buffers, and instead it uses a shared pool of
+//! buffers. This shared pool is required for correctness in this
+//! implementation.
+//!
+//! The only lock-synchronized portions of this deque are the buffer allocation
+//! and deallocation portions. Otherwise all operations are lock-free.
+//!
+//! # Example
+//!
+//!     use std::rt::deque::BufferPool;
+//!
+//!     let mut pool = BufferPool::init();
+//!     let (mut worker, mut stealer) = pool.deque();
+//!
+//!     // Only the worker may push/pop
+//!     worker.push(1);
+//!     worker.pop();
+//!
+//!     // Stealers take data from the other end of the deque
+//!     worker.push(1);
+//!     stealer.steal();
+//!
+//!     // Stealers can be cloned to have many stealers stealing in parallel
+//!     worker.push(1);
+//!     let mut stealer2 = stealer.clone();
+//!     stealer2.steal();
+
+// NB: the "buffer pool" strategy is not done for speed, but rather for
+//     correctness. For more info, see the comment on `swap_buffer`
+
+// XXX: all atomic operations in this module use a SeqCst ordering. That is
+//      probably overkill
+
+use cast;
+use clone::Clone;
+use iter::range;
+use kinds::Send;
+use libc;
+use mem;
+use ops::Drop;
+use option::{Option, Some, None};
+use ptr;
+use unstable::atomics::{AtomicInt, AtomicPtr, SeqCst};
+use unstable::sync::{UnsafeArc, Exclusive};
+
+// Once the queue is less than 1/K full, then it will be downsized. Note that
+// the deque requires that this number be less than 2.
+static K: int = 4;
+
+// Minimum number of bits that a buffer size should be. No buffer will resize to
+// under this value, and all deques will initially contain a buffer of this
+// size.
+//
+// The size in question is 1 << MIN_BITS
+static MIN_BITS: int = 7;
+
+struct Deque<T> {
+    bottom: AtomicInt,
+    top: AtomicInt,
+    array: AtomicPtr<Buffer<T>>,
+    pool: BufferPool<T>,
+}
+
+/// Worker half of the work-stealing deque. This worker has exclusive access to
+/// one side of the deque, and uses `push` and `pop` method to manipulate it.
+///
+/// There may only be one worker per deque.
+pub struct Worker<T> {
+    priv deque: UnsafeArc<Deque<T>>,
+}
+
+/// The stealing half of the work-stealing deque. Stealers have access to the
+/// opposite end of the deque from the worker, and they only have access to the
+/// `steal` method.
+pub struct Stealer<T> {
+    priv deque: UnsafeArc<Deque<T>>,
+}
+
+/// When stealing some data, this is an enumeration of the possible outcomes.
+#[deriving(Eq)]
+pub enum Stolen<T> {
+    /// The deque was empty at the time of stealing
+    Empty,
+    /// The stealer lost the race for stealing data, and a retry may return more
+    /// data.
+    Abort,
+    /// The stealer has successfully stolen some data.
+    Data(T),
+}
+
+/// The allocation pool for buffers used by work-stealing deques. Right now this
+/// structure is used for reclamation of memory after it is no longer in use by
+/// deques.
+///
+/// This data structure is protected by a mutex, but it is rarely used. Deques
+/// will only use this structure when allocating a new buffer or deallocating a
+/// previous one.
+pub struct BufferPool<T> {
+    priv pool: Exclusive<~[~Buffer<T>]>,
+}
+
+/// An internal buffer used by the chase-lev deque. This structure is actually
+/// implemented as a circular buffer, and is used as the intermediate storage of
+/// the data in the deque.
+///
+/// This type is implemented with *T instead of ~[T] for two reasons:
+///
+///   1. There is nothing safe about using this buffer. This easily allows the
+///      same value to be read twice in to rust, and there is nothing to
+///      prevent this. The usage by the deque must ensure that one of the
+///      values is forgotten. Furthermore, we only ever want to manually run
+///      destructors for values in this buffer (on drop) because the bounds
+///      are defined by the deque it's owned by.
+///
+///   2. We can certainly avoid bounds checks using *T instead of ~[T], although
+///      LLVM is probably pretty good at doing this already.
+struct Buffer<T> {
+    storage: *T,
+    log_size: int,
+}
+
+impl<T: Send> BufferPool<T> {
+    /// Allocates a new buffer pool which in turn can be used to allocate new
+    /// deques.
+    pub fn init() -> BufferPool<T> {
+        BufferPool { pool: Exclusive::new(~[]) }
+    }
+
+    /// Allocates a new work-stealing deque which will send/receiving memory to
+    /// and from this buffer pool.
+    pub fn deque(&mut self) -> (Worker<T>, Stealer<T>) {
+        let (a, b) = UnsafeArc::new2(Deque::init(self.clone()));
+        (Worker { deque: a }, Stealer { deque: b })
+    }
+
+    fn alloc(&mut self, bits: int) -> ~Buffer<T> {
+        unsafe {
+            self.pool.with(|pool| {
+                match pool.iter().position(|x| x.size() >= (1 << bits)) {
+                    Some(i) => pool.remove(i),
+                    None => ~Buffer::init(bits)
+                }
+            })
+        }
+    }
+
+    fn free(&mut self, buf: ~Buffer<T>) {
+        unsafe {
+            use cell::Cell;
+            let buf = Cell::new(buf);
+            self.pool.with(|pool| {
+                let buf = buf.take();
+                match pool.iter().position(|v| v.size() > buf.size()) {
+                    Some(i) => pool.insert(i, buf),
+                    None => pool.push(buf),
+                }
+            })
+        }
+    }
+}
+
+impl<T: Send> Clone for BufferPool<T> {
+    fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } }
+}
+
+impl<T: Send> Worker<T> {
+    /// Pushes data onto the front of this work queue.
+    pub fn push(&mut self, t: T) {
+        unsafe { (*self.deque.get()).push(t) }
+    }
+    /// Pops data off the front of the work queue, returning `None` on an empty
+    /// queue.
+    pub fn pop(&mut self) -> Option<T> {
+        unsafe { (*self.deque.get()).pop() }
+    }
+
+    /// Gets access to the buffer pool that this worker is attached to. This can
+    /// be used to create more deques which share the same buffer pool as this
+    /// deque.
+    pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
+        unsafe { &mut (*self.deque.get()).pool }
+    }
+}
+
+impl<T: Send> Stealer<T> {
+    /// Steals work off the end of the queue (opposite of the worker's end)
+    pub fn steal(&mut self) -> Stolen<T> {
+        unsafe { (*self.deque.get()).steal() }
+    }
+
+    /// Gets access to the buffer pool that this stealer is attached to. This
+    /// can be used to create more deques which share the same buffer pool as
+    /// this deque.
+    pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
+        unsafe { &mut (*self.deque.get()).pool }
+    }
+}
+
+impl<T: Send> Clone for Stealer<T> {
+    fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } }
+}
+
+// Almost all of this code can be found directly in the paper so I'm not
+// personally going to heavily comment what's going on here.
+
+impl<T: Send> Deque<T> {
+    fn init(mut pool: BufferPool<T>) -> Deque<T> {
+        let buf = pool.alloc(MIN_BITS);
+        Deque {
+            bottom: AtomicInt::new(0),
+            top: AtomicInt::new(0),
+            array: AtomicPtr::new(unsafe { cast::transmute(buf) }),
+            pool: pool,
+        }
+    }
+
+    unsafe fn push(&mut self, data: T) {
+        let mut b = self.bottom.load(SeqCst);
+        let t = self.top.load(SeqCst);
+        let mut a = self.array.load(SeqCst);
+        let size = b - t;
+        if size >= (*a).size() - 1 {
+            // You won't find this code in the chase-lev deque paper. This is
+            // alluded to in a small footnote, however. We always free a buffer
+            // when growing in order to prevent leaks.
+            a = self.swap_buffer(b, a, (*a).resize(b, t, 1));
+            b = self.bottom.load(SeqCst);
+        }
+        (*a).put(b, data);
+        self.bottom.store(b + 1, SeqCst);
+    }
+
+    unsafe fn pop(&mut self) -> Option<T> {
+        let b = self.bottom.load(SeqCst);
+        let a = self.array.load(SeqCst);
+        let b = b - 1;
+        self.bottom.store(b, SeqCst);
+        let t = self.top.load(SeqCst);
+        let size = b - t;
+        if size < 0 {
+            self.bottom.store(t, SeqCst);
+            return None;
+        }
+        let data = (*a).get(b);
+        if size > 0 {
+            self.maybe_shrink(b, t);
+            return Some(data);
+        }
+        if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
+            self.bottom.store(t + 1, SeqCst);
+            return Some(data);
+        } else {
+            self.bottom.store(t + 1, SeqCst);
+            cast::forget(data); // someone else stole this value
+            return None;
+        }
+    }
+
+    unsafe fn steal(&mut self) -> Stolen<T> {
+        let t = self.top.load(SeqCst);
+        let old = self.array.load(SeqCst);
+        let b = self.bottom.load(SeqCst);
+        let a = self.array.load(SeqCst);
+        let size = b - t;
+        if size <= 0 { return Empty }
+        if size % (*a).size() == 0 {
+            if a == old && t == self.top.load(SeqCst) {
+                return Empty
+            }
+            return Abort
+        }
+        let data = (*a).get(t);
+        if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
+            Data(data)
+        } else {
+            cast::forget(data); // someone else stole this value
+            Abort
+        }
+    }
+
+    unsafe fn maybe_shrink(&mut self, b: int, t: int) {
+        let a = self.array.load(SeqCst);
+        if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
+            self.swap_buffer(b, a, (*a).resize(b, t, -1));
+        }
+    }
+
+    // Helper routine not mentioned in the paper which is used in growing and
+    // shrinking buffers to swap in a new buffer into place. As a bit of a
+    // recap, the whole point that we need a buffer pool rather than just
+    // calling malloc/free directly is that stealers can continue using buffers
+    // after this method has called 'free' on it. The continued usage is simply
+    // a read followed by a forget, but we must make sure that the memory can
+    // continue to be read after we flag this buffer for reclamation.
+    unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer<T>,
+                          buf: Buffer<T>) -> *mut Buffer<T> {
+        let newbuf: *mut Buffer<T> = cast::transmute(~buf);
+        self.array.store(newbuf, SeqCst);
+        let ss = (*newbuf).size();
+        self.bottom.store(b + ss, SeqCst);
+        let t = self.top.load(SeqCst);
+        if self.top.compare_and_swap(t, t + ss, SeqCst) != t {
+            self.bottom.store(b, SeqCst);
+        }
+        self.pool.free(cast::transmute(old));
+        return newbuf;
+    }
+}
+
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Deque<T> {
+    fn drop(&mut self) {
+        let t = self.top.load(SeqCst);
+        let b = self.bottom.load(SeqCst);
+        let a = self.array.load(SeqCst);
+        // Free whatever is leftover in the dequeue, and then move the buffer
+        // back into the pool.
+        for i in range(t, b) {
+            let _: T = unsafe { (*a).get(i) };
+        }
+        self.pool.free(unsafe { cast::transmute(a) });
+    }
+}
+
+impl<T: Send> Buffer<T> {
+    unsafe fn init(log_size: int) -> Buffer<T> {
+        let size = (1 << log_size) * mem::size_of::<T>();
+        let buffer = libc::malloc(size as libc::size_t);
+        assert!(!buffer.is_null());
+        Buffer {
+            storage: buffer as *T,
+            log_size: log_size,
+        }
+    }
+
+    fn size(&self) -> int { 1 << self.log_size }
+
+    // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly
+    fn mask(&self) -> int { (1 << self.log_size) - 1 }
+
+    // This does not protect against loading duplicate values of the same cell,
+    // nor does this clear out the contents contained within. Hence, this is a
+    // very unsafe method which the caller needs to treat specially in case a
+    // race is lost.
+    unsafe fn get(&self, i: int) -> T {
+        ptr::read_ptr(self.storage.offset(i & self.mask()))
+    }
+
+    // Unsafe because this unsafely overwrites possibly uninitialized or
+    // initialized data.
+    unsafe fn put(&mut self, i: int, t: T) {
+        let ptr = self.storage.offset(i & self.mask());
+        ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1);
+        cast::forget(t);
+    }
+
+    // Again, unsafe because this has incredibly dubious ownership violations.
+    // It is assumed that this buffer is immediately dropped.
+    unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
+        let mut buf = Buffer::init(self.log_size + delta);
+        for i in range(t, b) {
+            buf.put(i, self.get(i));
+        }
+        return buf;
+    }
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Buffer<T> {
+    fn drop(&mut self) {
+        // It is assumed that all buffers are empty on drop.
+        unsafe { libc::free(self.storage as *libc::c_void) }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use prelude::*;
+    use super::{Data, BufferPool, Abort, Empty, Worker, Stealer};
+
+    use cast;
+    use rt::thread::Thread;
+    use rand;
+    use rand::Rng;
+    use unstable::atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
+                            AtomicUint, INIT_ATOMIC_UINT};
+    use vec;
+
+    #[test]
+    fn smoke() {
+        let mut pool = BufferPool::init();
+        let (mut w, mut s) = pool.deque();
+        assert_eq!(w.pop(), None);
+        assert_eq!(s.steal(), Empty);
+        w.push(1);
+        assert_eq!(w.pop(), Some(1));
+        w.push(1);
+        assert_eq!(s.steal(), Data(1));
+        w.push(1);
+        assert_eq!(s.clone().steal(), Data(1));
+    }
+
+    #[test]
+    fn stealpush() {
+        static AMT: int = 100000;
+        let mut pool = BufferPool::<int>::init();
+        let (mut w, s) = pool.deque();
+        let t = do Thread::start {
+            let mut s = s;
+            let mut left = AMT;
+            while left > 0 {
+                match s.steal() {
+                    Data(i) => {
+                        assert_eq!(i, 1);
+                        left -= 1;
+                    }
+                    Abort | Empty => {}
+                }
+            }
+        };
+
+        for _ in range(0, AMT) {
+            w.push(1);
+        }
+
+        t.join();
+    }
+
+    #[test]
+    fn stealpush_large() {
+        static AMT: int = 100000;
+        let mut pool = BufferPool::<(int, int)>::init();
+        let (mut w, s) = pool.deque();
+        let t = do Thread::start {
+            let mut s = s;
+            let mut left = AMT;
+            while left > 0 {
+                match s.steal() {
+                    Data((1, 10)) => { left -= 1; }
+                    Data(..) => fail!(),
+                    Abort | Empty => {}
+                }
+            }
+        };
+
+        for _ in range(0, AMT) {
+            w.push((1, 10));
+        }
+
+        t.join();
+    }
+
+    fn stampede(mut w: Worker<~int>, s: Stealer<~int>,
+                nthreads: int, amt: uint) {
+        for _ in range(0, amt) {
+            w.push(~20);
+        }
+        let mut remaining = AtomicUint::new(amt);
+        let unsafe_remaining: *mut AtomicUint = &mut remaining;
+
+        let threads = range(0, nthreads).map(|_| {
+            let s = s.clone();
+            do Thread::start {
+                unsafe {
+                    let mut s = s;
+                    while (*unsafe_remaining).load(SeqCst) > 0 {
+                        match s.steal() {
+                            Data(~20) => {
+                                (*unsafe_remaining).fetch_sub(1, SeqCst);
+                            }
+                            Data(..) => fail!(),
+                            Abort | Empty => {}
+                        }
+                    }
+                }
+            }
+        }).to_owned_vec();
+
+        while remaining.load(SeqCst) > 0 {
+            match w.pop() {
+                Some(~20) => { remaining.fetch_sub(1, SeqCst); }
+                Some(..) => fail!(),
+                None => {}
+            }
+        }
+
+        for thread in threads.move_iter() {
+            thread.join();
+        }
+    }
+
+    #[test]
+    fn run_stampede() {
+        let mut pool = BufferPool::<~int>::init();
+        let (w, s) = pool.deque();
+        stampede(w, s, 8, 10000);
+    }
+
+    #[test]
+    fn many_stampede() {
+        static AMT: uint = 4;
+        let mut pool = BufferPool::<~int>::init();
+        let threads = range(0, AMT).map(|_| {
+            let (w, s) = pool.deque();
+            do Thread::start {
+                stampede(w, s, 4, 10000);
+            }
+        }).to_owned_vec();
+
+        for thread in threads.move_iter() {
+            thread.join();
+        }
+    }
+
+    #[test]
+    fn stress() {
+        static AMT: int = 100000;
+        static NTHREADS: int = 8;
+        static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
+        static mut HITS: AtomicUint = INIT_ATOMIC_UINT;
+        let mut pool = BufferPool::<int>::init();
+        let (mut w, s) = pool.deque();
+
+        let threads = range(0, NTHREADS).map(|_| {
+            let s = s.clone();
+            do Thread::start {
+                unsafe {
+                    let mut s = s;
+                    loop {
+                        match s.steal() {
+                            Data(2) => { HITS.fetch_add(1, SeqCst); }
+                            Data(..) => fail!(),
+                            _ if DONE.load(SeqCst) => break,
+                            _ => {}
+                        }
+                    }
+                }
+            }
+        }).to_owned_vec();
+
+        let mut rng = rand::task_rng();
+        let mut expected = 0;
+        while expected < AMT {
+            if rng.gen_range(0, 3) == 2 {
+                match w.pop() {
+                    None => {}
+                    Some(2) => unsafe { HITS.fetch_add(1, SeqCst); },
+                    Some(_) => fail!(),
+                }
+            } else {
+                expected += 1;
+                w.push(2);
+            }
+        }
+
+        unsafe {
+            while HITS.load(SeqCst) < AMT as uint {
+                match w.pop() {
+                    None => {}
+                    Some(2) => { HITS.fetch_add(1, SeqCst); },
+                    Some(_) => fail!(),
+                }
+            }
+            DONE.store(true, SeqCst);
+        }
+
+        for thread in threads.move_iter() {
+            thread.join();
+        }
+
+        assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint);
+    }
+
+    #[test]
+    fn no_starvation() {
+        static AMT: int = 10000;
+        static NTHREADS: int = 4;
+        static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
+        let mut pool = BufferPool::<(int, uint)>::init();
+        let (mut w, s) = pool.deque();
+
+        let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
+            let s = s.clone();
+            let box = ~AtomicUint::new(0);
+            let thread_box = unsafe {
+                *cast::transmute::<&~AtomicUint, **mut AtomicUint>(&box)
+            };
+            (do Thread::start {
+                unsafe {
+                    let mut s = s;
+                    loop {
+                        match s.steal() {
+                            Data((1, 2)) => {
+                                (*thread_box).fetch_add(1, SeqCst);
+                            }
+                            Data(..) => fail!(),
+                            _ if DONE.load(SeqCst) => break,
+                            _ => {}
+                        }
+                    }
+                }
+            }, box)
+        }));
+
+        let mut rng = rand::task_rng();
+        let mut myhit = false;
+        let mut iter = 0;
+        'outer: loop {
+            for _ in range(0, rng.gen_range(0, AMT)) {
+                if !myhit && rng.gen_range(0, 3) == 2 {
+                    match w.pop() {
+                        None => {}
+                        Some((1, 2)) => myhit = true,
+                        Some(_) => fail!(),
+                    }
+                } else {
+                    w.push((1, 2));
+                }
+            }
+            iter += 1;
+
+            debug!("loop iteration {}", iter);
+            for (i, slot) in hits.iter().enumerate() {
+                let amt = slot.load(SeqCst);
+                debug!("thread {}: {}", i, amt);
+                if amt == 0 { continue 'outer; }
+            }
+            if myhit {
+                break
+            }
+        }
+
+        unsafe { DONE.store(true, SeqCst); }
+
+        for thread in threads.move_iter() {
+            thread.join();
+        }
+    }
+}
+
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 78ec32ead3c..be1de6c5bdb 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -75,7 +75,6 @@ use vec::{OwnedVector, MutableVector, ImmutableVector};
 use vec;
 
 use self::thread::Thread;
-use self::work_queue::WorkQueue;
 
 // the os module needs to reach into this helper, so allow general access
 // through this reexport.
@@ -130,9 +129,6 @@ pub mod rtio;
 /// or task-local storage.
 pub mod local;
 
-/// A parallel work-stealing deque.
-pub mod work_queue;
-
 /// A parallel queue.
 pub mod message_queue;
 
@@ -142,6 +138,9 @@ mod mpsc_queue;
 /// A lock-free multi-producer, multi-consumer bounded queue.
 mod mpmc_bounded_queue;
 
+/// A parallel work-stealing deque
+pub mod deque;
+
 /// A parallel data structure for tracking sleeping schedulers.
 pub mod sleeper_list;
 
@@ -287,7 +286,9 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
 
     // Create a work queue for each scheduler, ntimes. Create an extra
     // for the main thread if that flag is set. We won't steal from it.
-    let work_queues: ~[WorkQueue<~Task>] = vec::from_fn(nscheds, |_| WorkQueue::new());
+    let mut pool = deque::BufferPool::init();
+    let arr = vec::from_fn(nscheds, |_| pool.deque());
+    let (workers, stealers) = vec::unzip(arr.move_iter());
 
     // The schedulers.
     let mut scheds = ~[];
@@ -295,14 +296,14 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
     // sent the Shutdown message to terminate the schedulers.
     let mut handles = ~[];
 
-    for work_queue in work_queues.iter() {
+    for worker in workers.move_iter() {
         rtdebug!("inserting a regular scheduler");
 
         // Every scheduler is driven by an I/O event loop.
         let loop_ = new_event_loop();
         let mut sched = ~Scheduler::new(loop_,
-                                        work_queue.clone(),
-                                        work_queues.clone(),
+                                        worker,
+                                        stealers.clone(),
                                         sleepers.clone());
         let handle = sched.make_handle();
 
@@ -321,12 +322,12 @@ fn run_(main: proc(), use_main_sched: bool) -> int {
 
         // This scheduler needs a queue that isn't part of the stealee
         // set.
-        let work_queue = WorkQueue::new();
+        let (worker, _) = pool.deque();
 
         let main_loop = new_event_loop();
         let mut main_sched = ~Scheduler::new_special(main_loop,
-                                                     work_queue,
-                                                     work_queues.clone(),
+                                                     worker,
+                                                     stealers.clone(),
                                                      sleepers.clone(),
                                                      false,
                                                      Some(friend_handle));
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index d66bd1e4135..a231bea5e27 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -13,13 +13,13 @@ use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
 use clone::Clone;
 use unstable::raw;
 use super::sleeper_list::SleeperList;
-use super::work_queue::WorkQueue;
 use super::stack::{StackPool};
 use super::rtio::EventLoop;
 use super::context::Context;
 use super::task::{Task, AnySched, Sched};
 use super::message_queue::MessageQueue;
 use rt::kill::BlockedTask;
+use rt::deque;
 use rt::local_ptr;
 use rt::local::Local;
 use rt::rtio::{RemoteCallback, PausibleIdleCallback, Callback};
@@ -39,14 +39,14 @@ use vec::{OwnedVector};
 /// in too much allocation and too many events.
 pub struct Scheduler {
     /// There are N work queues, one per scheduler.
-    priv work_queue: WorkQueue<~Task>,
+    work_queue: deque::Worker<~Task>,
     /// Work queues for the other schedulers. These are created by
     /// cloning the core work queues.
-    work_queues: ~[WorkQueue<~Task>],
+    work_queues: ~[deque::Stealer<~Task>],
     /// The queue of incoming messages from other schedulers.
     /// These are enqueued by SchedHandles after which a remote callback
     /// is triggered to handle the message.
-    priv message_queue: MessageQueue<SchedMessage>,
+    message_queue: MessageQueue<SchedMessage>,
     /// A shared list of sleeping schedulers. We'll use this to wake
     /// up schedulers when pushing work onto the work queue.
     sleeper_list: SleeperList,
@@ -56,33 +56,33 @@ pub struct Scheduler {
     /// not active since there are multiple event sources that may
     /// wake the scheduler. It just prevents the scheduler from pushing
     /// multiple handles onto the sleeper list.
-    priv sleepy: bool,
+    sleepy: bool,
     /// A flag to indicate we've received the shutdown message and should
     /// no longer try to go to sleep, but exit instead.
     no_sleep: bool,
     stack_pool: StackPool,
     /// The scheduler runs on a special task. When it is not running
     /// it is stored here instead of the work queue.
-    priv sched_task: Option<~Task>,
+    sched_task: Option<~Task>,
     /// An action performed after a context switch on behalf of the
     /// code running before the context switch
-    priv cleanup_job: Option<CleanupJob>,
+    cleanup_job: Option<CleanupJob>,
     /// Should this scheduler run any task, or only pinned tasks?
     run_anything: bool,
     /// If the scheduler shouldn't run some tasks, a friend to send
     /// them to.
-    priv friend_handle: Option<SchedHandle>,
+    friend_handle: Option<SchedHandle>,
     /// A fast XorShift rng for scheduler use
     rng: XorShiftRng,
     /// A toggleable idle callback
-    priv idle_callback: Option<~PausibleIdleCallback>,
+    idle_callback: Option<~PausibleIdleCallback>,
     /// A countdown that starts at a random value and is decremented
     /// every time a yield check is performed. When it hits 0 a task
     /// will yield.
-    priv yield_check_count: uint,
+    yield_check_count: uint,
     /// A flag to tell the scheduler loop it needs to do some stealing
     /// in order to introduce randomness as part of a yield
-    priv steal_for_yield: bool,
+    steal_for_yield: bool,
 
     // n.b. currently destructors of an object are run in top-to-bottom in order
     //      of field declaration. Due to its nature, the pausible idle callback
@@ -115,8 +115,8 @@ impl Scheduler {
     // * Initialization Functions
 
     pub fn new(event_loop: ~EventLoop,
-               work_queue: WorkQueue<~Task>,
-               work_queues: ~[WorkQueue<~Task>],
+               work_queue: deque::Worker<~Task>,
+               work_queues: ~[deque::Stealer<~Task>],
                sleeper_list: SleeperList)
         -> Scheduler {
 
@@ -127,8 +127,8 @@ impl Scheduler {
     }
 
     pub fn new_special(event_loop: ~EventLoop,
-                       work_queue: WorkQueue<~Task>,
-                       work_queues: ~[WorkQueue<~Task>],
+                       work_queue: deque::Worker<~Task>,
+                       work_queues: ~[deque::Stealer<~Task>],
                        sleeper_list: SleeperList,
                        run_anything: bool,
                        friend: Option<SchedHandle>)
@@ -440,11 +440,11 @@ impl Scheduler {
         let start_index = self.rng.gen_range(0, len);
         for index in range(0, len).map(|i| (i + start_index) % len) {
             match work_queues[index].steal() {
-                Some(task) => {
+                deque::Data(task) => {
                     rtdebug!("found task by stealing");
                     return Some(task)
                 }
-                None => ()
+                _ => ()
             }
         };
         rtdebug!("giving up on stealing");
@@ -889,6 +889,7 @@ mod test {
     use borrow::to_uint;
     use rt::sched::{Scheduler};
     use cell::Cell;
+    use rt::deque::BufferPool;
     use rt::thread::Thread;
     use rt::task::{Task, Sched};
     use rt::basic;
@@ -994,7 +995,6 @@ mod test {
     #[test]
     fn test_schedule_home_states() {
         use rt::sleeper_list::SleeperList;
-        use rt::work_queue::WorkQueue;
         use rt::sched::Shutdown;
         use borrow;
         use rt::comm::*;
@@ -1002,14 +1002,15 @@ mod test {
         do run_in_bare_thread {
 
             let sleepers = SleeperList::new();
-            let normal_queue = WorkQueue::new();
-            let special_queue = WorkQueue::new();
-            let queues = ~[normal_queue.clone(), special_queue.clone()];
+            let mut pool = BufferPool::init();
+            let (normal_worker, normal_stealer) = pool.deque();
+            let (special_worker, special_stealer) = pool.deque();
+            let queues = ~[normal_stealer, special_stealer];
 
             // Our normal scheduler
             let mut normal_sched = ~Scheduler::new(
                 basic::event_loop(),
-                normal_queue,
+                normal_worker,
                 queues.clone(),
                 sleepers.clone());
 
@@ -1020,7 +1021,7 @@ mod test {
             // Our special scheduler
             let mut special_sched = ~Scheduler::new_special(
                 basic::event_loop(),
-                special_queue.clone(),
+                special_worker,
                 queues.clone(),
                 sleepers.clone(),
                 false,
@@ -1169,7 +1170,6 @@ mod test {
     // Used to deadlock because Shutdown was never recvd.
     #[test]
     fn no_missed_messages() {
-        use rt::work_queue::WorkQueue;
         use rt::sleeper_list::SleeperList;
         use rt::stack::StackPool;
         use rt::sched::{Shutdown, TaskFromFriend};
@@ -1178,13 +1178,13 @@ mod test {
         do run_in_bare_thread {
             stress_factor().times(|| {
                 let sleepers = SleeperList::new();
-                let queue = WorkQueue::new();
-                let queues = ~[queue.clone()];
+                let mut pool = BufferPool::init();
+                let (worker, stealer) = pool.deque();
 
                 let mut sched = ~Scheduler::new(
                     basic::event_loop(),
-                    queue,
-                    queues.clone(),
+                    worker,
+                    ~[stealer],
                     sleepers.clone());
 
                 let mut handle = sched.make_handle();
diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs
index 2a6d30cf810..51ad37a2583 100644
--- a/src/libstd/rt/test.rs
+++ b/src/libstd/rt/test.rs
@@ -23,24 +23,25 @@ use rand;
 use result::{Result, Ok, Err};
 use rt::basic;
 use rt::comm::oneshot;
+use rt::deque::BufferPool;
 use rt::new_event_loop;
 use rt::sched::Scheduler;
 use rt::sleeper_list::SleeperList;
 use rt::task::Task;
 use rt::task::UnwindResult;
 use rt::thread::Thread;
-use rt::work_queue::WorkQueue;
 use unstable::{run_in_bare_thread};
+use vec;
 use vec::{OwnedVector, MutableVector, ImmutableVector};
 
 pub fn new_test_uv_sched() -> Scheduler {
 
-    let queue = WorkQueue::new();
-    let queues = ~[queue.clone()];
+    let mut pool = BufferPool::init();
+    let (worker, stealer) = pool.deque();
 
     let mut sched = Scheduler::new(new_event_loop(),
-                                   queue,
-                                   queues,
+                                   worker,
+                                   ~[stealer],
                                    SleeperList::new());
 
     // Don't wait for the Shutdown message
@@ -50,13 +51,12 @@ pub fn new_test_uv_sched() -> Scheduler {
 }
 
 pub fn new_test_sched() -> Scheduler {
-
-    let queue = WorkQueue::new();
-    let queues = ~[queue.clone()];
+    let mut pool = BufferPool::init();
+    let (worker, stealer) = pool.deque();
 
     let mut sched = Scheduler::new(basic::event_loop(),
-                                   queue,
-                                   queues,
+                                   worker,
+                                   ~[stealer],
                                    SleeperList::new());
 
     // Don't wait for the Shutdown message
@@ -227,18 +227,16 @@ pub fn run_in_mt_newsched_task(f: proc()) {
 
         let mut handles = ~[];
         let mut scheds = ~[];
-        let mut work_queues = ~[];
 
-        for _ in range(0u, nthreads) {
-            let work_queue = WorkQueue::new();
-            work_queues.push(work_queue);
-        }
+        let mut pool = BufferPool::<~Task>::init();
+        let workers = range(0, nthreads).map(|_| pool.deque());
+        let (workers, stealers) = vec::unzip(workers);
 
-        for i in range(0u, nthreads) {
+        for worker in workers.move_iter() {
             let loop_ = new_event_loop();
             let mut sched = ~Scheduler::new(loop_,
-                                            work_queues[i].clone(),
-                                            work_queues.clone(),
+                                            worker,
+                                            stealers.clone(),
                                             sleepers.clone());
             let handle = sched.make_handle();
 
diff --git a/src/libstd/rt/work_queue.rs b/src/libstd/rt/work_queue.rs
deleted file mode 100644
index 02ea8ab4f50..00000000000
--- a/src/libstd/rt/work_queue.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-use container::Container;
-use option::*;
-use vec::OwnedVector;
-use unstable::sync::Exclusive;
-use cell::Cell;
-use kinds::Send;
-use clone::Clone;
-
-pub struct WorkQueue<T> {
-    // XXX: Another mystery bug fixed by boxing this lock
-    priv queue: ~Exclusive<~[T]>
-}
-
-impl<T: Send> WorkQueue<T> {
-    pub fn new() -> WorkQueue<T> {
-        WorkQueue {
-            queue: ~Exclusive::new(~[])
-        }
-    }
-
-    pub fn push(&mut self, value: T) {
-        unsafe {
-            let value = Cell::new(value);
-            self.queue.with(|q| q.unshift(value.take()) );
-        }
-    }
-
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe {
-            self.queue.with(|q| {
-                if !q.is_empty() {
-                    Some(q.shift())
-                } else {
-                    None
-                }
-            })
-        }
-    }
-
-    pub fn steal(&mut self) -> Option<T> {
-        unsafe {
-            self.queue.with(|q| {
-                if !q.is_empty() {
-                    Some(q.pop())
-                } else {
-                    None
-                }
-            })
-        }
-    }
-
-    pub fn is_empty(&self) -> bool {
-        unsafe {
-            self.queue.with_imm(|q| q.is_empty() )
-        }
-    }
-}
-
-impl<T> Clone for WorkQueue<T> {
-    fn clone(&self) -> WorkQueue<T> {
-        WorkQueue {
-            queue: self.queue.clone()
-        }
-    }
-}
diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs
index 198fe596a89..153b3e4ce25 100644
--- a/src/libstd/task/spawn.rs
+++ b/src/libstd/task/spawn.rs
@@ -84,7 +84,6 @@ use rt::sched::{Scheduler, Shutdown, TaskFromFriend};
 use rt::task::{Task, Sched};
 use rt::task::UnwindResult;
 use rt::thread::Thread;
-use rt::work_queue::WorkQueue;
 use rt::{in_green_task_context, new_event_loop};
 use task::SingleThreaded;
 use task::TaskOpts;
@@ -111,11 +110,11 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
             // Since this is a 1:1 scheduler we create a queue not in
             // the stealee set. The run_anything flag is set false
             // which will disable stealing.
-            let work_queue = WorkQueue::new();
+            let (worker, _stealer) = (*sched).work_queue.pool().deque();
 
             // Create a new scheduler to hold the new task
             let mut new_sched = ~Scheduler::new_special(new_event_loop(),
-                                                        work_queue,
+                                                        worker,
                                                         (*sched).work_queues.clone(),
                                                         (*sched).sleeper_list.clone(),
                                                         false,