diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-10-25 19:52:02 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-10-25 19:56:49 -0700 |
| commit | a849c476f5a62bdf5af546b603a5d7038fcb5e52 (patch) | |
| tree | 1d9d504d2cada34134e44c70027a0e375b7498c9 | |
| parent | 1ce5081f4d7a8d636f67204e0e62fe0e9164b560 (diff) | |
| download | rust-a849c476f5a62bdf5af546b603a5d7038fcb5e52.tar.gz rust-a849c476f5a62bdf5af546b603a5d7038fcb5e52.zip | |
Encapsulate the lock-free mpsc queue in the MessageQueue type
| -rw-r--r-- | src/libstd/rt/message_queue.rs | 60 | ||||
| -rw-r--r-- | src/libstd/rt/mpsc_queue.rs | 4 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 8 |
3 files changed, 15 insertions, 57 deletions
diff --git a/src/libstd/rt/message_queue.rs b/src/libstd/rt/message_queue.rs index 99b5156b319..10e457368f0 100644 --- a/src/libstd/rt/message_queue.rs +++ b/src/libstd/rt/message_queue.rs @@ -11,83 +11,45 @@ //! A concurrent queue that supports multiple producers and a //! single consumer. -use container::Container; use kinds::Send; use vec::OwnedVector; -use cell::Cell; -use option::*; -use unstable::sync::{UnsafeArc, LittleLock}; +use option::Option; use clone::Clone; +use rt::mpsc_queue::Queue; pub struct MessageQueue<T> { - priv state: UnsafeArc<State<T>> -} - -struct State<T> { - count: uint, - queue: ~[T], - lock: LittleLock + priv queue: Queue<T> } impl<T: Send> MessageQueue<T> { pub fn new() -> MessageQueue<T> { MessageQueue { - state: UnsafeArc::new(State { - count: 0, - queue: ~[], - lock: LittleLock::new() - }) + queue: Queue::new() } } + #[inline] pub fn push(&mut self, value: T) { - unsafe { - let value = Cell::new(value); - let state = self.state.get(); - do (*state).lock.lock { - (*state).count += 1; - (*state).queue.push(value.take()); - } - } + self.queue.push(value) } + #[inline] pub fn pop(&mut self) -> Option<T> { - unsafe { - let state = self.state.get(); - do (*state).lock.lock { - if !(*state).queue.is_empty() { - (*state).count += 1; - Some((*state).queue.shift()) - } else { - None - } - } - } + self.queue.pop() } /// A pop that may sometimes miss enqueued elements, but is much faster /// to give up without doing any synchronization + #[inline] pub fn casual_pop(&mut self) -> Option<T> { - unsafe { - let state = self.state.get(); - // NB: Unsynchronized check - if (*state).count == 0 { return None; } - do (*state).lock.lock { - if !(*state).queue.is_empty() { - (*state).count += 1; - Some((*state).queue.shift()) - } else { - None - } - } - } + self.queue.pop() } } impl<T: Send> Clone for MessageQueue<T> { fn clone(&self) -> MessageQueue<T> { MessageQueue { - state: self.state.clone() + queue: self.queue.clone() } } } diff --git a/src/libstd/rt/mpsc_queue.rs b/src/libstd/rt/mpsc_queue.rs index 4ddd5e066cd..4f39a1df4fa 100644 --- a/src/libstd/rt/mpsc_queue.rs +++ b/src/libstd/rt/mpsc_queue.rs @@ -159,10 +159,6 @@ impl<T: Send> Queue<T> { unsafe { (*self.state.get()).push(value) } } - pub fn casual_pop(&mut self) -> Option<T> { - unsafe { (*self.state.get()).pop() } - } - pub fn pop(&mut self) -> Option<T> { unsafe{ (*self.state.get()).pop() } } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index e739eed32fe..b008a8a74f2 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -19,7 +19,7 @@ use super::stack::{StackPool}; use super::rtio::EventLoop; use super::context::Context; use super::task::{Task, AnySched, Sched}; -use super::mpsc_queue::Queue; +use super::message_queue::MessageQueue; use rt::kill::BlockedTask; use rt::local_ptr; use rt::local::Local; @@ -47,7 +47,7 @@ pub struct Scheduler { /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. - priv message_queue: Queue<SchedMessage>, + priv message_queue: MessageQueue<SchedMessage>, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. sleeper_list: SleeperList, @@ -137,7 +137,7 @@ impl Scheduler { let mut sched = Scheduler { sleeper_list: sleeper_list, - message_queue: Queue::new(), + message_queue: MessageQueue::new(), sleepy: false, no_sleep: false, event_loop: event_loop, @@ -802,7 +802,7 @@ pub enum SchedMessage { pub struct SchedHandle { priv remote: ~RemoteCallback, - priv queue: Queue<SchedMessage>, + priv queue: MessageQueue<SchedMessage>, sched_id: uint } |
