diff options
| author | Aaron Turon <aturon@mozilla.com> | 2014-10-16 18:57:11 -0700 |
|---|---|---|
| committer | Aaron Turon <aturon@mozilla.com> | 2014-11-08 20:40:39 -0800 |
| commit | b8f1193bb1bb66610f479cd78e3dc5526e93058d (patch) | |
| tree | 4fbf269f6f0e31069f62ce6539c7d2ccfb7c0f35 /src/libstd/sys | |
| parent | 0f98e75b69d16edce9ca60d7961b8440856a3f72 (diff) | |
| download | rust-b8f1193bb1bb66610f479cd78e3dc5526e93058d.tar.gz rust-b8f1193bb1bb66610f479cd78e3dc5526e93058d.zip | |
Runtime removal: refactor timer
This patch continues runtime removal by moving out timer-related code into `sys`. Because this eliminates APIs in `libnative` and `librustrt`, it is a: [breaking-change] This functionality is likely to be available publicly, in some form, from `std` in the future.
Diffstat (limited to 'src/libstd/sys')
| -rw-r--r-- | src/libstd/sys/unix/mod.rs | 1 | ||||
| -rw-r--r-- | src/libstd/sys/unix/timer.rs | 282 | ||||
| -rw-r--r-- | src/libstd/sys/windows/mod.rs | 1 | ||||
| -rw-r--r-- | src/libstd/sys/windows/timer.rs | 208 |
4 files changed, 492 insertions, 0 deletions
diff --git a/src/libstd/sys/unix/mod.rs b/src/libstd/sys/unix/mod.rs index b404dc7fdbd..03a4e56f00d 100644 --- a/src/libstd/sys/unix/mod.rs +++ b/src/libstd/sys/unix/mod.rs @@ -34,6 +34,7 @@ pub mod udp; pub mod pipe; pub mod helper_signal; pub mod process; +pub mod timer; pub mod addrinfo { pub use sys_common::net::get_host_addresses; diff --git a/src/libstd/sys/unix/timer.rs b/src/libstd/sys/unix/timer.rs new file mode 100644 index 00000000000..a1e6ac3db7e --- /dev/null +++ b/src/libstd/sys/unix/timer.rs @@ -0,0 +1,282 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Timers for non-Linux/non-Windows OSes +//! +//! This module implements timers with a worker thread, select(), and a lot of +//! witchcraft that turns out to be horribly inaccurate timers. The unfortunate +//! part is that I'm at a loss of what else to do one these OSes. This is also +//! why Linux has a specialized timerfd implementation and windows has its own +//! implementation (they're more accurate than this one). +//! +//! The basic idea is that there is a worker thread that's communicated to via a +//! channel and a pipe, the pipe is used by the worker thread in a select() +//! syscall with a timeout. The timeout is the "next timer timeout" while the +//! channel is used to send data over to the worker thread. +//! +//! Whenever the call to select() times out, then a channel receives a message. +//! Whenever the call returns that the file descriptor has information, then the +//! channel from timers is drained, enqueuing all incoming requests. +//! +//! The actual implementation of the helper thread is a sorted array of +//! timers in terms of target firing date. The target is the absolute time at +//! which the timer should fire. Timers are then re-enqueued after a firing if +//! the repeat boolean is set. +//! +//! Naturally, all this logic of adding times and keeping track of +//! relative/absolute time is a little lossy and not quite exact. I've done the +//! best I could to reduce the amount of calls to 'now()', but there's likely +//! still inaccuracies trickling in here and there. +//! +//! One of the tricky parts of this implementation is that whenever a timer is +//! acted upon, it must cancel whatever the previous action was (if one is +//! active) in order to act like the other implementations of this timer. In +//! order to do this, the timer's inner pointer is transferred to the worker +//! thread. Whenever the timer is modified, it first takes ownership back from +//! the worker thread in order to modify the same data structure. This has the +//! side effect of "cancelling" the previous requests while allowing a +//! re-enqueuing later on. +//! +//! Note that all time units in this file are in *milliseconds*. + +use libc; +use mem; +use os; +use ptr; +use sync::atomic; +use comm; +use sys::c; +use sys::fs::FileDesc; +use sys_common::helper_thread::Helper; +use prelude::*; +use io::IoResult; + +helper_init!(static HELPER: Helper<Req>) + +pub trait Callback { + fn call(&mut self); +} + +pub struct Timer { + id: uint, + inner: Option<Box<Inner>>, +} + +pub struct Inner { + cb: Option<Box<Callback + Send>>, + interval: u64, + repeat: bool, + target: u64, + id: uint, +} + +pub enum Req { + // Add a new timer to the helper thread. + NewTimer(Box<Inner>), + + // Remove a timer based on its id and then send it back on the channel + // provided + RemoveTimer(uint, Sender<Box<Inner>>), +} + +// returns the current time (in milliseconds) +pub fn now() -> u64 { + unsafe { + let mut now: libc::timeval = mem::zeroed(); + assert_eq!(c::gettimeofday(&mut now, ptr::null_mut()), 0); + return (now.tv_sec as u64) * 1000 + (now.tv_usec as u64) / 1000; + } +} + +fn helper(input: libc::c_int, messages: Receiver<Req>, _: ()) { + let mut set: c::fd_set = unsafe { mem::zeroed() }; + + let mut fd = FileDesc::new(input, true); + let mut timeout: libc::timeval = unsafe { mem::zeroed() }; + + // active timers are those which are able to be selected upon (and it's a + // sorted list, and dead timers are those which have expired, but ownership + // hasn't yet been transferred back to the timer itself. + let mut active: Vec<Box<Inner>> = vec![]; + let mut dead = vec![]; + + // inserts a timer into an array of timers (sorted by firing time) + fn insert(t: Box<Inner>, active: &mut Vec<Box<Inner>>) { + match active.iter().position(|tm| tm.target > t.target) { + Some(pos) => { active.insert(pos, t); } + None => { active.push(t); } + } + } + + // signals the first requests in the queue, possible re-enqueueing it. + fn signal(active: &mut Vec<Box<Inner>>, + dead: &mut Vec<(uint, Box<Inner>)>) { + let mut timer = match active.remove(0) { + Some(timer) => timer, None => return + }; + let mut cb = timer.cb.take().unwrap(); + cb.call(); + if timer.repeat { + timer.cb = Some(cb); + timer.target += timer.interval; + insert(timer, active); + } else { + dead.push((timer.id, timer)); + } + } + + 'outer: loop { + let timeout = if active.len() == 0 { + // Empty array? no timeout (wait forever for the next request) + ptr::null_mut() + } else { + let now = now(); + // If this request has already expired, then signal it and go + // through another iteration + if active[0].target <= now { + signal(&mut active, &mut dead); + continue; + } + + // The actual timeout listed in the requests array is an + // absolute date, so here we translate the absolute time to a + // relative time. + let tm = active[0].target - now; + timeout.tv_sec = (tm / 1000) as libc::time_t; + timeout.tv_usec = ((tm % 1000) * 1000) as libc::suseconds_t; + &mut timeout as *mut libc::timeval + }; + + c::fd_set(&mut set, input); + match unsafe { + c::select(input + 1, &mut set, ptr::null_mut(), + ptr::null_mut(), timeout) + } { + // timed out + 0 => signal(&mut active, &mut dead), + + // file descriptor write woke us up, we've got some new requests + 1 => { + loop { + match messages.try_recv() { + Err(comm::Disconnected) => { + assert!(active.len() == 0); + break 'outer; + } + + Ok(NewTimer(timer)) => insert(timer, &mut active), + + Ok(RemoveTimer(id, ack)) => { + match dead.iter().position(|&(i, _)| id == i) { + Some(i) => { + let (_, i) = dead.remove(i).unwrap(); + ack.send(i); + continue + } + None => {} + } + let i = active.iter().position(|i| i.id == id); + let i = i.expect("no timer found"); + let t = active.remove(i).unwrap(); + ack.send(t); + } + Err(..) => break + } + } + + // drain the file descriptor + let mut buf = [0]; + assert_eq!(fd.read(buf).ok().unwrap(), 1); + } + + -1 if os::errno() == libc::EINTR as uint => {} + n => panic!("helper thread failed in select() with error: {} ({})", + n, os::last_os_error()) + } + } +} + +impl Timer { + pub fn new() -> IoResult<Timer> { + // See notes above regarding using int return value + // instead of () + HELPER.boot(|| {}, helper); + + static ID: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; + let id = ID.fetch_add(1, atomic::Relaxed); + Ok(Timer { + id: id, + inner: Some(box Inner { + cb: None, + interval: 0, + target: 0, + repeat: false, + id: id, + }) + }) + } + + pub fn sleep(&mut self, ms: u64) { + let mut inner = self.inner(); + inner.cb = None; // cancel any previous request + self.inner = Some(inner); + + let mut to_sleep = libc::timespec { + tv_sec: (ms / 1000) as libc::time_t, + tv_nsec: ((ms % 1000) * 1000000) as libc::c_long, + }; + while unsafe { libc::nanosleep(&to_sleep, &mut to_sleep) } != 0 { + if os::errno() as int != libc::EINTR as int { + panic!("failed to sleep, but not because of EINTR?"); + } + } + } + + pub fn oneshot(&mut self, msecs: u64, cb: Box<Callback + Send>) { + let now = now(); + let mut inner = self.inner(); + + inner.repeat = false; + inner.cb = Some(cb); + inner.interval = msecs; + inner.target = now + msecs; + + HELPER.send(NewTimer(inner)); + } + + pub fn period(&mut self, msecs: u64, cb: Box<Callback + Send>) { + let now = now(); + let mut inner = self.inner(); + + inner.repeat = true; + inner.cb = Some(cb); + inner.interval = msecs; + inner.target = now + msecs; + + HELPER.send(NewTimer(inner)); + } + + fn inner(&mut self) -> Box<Inner> { + match self.inner.take() { + Some(i) => i, + None => { + let (tx, rx) = channel(); + HELPER.send(RemoveTimer(self.id, tx)); + rx.recv() + } + } + } +} + +impl Drop for Timer { + fn drop(&mut self) { + self.inner = Some(self.inner()); + } +} diff --git a/src/libstd/sys/windows/mod.rs b/src/libstd/sys/windows/mod.rs index f50244701e4..0dc06de33e0 100644 --- a/src/libstd/sys/windows/mod.rs +++ b/src/libstd/sys/windows/mod.rs @@ -41,6 +41,7 @@ pub mod udp; pub mod pipe; pub mod helper_signal; pub mod process; +pub mod timer; pub mod addrinfo { pub use sys_common::net::get_host_addresses; diff --git a/src/libstd/sys/windows/timer.rs b/src/libstd/sys/windows/timer.rs new file mode 100644 index 00000000000..f507be2a985 --- /dev/null +++ b/src/libstd/sys/windows/timer.rs @@ -0,0 +1,208 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Timers based on Windows WaitableTimers +//! +//! This implementation is meant to be used solely on windows. As with other +//! implementations, there is a worker thread which is doing all the waiting on +//! a large number of timers for all active timers in the system. This worker +//! thread uses the select() equivalent, WaitForMultipleObjects. One of the +//! objects being waited on is a signal into the worker thread to notify that +//! the incoming channel should be looked at. +//! +//! Other than that, the implementation is pretty straightforward in terms of +//! the other two implementations of timers with nothing *that* new showing up. + +use libc; +use ptr; +use comm; + +use sys::c; +use sys::fs::FileDesc; +use sys_common::helper_thread::Helper; +use prelude::*; +use io::IoResult; + +helper_init!(static HELPER: Helper<Req>) + +pub trait Callback { + fn call(&mut self); +} + +pub struct Timer { + obj: libc::HANDLE, + on_worker: bool, +} + +pub enum Req { + NewTimer(libc::HANDLE, Box<Callback + Send>, bool), + RemoveTimer(libc::HANDLE, Sender<()>), +} + +fn helper(input: libc::HANDLE, messages: Receiver<Req>, _: ()) { + let mut objs = vec![input]; + let mut chans = vec![]; + + 'outer: loop { + let idx = unsafe { + imp::WaitForMultipleObjects(objs.len() as libc::DWORD, + objs.as_ptr(), + 0 as libc::BOOL, + libc::INFINITE) + }; + + if idx == 0 { + loop { + match messages.try_recv() { + Ok(NewTimer(obj, c, one)) => { + objs.push(obj); + chans.push((c, one)); + } + Ok(RemoveTimer(obj, c)) => { + c.send(()); + match objs.iter().position(|&o| o == obj) { + Some(i) => { + drop(objs.remove(i)); + drop(chans.remove(i - 1)); + } + None => {} + } + } + Err(comm::Disconnected) => { + assert_eq!(objs.len(), 1); + assert_eq!(chans.len(), 0); + break 'outer; + } + Err(..) => break + } + } + } else { + let remove = { + match &mut chans[idx as uint - 1] { + &(ref mut c, oneshot) => { c.call(); oneshot } + } + }; + if remove { + drop(objs.remove(idx as uint)); + drop(chans.remove(idx as uint - 1)); + } + } + } +} + +// returns the current time (in milliseconds) +pub fn now() -> u64 { + let mut ticks_per_s = 0; + assert_eq!(unsafe { libc::QueryPerformanceFrequency(&mut ticks_per_s) }, 1); + let ticks_per_s = if ticks_per_s == 0 {1} else {ticks_per_s}; + let mut ticks = 0; + assert_eq!(unsafe { libc::QueryPerformanceCounter(&mut ticks) }, 1); + + return (ticks as u64 * 1000) / (ticks_per_s as u64); +} + +impl Timer { + pub fn new() -> IoResult<Timer> { + HELPER.boot(|| {}, helper); + + let obj = unsafe { + imp::CreateWaitableTimerA(ptr::null_mut(), 0, ptr::null()) + }; + if obj.is_null() { + Err(super::last_error()) + } else { + Ok(Timer { obj: obj, on_worker: false, }) + } + } + + fn remove(&mut self) { + if !self.on_worker { return } + + let (tx, rx) = channel(); + HELPER.send(RemoveTimer(self.obj, tx)); + rx.recv(); + + self.on_worker = false; + } + + pub fn sleep(&mut self, msecs: u64) { + self.remove(); + + // there are 10^6 nanoseconds in a millisecond, and the parameter is in + // 100ns intervals, so we multiply by 10^4. + let due = -(msecs as i64 * 10000) as libc::LARGE_INTEGER; + assert_eq!(unsafe { + imp::SetWaitableTimer(self.obj, &due, 0, ptr::null_mut(), + ptr::null_mut(), 0) + }, 1); + + let _ = unsafe { imp::WaitForSingleObject(self.obj, libc::INFINITE) }; + } + + pub fn oneshot(&mut self, msecs: u64, cb: Box<Callback + Send>) { + self.remove(); + + // see above for the calculation + let due = -(msecs as i64 * 10000) as libc::LARGE_INTEGER; + assert_eq!(unsafe { + imp::SetWaitableTimer(self.obj, &due, 0, ptr::null_mut(), + ptr::null_mut(), 0) + }, 1); + + HELPER.send(NewTimer(self.obj, cb, true)); + self.on_worker = true; + } + + pub fn period(&mut self, msecs: u64, cb: Box<Callback + Send>) { + self.remove(); + + // see above for the calculation + let due = -(msecs as i64 * 10000) as libc::LARGE_INTEGER; + assert_eq!(unsafe { + imp::SetWaitableTimer(self.obj, &due, msecs as libc::LONG, + ptr::null_mut(), ptr::null_mut(), 0) + }, 1); + + HELPER.send(NewTimer(self.obj, cb, false)); + self.on_worker = true; + } +} + +impl Drop for Timer { + fn drop(&mut self) { + self.remove(); + assert!(unsafe { libc::CloseHandle(self.obj) != 0 }); + } +} + +mod imp { + use libc::{LPSECURITY_ATTRIBUTES, BOOL, LPCSTR, HANDLE, LARGE_INTEGER, + LONG, LPVOID, DWORD, c_void}; + + pub type PTIMERAPCROUTINE = *mut c_void; + + extern "system" { + pub fn CreateWaitableTimerA(lpTimerAttributes: LPSECURITY_ATTRIBUTES, + bManualReset: BOOL, + lpTimerName: LPCSTR) -> HANDLE; + pub fn SetWaitableTimer(hTimer: HANDLE, + pDueTime: *const LARGE_INTEGER, + lPeriod: LONG, + pfnCompletionRoutine: PTIMERAPCROUTINE, + lpArgToCompletionRoutine: LPVOID, + fResume: BOOL) -> BOOL; + pub fn WaitForMultipleObjects(nCount: DWORD, + lpHandles: *const HANDLE, + bWaitAll: BOOL, + dwMilliseconds: DWORD) -> DWORD; + pub fn WaitForSingleObject(hHandle: HANDLE, + dwMilliseconds: DWORD) -> DWORD; + } +} |
