// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A concurrent queue used to signal remote event loops
//!
//! This queue implementation is used to send tasks among event loops. This is
//! backed by a multi-producer/single-consumer queue from libstd and uv_async_t
//! handles (to wake up a remote event loop).
//!
//! The uv_async_t is stored next to the event loop, so in order to not keep the
//! event loop alive we use uv_ref and uv_unref in order to control when the
//! async handle is active or not.
#![allow(dead_code)]
use std::cast;
use std::libc::{c_void, c_int};
use std::rt::task::BlockedTask;
use std::unstable::mutex::NativeMutex;
use std::sync::arc::UnsafeArc;
use mpsc = std::sync::mpsc_queue;
use async::AsyncWatcher;
use super::{Loop, UvHandle};
use uvll;
// Messages exchanged over the mpsc queue between producers (Queue handles on
// arbitrary threads) and the consumer (the event loop's async_cb).
enum Message {
    // Wake up the given blocked task on the consumer's event loop.
    Task(BlockedTask),
    // A Queue handle was created/cloned; async_cb bumps the pool refcount
    // (uv_ref'ing the handle when the count leaves 0).
    Increment,
    // A Queue handle was dropped; async_cb drops the pool refcount
    // (uv_unref'ing the handle when the count reaches 0).
    Decrement,
}
// State shared (via UnsafeArc) between the QueuePool on the event loop and
// every producer-side Queue handle.
struct State {
    // The uv_async_t used to wake up the consumer's event loop.
    handle: *uvll::uv_async_t,
    lock: NativeMutex, // see comments in async_cb for why this is needed
    // Multi-producer/single-consumer queue carrying Message values.
    queue: mpsc::Queue<Message>,
}
/// This structure is intended to be stored next to the event loop, and it is
/// used to create new `Queue` structures.
pub struct QueuePool {
    // Shared state; a clone of this arc is handed to every Queue created by
    // queue().
    priv queue: UnsafeArc<State>,
    // Number of outstanding Queue handles. Mutated in queue() and in
    // async_cb; presumably both run on the loop's thread, which is why no
    // synchronization is used — TODO confirm against callers.
    priv refcnt: uint,
}
/// This type is used to send messages back to the original event loop.
pub struct Queue {
    // Shared with the QueuePool that created this handle (see QueuePool::queue).
    priv queue: UnsafeArc<State>,
}
// Consumer-side callback invoked by libuv on the event loop whenever a
// producer calls uv_async_send on this pool's handle. Drains every pending
// message and maintains the refcount / uv_ref state of the async handle.
extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
    // Async callbacks are not expected to report failure.
    assert_eq!(status, 0);
    // The handle's data pointer was set to the owning QueuePool in
    // QueuePool::new, so this cast recovers it.
    let pool: &mut QueuePool = unsafe {
        cast::transmute(uvll::get_data_for_uv_handle(handle))
    };
    let state: &mut State = unsafe { cast::transmute(pool.queue.get()) };

    // Remember that there is no guarantee about how many times an async
    // callback is called with relation to the number of sends, so process the
    // entire queue in a loop.
    loop {
        match state.queue.pop() {
            mpsc::Data(Task(task)) => {
                // Reawaken the task on this (the consumer's) event loop.
                let _ = task.wake().map(|t| t.reawaken());
            }
            mpsc::Data(Increment) => unsafe {
                // First outstanding Queue: keep the event loop alive again.
                if pool.refcnt == 0 {
                    uvll::uv_ref(state.handle);
                }
                pool.refcnt += 1;
            },
            mpsc::Data(Decrement) => unsafe {
                pool.refcnt -= 1;
                // Last outstanding Queue: allow the event loop to exit.
                if pool.refcnt == 0 {
                    uvll::uv_unref(state.handle);
                }
            },
            // NOTE(review): Inconsistent is treated like Empty — presumably
            // the mid-push producer's own uv_async_send will re-trigger this
            // callback so the message is not lost; confirm against the
            // mpsc_queue contract.
            mpsc::Empty | mpsc::Inconsistent => break
        };
    }

    // If the refcount is now zero after processing the queue, then there is no
    // longer a reference on the async handle and it is possible that this event
    // loop can exit. What we're not guaranteed, however, is that a producer in
    // the middle of dropping itself is yet done with the handle. It could be
    // possible that we saw their Decrement message but they have yet to signal
    // on the async handle. If we were to return immediately, the entire uv loop
    // could be destroyed meaning the call to uv_async_send would abort()
    //
    // In order to fix this, an OS mutex is used to wait for the other end to
    // finish before we continue. The drop block on a handle will acquire a
    // mutex and then drop it after both the push and send have been completed.
    // If we acquire the mutex here, then we are guaranteed that there are no
    // longer any senders which are holding on to their handles, so we can
    // safely allow the event loop to exit.
    if pool.refcnt == 0 {
        unsafe {
            let _l = state.lock.lock();
        }
    }
}
impl QueuePool {
    /// Creates the pool for a given event loop: allocates the uv_async_t,
    /// registers `async_cb` on it, and stores a pointer back to the pool in
    /// the handle's data field so the callback can find it again.
    ///
    /// The handle starts unreferenced (uv_unref) so that an idle pool does
    /// not keep the event loop alive; it is re-referenced once the first
    /// Queue is handed out (see `queue`).
    pub fn new(loop_: &mut Loop) -> ~QueuePool {
        let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
        let state = UnsafeArc::new(State {
            handle: handle,
            lock: unsafe {NativeMutex::new()},
            queue: mpsc::Queue::new(),
        });
        let q = ~QueuePool {
            refcnt: 0,
            queue: state,
        };
        unsafe {
            assert_eq!(uvll::uv_async_init(loop_.handle, handle, async_cb), 0);
            uvll::uv_unref(handle);
            // async_cb recovers the pool through this raw pointer, so the
            // returned box must outlive the handle.
            let data = &*q as *QueuePool as *c_void;
            uvll::set_data_for_uv_handle(handle, data);
        }
        return q;
    }

    /// Hands out a new producer handle. If this is the first outstanding
    /// Queue, uv_ref's the async handle so the loop stays alive while
    /// producers exist.
    pub fn queue(&mut self) -> Queue {
        unsafe {
            if self.refcnt == 0 {
                uvll::uv_ref((*self.queue.get()).handle);
            }
            self.refcnt += 1;
        }
        Queue { queue: self.queue.clone() }
    }

    /// Returns the raw uv_async_t backing this pool.
    pub fn handle(&self) -> *uvll::uv_async_t {
        unsafe { (*self.queue.get()).handle }
    }
}
impl Queue {
    /// Enqueues a blocked task for the remote event loop, then signals the
    /// loop's async handle so the message gets processed.
    pub fn push(&mut self, task: BlockedTask) {
        unsafe {
            let state = self.queue.get();
            (*state).queue.push(Task(task));
            uvll::uv_async_send((*state).handle);
        }
    }
}
impl Clone for Queue {
    fn clone(&self) -> Queue {
        // Record the increment on the shared queue, but skip waking the
        // event loop: the refcount is already at least one (this very
        // handle exists), and whenever a handle is later dropped its
        // Decrement — which *does* signal — forces the loop to drain the
        // queue and observe this Increment first.
        unsafe {
            let state = self.queue.get();
            (*state).queue.push(Increment);
        }
        Queue { queue: self.queue.clone() }
    }
}
impl Drop for Queue {
    fn drop(&mut self) {
        // See the comments in the async_cb function for why there is a lock
        // that is acquired only on a drop.
        unsafe {
            let state = self.queue.get();
            // The lock must be held across BOTH the push and the send:
            // async_cb blocks on this mutex when the refcount hits zero to
            // guarantee this producer is completely done with the handle
            // before the loop is allowed to exit.
            let _l = (*state).lock.lock();
            (*state).queue.push(Decrement);
            uvll::uv_async_send((*state).handle);
        }
    }
}
impl Drop for State {
    fn drop(&mut self) {
        unsafe {
            // Close the async handle; the transmuted 0 is a null close
            // callback, so nothing runs when the close completes.
            uvll::uv_close(self.handle, cast::transmute(0));
            // Note that this does *not* free the handle, that is the
            // responsibility of the caller because the uv loop must be closed
            // before we deallocate this uv handle.
        }
    }
}
|