about summary refs log tree commit diff
path: root/src/libstd/sys
diff options
context:
space:
mode:
authorAaron Turon <aturon@mozilla.com>2014-10-16 18:57:11 -0700
committerAaron Turon <aturon@mozilla.com>2014-11-08 20:40:39 -0800
commitb8f1193bb1bb66610f479cd78e3dc5526e93058d (patch)
tree4fbf269f6f0e31069f62ce6539c7d2ccfb7c0f35 /src/libstd/sys
parent0f98e75b69d16edce9ca60d7961b8440856a3f72 (diff)
downloadrust-b8f1193bb1bb66610f479cd78e3dc5526e93058d.tar.gz
rust-b8f1193bb1bb66610f479cd78e3dc5526e93058d.zip
Runtime removal: refactor timer
This patch continues runtime removal by moving out timer-related code
into `sys`.

Because this eliminates APIs in `libnative` and `librustrt`, it is a:

[breaking-change]

This functionality is likely to be available publicly, in some form,
from `std` in the future.
Diffstat (limited to 'src/libstd/sys')
-rw-r--r--src/libstd/sys/unix/mod.rs1
-rw-r--r--src/libstd/sys/unix/timer.rs282
-rw-r--r--src/libstd/sys/windows/mod.rs1
-rw-r--r--src/libstd/sys/windows/timer.rs208
4 files changed, 492 insertions, 0 deletions
diff --git a/src/libstd/sys/unix/mod.rs b/src/libstd/sys/unix/mod.rs
index b404dc7fdbd..03a4e56f00d 100644
--- a/src/libstd/sys/unix/mod.rs
+++ b/src/libstd/sys/unix/mod.rs
@@ -34,6 +34,7 @@ pub mod udp;
 pub mod pipe;
 pub mod helper_signal;
 pub mod process;
+pub mod timer;
 
 pub mod addrinfo {
     pub use sys_common::net::get_host_addresses;
diff --git a/src/libstd/sys/unix/timer.rs b/src/libstd/sys/unix/timer.rs
new file mode 100644
index 00000000000..a1e6ac3db7e
--- /dev/null
+++ b/src/libstd/sys/unix/timer.rs
@@ -0,0 +1,282 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Timers for non-Linux/non-Windows OSes
+//!
+//! This module implements timers with a worker thread, select(), and a lot of
+//! witchcraft that turns out to be horribly inaccurate timers. The unfortunate
+//! part is that I'm at a loss of what else to do one these OSes. This is also
+//! why Linux has a specialized timerfd implementation and windows has its own
+//! implementation (they're more accurate than this one).
+//!
+//! The basic idea is that there is a worker thread that's communicated to via a
+//! channel and a pipe, the pipe is used by the worker thread in a select()
+//! syscall with a timeout. The timeout is the "next timer timeout" while the
+//! channel is used to send data over to the worker thread.
+//!
+//! Whenever the call to select() times out, then a channel receives a message.
+//! Whenever the call returns that the file descriptor has information, then the
+//! channel from timers is drained, enqueuing all incoming requests.
+//!
+//! The actual implementation of the helper thread is a sorted array of
+//! timers in terms of target firing date. The target is the absolute time at
+//! which the timer should fire. Timers are then re-enqueued after a firing if
+//! the repeat boolean is set.
+//!
+//! Naturally, all this logic of adding times and keeping track of
+//! relative/absolute time is a little lossy and not quite exact. I've done the
+//! best I could to reduce the amount of calls to 'now()', but there's likely
+//! still inaccuracies trickling in here and there.
+//!
+//! One of the tricky parts of this implementation is that whenever a timer is
+//! acted upon, it must cancel whatever the previous action was (if one is
+//! active) in order to act like the other implementations of this timer. In
+//! order to do this, the timer's inner pointer is transferred to the worker
+//! thread. Whenever the timer is modified, it first takes ownership back from
+//! the worker thread in order to modify the same data structure. This has the
+//! side effect of "cancelling" the previous requests while allowing a
+//! re-enqueuing later on.
+//!
+//! Note that all time units in this file are in *milliseconds*.
+
+use libc;
+use mem;
+use os;
+use ptr;
+use sync::atomic;
+use comm;
+use sys::c;
+use sys::fs::FileDesc;
+use sys_common::helper_thread::Helper;
+use prelude::*;
+use io::IoResult;
+
+helper_init!(static HELPER: Helper<Req>)
+
+pub trait Callback {
+    fn call(&mut self);
+}
+
+pub struct Timer {
+    id: uint,
+    inner: Option<Box<Inner>>,
+}
+
+pub struct Inner {
+    cb: Option<Box<Callback + Send>>,
+    interval: u64,
+    repeat: bool,
+    target: u64,
+    id: uint,
+}
+
+pub enum Req {
+    // Add a new timer to the helper thread.
+    NewTimer(Box<Inner>),
+
+    // Remove a timer based on its id and then send it back on the channel
+    // provided
+    RemoveTimer(uint, Sender<Box<Inner>>),
+}
+
+// returns the current time (in milliseconds)
+pub fn now() -> u64 {
+    unsafe {
+        let mut now: libc::timeval = mem::zeroed();
+        assert_eq!(c::gettimeofday(&mut now, ptr::null_mut()), 0);
+        return (now.tv_sec as u64) * 1000 + (now.tv_usec as u64) / 1000;
+    }
+}
+
+fn helper(input: libc::c_int, messages: Receiver<Req>, _: ()) {
+    let mut set: c::fd_set = unsafe { mem::zeroed() };
+
+    let mut fd = FileDesc::new(input, true);
+    let mut timeout: libc::timeval = unsafe { mem::zeroed() };
+
+    // active timers are those which are able to be selected upon (and it's a
+    // sorted list, and dead timers are those which have expired, but ownership
+    // hasn't yet been transferred back to the timer itself.
+    let mut active: Vec<Box<Inner>> = vec![];
+    let mut dead = vec![];
+
+    // inserts a timer into an array of timers (sorted by firing time)
+    fn insert(t: Box<Inner>, active: &mut Vec<Box<Inner>>) {
+        match active.iter().position(|tm| tm.target > t.target) {
+            Some(pos) => { active.insert(pos, t); }
+            None => { active.push(t); }
+        }
+    }
+
+    // signals the first requests in the queue, possible re-enqueueing it.
+    fn signal(active: &mut Vec<Box<Inner>>,
+              dead: &mut Vec<(uint, Box<Inner>)>) {
+        let mut timer = match active.remove(0) {
+            Some(timer) => timer, None => return
+        };
+        let mut cb = timer.cb.take().unwrap();
+        cb.call();
+        if timer.repeat {
+            timer.cb = Some(cb);
+            timer.target += timer.interval;
+            insert(timer, active);
+        } else {
+            dead.push((timer.id, timer));
+        }
+    }
+
+    'outer: loop {
+        let timeout = if active.len() == 0 {
+            // Empty array? no timeout (wait forever for the next request)
+            ptr::null_mut()
+        } else {
+            let now = now();
+            // If this request has already expired, then signal it and go
+            // through another iteration
+            if active[0].target <= now {
+                signal(&mut active, &mut dead);
+                continue;
+            }
+
+            // The actual timeout listed in the requests array is an
+            // absolute date, so here we translate the absolute time to a
+            // relative time.
+            let tm = active[0].target - now;
+            timeout.tv_sec = (tm / 1000) as libc::time_t;
+            timeout.tv_usec = ((tm % 1000) * 1000) as libc::suseconds_t;
+            &mut timeout as *mut libc::timeval
+        };
+
+        c::fd_set(&mut set, input);
+        match unsafe {
+            c::select(input + 1, &mut set, ptr::null_mut(),
+                      ptr::null_mut(), timeout)
+        } {
+            // timed out
+            0 => signal(&mut active, &mut dead),
+
+            // file descriptor write woke us up, we've got some new requests
+            1 => {
+                loop {
+                    match messages.try_recv() {
+                        Err(comm::Disconnected) => {
+                            assert!(active.len() == 0);
+                            break 'outer;
+                        }
+
+                        Ok(NewTimer(timer)) => insert(timer, &mut active),
+
+                        Ok(RemoveTimer(id, ack)) => {
+                            match dead.iter().position(|&(i, _)| id == i) {
+                                Some(i) => {
+                                    let (_, i) = dead.remove(i).unwrap();
+                                    ack.send(i);
+                                    continue
+                                }
+                                None => {}
+                            }
+                            let i = active.iter().position(|i| i.id == id);
+                            let i = i.expect("no timer found");
+                            let t = active.remove(i).unwrap();
+                            ack.send(t);
+                        }
+                        Err(..) => break
+                    }
+                }
+
+                // drain the file descriptor
+                let mut buf = [0];
+                assert_eq!(fd.read(buf).ok().unwrap(), 1);
+            }
+
+            -1 if os::errno() == libc::EINTR as uint => {}
+            n => panic!("helper thread failed in select() with error: {} ({})",
+                       n, os::last_os_error())
+        }
+    }
+}
+
+impl Timer {
+    pub fn new() -> IoResult<Timer> {
+        // See notes above regarding using int return value
+        // instead of ()
+        HELPER.boot(|| {}, helper);
+
+        static ID: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT;
+        let id = ID.fetch_add(1, atomic::Relaxed);
+        Ok(Timer {
+            id: id,
+            inner: Some(box Inner {
+                cb: None,
+                interval: 0,
+                target: 0,
+                repeat: false,
+                id: id,
+            })
+        })
+    }
+
+    pub fn sleep(&mut self, ms: u64) {
+        let mut inner = self.inner();
+        inner.cb = None; // cancel any previous request
+        self.inner = Some(inner);
+
+        let mut to_sleep = libc::timespec {
+            tv_sec: (ms / 1000) as libc::time_t,
+            tv_nsec: ((ms % 1000) * 1000000) as libc::c_long,
+        };
+        while unsafe { libc::nanosleep(&to_sleep, &mut to_sleep) } != 0 {
+            if os::errno() as int != libc::EINTR as int {
+                panic!("failed to sleep, but not because of EINTR?");
+            }
+        }
+    }
+
+    pub fn oneshot(&mut self, msecs: u64, cb: Box<Callback + Send>) {
+        let now = now();
+        let mut inner = self.inner();
+
+        inner.repeat = false;
+        inner.cb = Some(cb);
+        inner.interval = msecs;
+        inner.target = now + msecs;
+
+        HELPER.send(NewTimer(inner));
+    }
+
+    pub fn period(&mut self, msecs: u64, cb: Box<Callback + Send>) {
+        let now = now();
+        let mut inner = self.inner();
+
+        inner.repeat = true;
+        inner.cb = Some(cb);
+        inner.interval = msecs;
+        inner.target = now + msecs;
+
+        HELPER.send(NewTimer(inner));
+    }
+
+    fn inner(&mut self) -> Box<Inner> {
+        match self.inner.take() {
+            Some(i) => i,
+            None => {
+                let (tx, rx) = channel();
+                HELPER.send(RemoveTimer(self.id, tx));
+                rx.recv()
+            }
+        }
+    }
+}
+
+impl Drop for Timer {
+    fn drop(&mut self) {
+        self.inner = Some(self.inner());
+    }
+}
diff --git a/src/libstd/sys/windows/mod.rs b/src/libstd/sys/windows/mod.rs
index f50244701e4..0dc06de33e0 100644
--- a/src/libstd/sys/windows/mod.rs
+++ b/src/libstd/sys/windows/mod.rs
@@ -41,6 +41,7 @@ pub mod udp;
 pub mod pipe;
 pub mod helper_signal;
 pub mod process;
+pub mod timer;
 
 pub mod addrinfo {
     pub use sys_common::net::get_host_addresses;
diff --git a/src/libstd/sys/windows/timer.rs b/src/libstd/sys/windows/timer.rs
new file mode 100644
index 00000000000..f507be2a985
--- /dev/null
+++ b/src/libstd/sys/windows/timer.rs
@@ -0,0 +1,208 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Timers based on Windows WaitableTimers
+//!
+//! This implementation is meant to be used solely on windows. As with other
+//! implementations, there is a worker thread which is doing all the waiting on
+//! a large number of timers for all active timers in the system. This worker
+//! thread uses the select() equivalent, WaitForMultipleObjects. One of the
+//! objects being waited on is a signal into the worker thread to notify that
+//! the incoming channel should be looked at.
+//!
+//! Other than that, the implementation is pretty straightforward in terms of
+//! the other two implementations of timers with nothing *that* new showing up.
+
+use libc;
+use ptr;
+use comm;
+
+use sys::c;
+use sys::fs::FileDesc;
+use sys_common::helper_thread::Helper;
+use prelude::*;
+use io::IoResult;
+
+helper_init!(static HELPER: Helper<Req>)
+
+pub trait Callback {
+    fn call(&mut self);
+}
+
+pub struct Timer {
+    obj: libc::HANDLE,
+    on_worker: bool,
+}
+
+pub enum Req {
+    NewTimer(libc::HANDLE, Box<Callback + Send>, bool),
+    RemoveTimer(libc::HANDLE, Sender<()>),
+}
+
+fn helper(input: libc::HANDLE, messages: Receiver<Req>, _: ()) {
+    let mut objs = vec![input];
+    let mut chans = vec![];
+
+    'outer: loop {
+        let idx = unsafe {
+            imp::WaitForMultipleObjects(objs.len() as libc::DWORD,
+                                        objs.as_ptr(),
+                                        0 as libc::BOOL,
+                                        libc::INFINITE)
+        };
+
+        if idx == 0 {
+            loop {
+                match messages.try_recv() {
+                    Ok(NewTimer(obj, c, one)) => {
+                        objs.push(obj);
+                        chans.push((c, one));
+                    }
+                    Ok(RemoveTimer(obj, c)) => {
+                        c.send(());
+                        match objs.iter().position(|&o| o == obj) {
+                            Some(i) => {
+                                drop(objs.remove(i));
+                                drop(chans.remove(i - 1));
+                            }
+                            None => {}
+                        }
+                    }
+                    Err(comm::Disconnected) => {
+                        assert_eq!(objs.len(), 1);
+                        assert_eq!(chans.len(), 0);
+                        break 'outer;
+                    }
+                    Err(..) => break
+                }
+            }
+        } else {
+            let remove = {
+                match &mut chans[idx as uint - 1] {
+                    &(ref mut c, oneshot) => { c.call(); oneshot }
+                }
+            };
+            if remove {
+                drop(objs.remove(idx as uint));
+                drop(chans.remove(idx as uint - 1));
+            }
+        }
+    }
+}
+
+// returns the current time (in milliseconds)
+pub fn now() -> u64 {
+    let mut ticks_per_s = 0;
+    assert_eq!(unsafe { libc::QueryPerformanceFrequency(&mut ticks_per_s) }, 1);
+    let ticks_per_s = if ticks_per_s == 0 {1} else {ticks_per_s};
+    let mut ticks = 0;
+    assert_eq!(unsafe { libc::QueryPerformanceCounter(&mut ticks) }, 1);
+
+    return (ticks as u64 * 1000) / (ticks_per_s as u64);
+}
+
+impl Timer {
+    pub fn new() -> IoResult<Timer> {
+        HELPER.boot(|| {}, helper);
+
+        let obj = unsafe {
+            imp::CreateWaitableTimerA(ptr::null_mut(), 0, ptr::null())
+        };
+        if obj.is_null() {
+            Err(super::last_error())
+        } else {
+            Ok(Timer { obj: obj, on_worker: false, })
+        }
+    }
+
+    fn remove(&mut self) {
+        if !self.on_worker { return }
+
+        let (tx, rx) = channel();
+        HELPER.send(RemoveTimer(self.obj, tx));
+        rx.recv();
+
+        self.on_worker = false;
+    }
+
+    pub fn sleep(&mut self, msecs: u64) {
+        self.remove();
+
+        // there are 10^6 nanoseconds in a millisecond, and the parameter is in
+        // 100ns intervals, so we multiply by 10^4.
+        let due = -(msecs as i64 * 10000) as libc::LARGE_INTEGER;
+        assert_eq!(unsafe {
+            imp::SetWaitableTimer(self.obj, &due, 0, ptr::null_mut(),
+                                  ptr::null_mut(), 0)
+        }, 1);
+
+        let _ = unsafe { imp::WaitForSingleObject(self.obj, libc::INFINITE) };
+    }
+
+    pub fn oneshot(&mut self, msecs: u64, cb: Box<Callback + Send>) {
+        self.remove();
+
+        // see above for the calculation
+        let due = -(msecs as i64 * 10000) as libc::LARGE_INTEGER;
+        assert_eq!(unsafe {
+            imp::SetWaitableTimer(self.obj, &due, 0, ptr::null_mut(),
+                                  ptr::null_mut(), 0)
+        }, 1);
+
+        HELPER.send(NewTimer(self.obj, cb, true));
+        self.on_worker = true;
+    }
+
+    pub fn period(&mut self, msecs: u64, cb: Box<Callback + Send>) {
+        self.remove();
+
+        // see above for the calculation
+        let due = -(msecs as i64 * 10000) as libc::LARGE_INTEGER;
+        assert_eq!(unsafe {
+            imp::SetWaitableTimer(self.obj, &due, msecs as libc::LONG,
+                                  ptr::null_mut(), ptr::null_mut(), 0)
+        }, 1);
+
+        HELPER.send(NewTimer(self.obj, cb, false));
+        self.on_worker = true;
+    }
+}
+
+impl Drop for Timer {
+    fn drop(&mut self) {
+        self.remove();
+        assert!(unsafe { libc::CloseHandle(self.obj) != 0 });
+    }
+}
+
+mod imp {
+    use libc::{LPSECURITY_ATTRIBUTES, BOOL, LPCSTR, HANDLE, LARGE_INTEGER,
+                    LONG, LPVOID, DWORD, c_void};
+
+    pub type PTIMERAPCROUTINE = *mut c_void;
+
+    extern "system" {
+        pub fn CreateWaitableTimerA(lpTimerAttributes: LPSECURITY_ATTRIBUTES,
+                                    bManualReset: BOOL,
+                                    lpTimerName: LPCSTR) -> HANDLE;
+        pub fn SetWaitableTimer(hTimer: HANDLE,
+                                pDueTime: *const LARGE_INTEGER,
+                                lPeriod: LONG,
+                                pfnCompletionRoutine: PTIMERAPCROUTINE,
+                                lpArgToCompletionRoutine: LPVOID,
+                                fResume: BOOL) -> BOOL;
+        pub fn WaitForMultipleObjects(nCount: DWORD,
+                                      lpHandles: *const HANDLE,
+                                      bWaitAll: BOOL,
+                                      dwMilliseconds: DWORD) -> DWORD;
+        pub fn WaitForSingleObject(hHandle: HANDLE,
+                                   dwMilliseconds: DWORD) -> DWORD;
+    }
+}