about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-10-25 19:52:02 -0700
committerBrian Anderson <banderson@mozilla.com>2013-10-25 19:56:49 -0700
commita849c476f5a62bdf5af546b603a5d7038fcb5e52 (patch)
tree1d9d504d2cada34134e44c70027a0e375b7498c9
parent1ce5081f4d7a8d636f67204e0e62fe0e9164b560 (diff)
downloadrust-a849c476f5a62bdf5af546b603a5d7038fcb5e52.tar.gz
rust-a849c476f5a62bdf5af546b603a5d7038fcb5e52.zip
Encapsulate the lock-free mpsc queue in the MessageQueue type
-rw-r--r--src/libstd/rt/message_queue.rs60
-rw-r--r--src/libstd/rt/mpsc_queue.rs4
-rw-r--r--src/libstd/rt/sched.rs8
3 files changed, 15 insertions, 57 deletions
diff --git a/src/libstd/rt/message_queue.rs b/src/libstd/rt/message_queue.rs
index 99b5156b319..10e457368f0 100644
--- a/src/libstd/rt/message_queue.rs
+++ b/src/libstd/rt/message_queue.rs
@@ -11,83 +11,45 @@
 //! A concurrent queue that supports multiple producers and a
 //! single consumer.
 
-use container::Container;
 use kinds::Send;
 use vec::OwnedVector;
-use cell::Cell;
-use option::*;
-use unstable::sync::{UnsafeArc, LittleLock};
+use option::Option;
 use clone::Clone;
+use rt::mpsc_queue::Queue;
 
 pub struct MessageQueue<T> {
-    priv state: UnsafeArc<State<T>>
-}
-
-struct State<T> {
-    count: uint,
-    queue: ~[T],
-    lock: LittleLock
+    priv queue: Queue<T>
 }
 
 impl<T: Send> MessageQueue<T> {
     pub fn new() -> MessageQueue<T> {
         MessageQueue {
-            state: UnsafeArc::new(State {
-                count: 0,
-                queue: ~[],
-                lock: LittleLock::new()
-            })
+            queue: Queue::new()
         }
     }
 
+    #[inline]
     pub fn push(&mut self, value: T) {
-        unsafe {
-            let value = Cell::new(value);
-            let state = self.state.get();
-            do (*state).lock.lock {
-                (*state).count += 1;
-                (*state).queue.push(value.take());
-            }
-        }
+        self.queue.push(value)
     }
 
+    #[inline]
     pub fn pop(&mut self) -> Option<T> {
-        unsafe {
-            let state = self.state.get();
-            do (*state).lock.lock {
-                if !(*state).queue.is_empty() {
-                    (*state).count += 1;
-                    Some((*state).queue.shift())
-                } else {
-                    None
-                }
-            }
-        }
+        self.queue.pop()
     }
 
     /// A pop that may sometimes miss enqueued elements, but is much faster
     /// to give up without doing any synchronization
+    #[inline]
     pub fn casual_pop(&mut self) -> Option<T> {
-        unsafe {
-            let state = self.state.get();
-            // NB: Unsynchronized check
-            if (*state).count == 0 { return None; }
-            do (*state).lock.lock {
-                if !(*state).queue.is_empty() {
-                    (*state).count += 1;
-                    Some((*state).queue.shift())
-                } else {
-                    None
-                }
-            }
-        }
+        self.queue.pop()
     }
 }
 
 impl<T: Send> Clone for MessageQueue<T> {
     fn clone(&self) -> MessageQueue<T> {
         MessageQueue {
-            state: self.state.clone()
+            queue: self.queue.clone()
         }
     }
 }
diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs
index 4ddd5e066cd..4f39a1df4fa 100644
--- a/src/libstd/rt/mpsc_queue.rs
+++ b/src/libstd/rt/mpsc_queue.rs
@@ -159,10 +159,6 @@ impl<T: Send> Queue<T> {
         unsafe { (*self.state.get()).push(value) }
     }
 
-    pub fn casual_pop(&mut self) -> Option<T> {
-        unsafe { (*self.state.get()).pop() }
-    }
-
     pub fn pop(&mut self) -> Option<T> {
         unsafe{ (*self.state.get()).pop() }
     }
diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs
index e739eed32fe..b008a8a74f2 100644
--- a/src/libstd/rt/sched.rs
+++ b/src/libstd/rt/sched.rs
@@ -19,7 +19,7 @@ use super::stack::{StackPool};
 use super::rtio::EventLoop;
 use super::context::Context;
 use super::task::{Task, AnySched, Sched};
-use super::mpsc_queue::Queue;
+use super::message_queue::MessageQueue;
 use rt::kill::BlockedTask;
 use rt::local_ptr;
 use rt::local::Local;
@@ -47,7 +47,7 @@ pub struct Scheduler {
     /// The queue of incoming messages from other schedulers.
     /// These are enqueued by SchedHandles after which a remote callback
     /// is triggered to handle the message.
-    priv message_queue: Queue<SchedMessage>,
+    priv message_queue: MessageQueue<SchedMessage>,
     /// A shared list of sleeping schedulers. We'll use this to wake
     /// up schedulers when pushing work onto the work queue.
     sleeper_list: SleeperList,
@@ -137,7 +137,7 @@ impl Scheduler {
 
         let mut sched = Scheduler {
             sleeper_list: sleeper_list,
-            message_queue: Queue::new(),
+            message_queue: MessageQueue::new(),
             sleepy: false,
             no_sleep: false,
             event_loop: event_loop,
@@ -802,7 +802,7 @@ pub enum SchedMessage {
 
 pub struct SchedHandle {
     priv remote: ~RemoteCallback,
-    priv queue: Queue<SchedMessage>,
+    priv queue: MessageQueue<SchedMessage>,
     sched_id: uint
 }