about summary refs log tree commit diff
path: root/src/librustuv/queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/librustuv/queue.rs')
-rw-r--r--src/librustuv/queue.rs185
1 files changed, 0 insertions, 185 deletions
diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs
deleted file mode 100644
index 6a0f2a27b18..00000000000
--- a/src/librustuv/queue.rs
+++ /dev/null
@@ -1,185 +0,0 @@
-// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! A concurrent queue used to signal remote event loops
-//!
-//! This queue implementation is used to send tasks among event loops. This is
-//! backed by a multi-producer/single-consumer queue from libstd and uv_async_t
-//! handles (to wake up a remote event loop).
-//!
-//! The uv_async_t is stored next to the event loop, so in order to not keep the
-//! event loop alive we use uv_ref and uv_unref in order to control when the
-//! async handle is active or not.
-
-#![allow(dead_code)]
-
-use alloc::arc::Arc;
-use libc::c_void;
-use std::mem;
-use std::rt::mutex::NativeMutex;
-use std::rt::task::BlockedTask;
-use std::sync::mpsc_queue as mpsc;
-
-use async::AsyncWatcher;
-use super::{Loop, UvHandle};
-use uvll;
-
-enum Message {
-    Task(BlockedTask),
-    Increment,
-    Decrement,
-}
-
-struct State {
-    handle: *mut uvll::uv_async_t,
-    lock: NativeMutex, // see comments in async_cb for why this is needed
-    queue: mpsc::Queue<Message>,
-}
-
-/// This structure is intended to be stored next to the event loop, and it is
-/// used to create new `Queue` structures.
-pub struct QueuePool {
-    queue: Arc<State>,
-    refcnt: uint,
-}
-
-/// This type is used to send messages back to the original event loop.
-pub struct Queue {
-    queue: Arc<State>,
-}
-
-extern fn async_cb(handle: *mut uvll::uv_async_t) {
-    let pool: &mut QueuePool = unsafe {
-        mem::transmute(uvll::get_data_for_uv_handle(handle))
-    };
-    let state: &State = &*pool.queue;
-
-    // Remember that there is no guarantee about how many times an async
-    // callback is called with relation to the number of sends, so process the
-    // entire queue in a loop.
-    loop {
-        match state.queue.pop() {
-            mpsc::Data(Task(task)) => {
-                let _ = task.wake().map(|t| t.reawaken());
-            }
-            mpsc::Data(Increment) => unsafe {
-                if pool.refcnt == 0 {
-                    uvll::uv_ref(state.handle);
-                }
-                pool.refcnt += 1;
-            },
-            mpsc::Data(Decrement) => unsafe {
-                pool.refcnt -= 1;
-                if pool.refcnt == 0 {
-                    uvll::uv_unref(state.handle);
-                }
-            },
-            mpsc::Empty | mpsc::Inconsistent => break
-        };
-    }
-
-    // If the refcount is now zero after processing the queue, then there is no
-    // longer a reference on the async handle and it is possible that this event
-    // loop can exit. What we're not guaranteed, however, is that a producer in
-    // the middle of dropping itself is yet done with the handle. It could be
-    // possible that we saw their Decrement message but they have yet to signal
-    // on the async handle. If we were to return immediately, the entire uv loop
-    // could be destroyed meaning the call to uv_async_send would abort()
-    //
-    // In order to fix this, an OS mutex is used to wait for the other end to
-    // finish before we continue. The drop block on a handle will acquire a
-    // mutex and then drop it after both the push and send have been completed.
-    // If we acquire the mutex here, then we are guaranteed that there are no
-    // longer any senders which are holding on to their handles, so we can
-    // safely allow the event loop to exit.
-    if pool.refcnt == 0 {
-        unsafe {
-            let _l = state.lock.lock();
-        }
-    }
-}
-
-impl QueuePool {
-    pub fn new(loop_: &mut Loop) -> Box<QueuePool> {
-        let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
-        let state = Arc::new(State {
-            handle: handle,
-            lock: unsafe {NativeMutex::new()},
-            queue: mpsc::Queue::new(),
-        });
-        let mut q = box QueuePool {
-            refcnt: 0,
-            queue: state,
-        };
-
-        unsafe {
-            assert_eq!(uvll::uv_async_init(loop_.handle, handle, async_cb), 0);
-            uvll::uv_unref(handle);
-            let data = &mut *q as *mut QueuePool as *mut c_void;
-            uvll::set_data_for_uv_handle(handle, data);
-        }
-
-        return q;
-    }
-
-    pub fn queue(&mut self) -> Queue {
-        unsafe {
-            if self.refcnt == 0 {
-                uvll::uv_ref(self.queue.handle);
-            }
-            self.refcnt += 1;
-        }
-        Queue { queue: self.queue.clone() }
-    }
-
-    pub fn handle(&self) -> *mut uvll::uv_async_t { self.queue.handle }
-}
-
-impl Queue {
-    pub fn push(&mut self, task: BlockedTask) {
-        self.queue.queue.push(Task(task));
-        unsafe { uvll::uv_async_send(self.queue.handle); }
-    }
-}
-
-impl Clone for Queue {
-    fn clone(&self) -> Queue {
-        // Push a request to increment on the queue, but there's no need to
-        // signal the event loop to process it at this time. We're guaranteed
-        // that the count is at least one (because we have a queue right here),
-        // and if the queue is dropped later on it'll see the increment for the
-        // decrement anyway.
-        self.queue.queue.push(Increment);
-        Queue { queue: self.queue.clone() }
-    }
-}
-
-impl Drop for Queue {
-    fn drop(&mut self) {
-        // See the comments in the async_cb function for why there is a lock
-        // that is acquired only on a drop.
-        unsafe {
-            let _l = self.queue.lock.lock();
-            self.queue.queue.push(Decrement);
-            uvll::uv_async_send(self.queue.handle);
-        }
-    }
-}
-
-impl Drop for State {
-    fn drop(&mut self) {
-        unsafe {
-            uvll::uv_close(self.handle, mem::transmute(0u));
-            // Note that this does *not* free the handle, that is the
-            // responsibility of the caller because the uv loop must be closed
-            // before we deallocate this uv handle.
-        }
-    }
-}