about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2014-01-22 20:41:29 -0800
committerbors <bors@rust-lang.org>2014-01-22 20:41:29 -0800
commit7ea063ea0f9a28920fafbbf21064f20eb7ca02a8 (patch)
tree429ddb0b6f5287ee3dbe962261f68c09d910db42 /src
parentfce792249e72a181f2ad52413b25b1db643c371f (diff)
parentb8e43838cf7e97b81bf2f7ce6e3d1f8a05c166f5 (diff)
downloadrust-7ea063ea0f9a28920fafbbf21064f20eb7ca02a8.tar.gz
rust-7ea063ea0f9a28920fafbbf21064f20eb7ca02a8.zip
auto merge of #11294 : alexcrichton/rust/native-timer, r=brson
Commit messages have the fun details

Closes #10925
Diffstat (limited to 'src')
-rw-r--r--src/libnative/bookkeeping.rs1
-rw-r--r--src/libnative/io/mod.rs18
-rw-r--r--src/libnative/io/timer_helper.rs143
-rw-r--r--src/libnative/io/timer_other.rs328
-rw-r--r--src/libnative/io/timer_timerfd.rs303
-rw-r--r--src/libnative/io/timer_win32.rs203
-rw-r--r--src/libstd/io/test.rs1
-rw-r--r--src/libstd/io/timer.rs174
-rw-r--r--src/libstd/libc.rs1
-rw-r--r--src/libstd/rt/at_exit_imp.rs72
-rw-r--r--src/libstd/rt/mod.rs22
11 files changed, 1236 insertions, 30 deletions
diff --git a/src/libnative/bookkeeping.rs b/src/libnative/bookkeeping.rs
index 6c5f555f401..b07e4271ee4 100644
--- a/src/libnative/bookkeeping.rs
+++ b/src/libnative/bookkeeping.rs
@@ -45,5 +45,6 @@ pub fn wait_for_other_tasks() {
             TASK_LOCK.wait();
         }
         TASK_LOCK.unlock();
+        TASK_LOCK.destroy();
     }
 }
diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs
index f1bec440547..f3aca7820a5 100644
--- a/src/libnative/io/mod.rs
+++ b/src/libnative/io/mod.rs
@@ -46,6 +46,22 @@ pub mod file;
 pub mod process;
 pub mod net;
 
+#[cfg(target_os = "macos")]
+#[cfg(target_os = "freebsd")]
+#[path = "timer_other.rs"]
+pub mod timer;
+
+#[cfg(target_os = "linux")]
+#[cfg(target_os = "android")]
+#[path = "timer_timerfd.rs"]
+pub mod timer;
+
+#[cfg(target_os = "win32")]
+#[path = "timer_win32.rs"]
+pub mod timer;
+
+mod timer_helper;
+
 type IoResult<T> = Result<T, IoError>;
 
 fn unimpl() -> IoError {
@@ -249,7 +265,7 @@ impl rtio::IoFactory for IoFactory {
 
     // misc
     fn timer_init(&mut self) -> IoResult<~RtioTimer> {
-        Err(unimpl())
+        timer::Timer::new().map(|t| ~t as ~RtioTimer)
     }
     fn spawn(&mut self, config: ProcessConfig)
             -> IoResult<(~RtioProcess, ~[Option<~RtioPipe>])> {
diff --git a/src/libnative/io/timer_helper.rs b/src/libnative/io/timer_helper.rs
new file mode 100644
index 00000000000..3c20d073f29
--- /dev/null
+++ b/src/libnative/io/timer_helper.rs
@@ -0,0 +1,143 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Implementation of the helper thread for the timer module
+//!
+//! This module contains the management necessary for the timer worker thread.
+//! This thread is responsible for performing the send()s on channels for timers
+//! that are using channels instead of a blocking call.
+//!
+//! The timer thread is lazily initialized, and it's shut down via the
+//! `shutdown` function provided. It must be maintained as an invariant that
+//! `shutdown` is only called when the entire program is finished. No new timers
+//! can be created in the future and there must be no active timers at that
+//! time.
+
+use std::cast;
+use std::rt;
+use std::unstable::mutex::{Once, ONCE_INIT};
+
+use bookkeeping;
+use io::timer::{Req, Shutdown};
+use task;
+
+// You'll note that these variables are *not* protected by a lock. These
+// variables are initialized with a Once before any Timer is created and are
+// only torn down after everything else has exited. This means that these
+// variables are read-only during use (after initialization) and both of which
+// are safe to use concurrently.
+static mut HELPER_CHAN: *mut SharedChan<Req> = 0 as *mut SharedChan<Req>;
+static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;
+
+pub fn boot(helper: fn(imp::signal, Port<Req>)) {
+    static mut INIT: Once = ONCE_INIT;
+
+    unsafe {
+        INIT.doit(|| {
+            let (msgp, msgc) = SharedChan::new();
+            HELPER_CHAN = cast::transmute(~msgc);
+            let (receive, send) = imp::new();
+            HELPER_SIGNAL = send;
+
+            do task::spawn {
+                bookkeeping::decrement();
+                helper(receive, msgp);
+            }
+
+            rt::at_exit(proc() { shutdown() });
+        })
+    }
+}
+
+pub fn send(req: Req) {
+    unsafe {
+        assert!(!HELPER_CHAN.is_null());
+        (*HELPER_CHAN).send(req);
+        imp::signal(HELPER_SIGNAL);
+    }
+}
+
+fn shutdown() {
+    // We want to wait for the entire helper task to exit, and in doing so it
+    // will attempt to decrement the global task count. When the helper was
+    // created, it decremented the count so it wouldn't count towards preventing
+    // the program to exit, so here we pair that manual decrement with a manual
+    // increment. We will then wait for the helper thread to exit by calling
+    // wait_for_other_tasks.
+    bookkeeping::increment();
+
+    // Request a shutdown, and then wait for the task to exit
+    send(Shutdown);
+    bookkeeping::wait_for_other_tasks();
+
+    // Clean up after ther helper thread
+    unsafe {
+        imp::close(HELPER_SIGNAL);
+        let _chan: ~SharedChan<Req> = cast::transmute(HELPER_CHAN);
+        HELPER_CHAN = 0 as *mut SharedChan<Req>;
+        HELPER_SIGNAL = 0 as imp::signal;
+    }
+}
+
+#[cfg(unix)]
+mod imp {
+    use std::libc;
+    use std::os;
+
+    use io::file::FileDesc;
+
+    pub type signal = libc::c_int;
+
+    pub fn new() -> (signal, signal) {
+        let pipe = os::pipe();
+        (pipe.input, pipe.out)
+    }
+
+    pub fn signal(fd: libc::c_int) {
+        FileDesc::new(fd, false).inner_write([0]);
+    }
+
+    pub fn close(fd: libc::c_int) {
+        let _fd = FileDesc::new(fd, true);
+    }
+}
+
+#[cfg(windows)]
+mod imp {
+    use std::libc::{BOOL, LPCSTR, HANDLE, LPSECURITY_ATTRIBUTES, CloseHandle};
+    use std::ptr;
+    use std::libc;
+
+    pub type signal = HANDLE;
+
+    pub fn new() -> (HANDLE, HANDLE) {
+        unsafe {
+            let handle = CreateEventA(ptr::mut_null(), libc::FALSE, libc::FALSE,
+                                      ptr::null());
+            (handle, handle)
+        }
+    }
+
+    pub fn signal(handle: HANDLE) {
+        unsafe { SetEvent(handle); }
+    }
+
+    pub fn close(handle: HANDLE) {
+        unsafe { CloseHandle(handle); }
+    }
+
+    extern "system" {
+        fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES,
+                        bManualReset: BOOL,
+                        bInitialState: BOOL,
+                        lpName: LPCSTR) -> HANDLE;
+        fn SetEvent(hEvent: HANDLE) -> BOOL;
+    }
+}
diff --git a/src/libnative/io/timer_other.rs b/src/libnative/io/timer_other.rs
new file mode 100644
index 00000000000..24ffd7a4147
--- /dev/null
+++ b/src/libnative/io/timer_other.rs
@@ -0,0 +1,328 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Timers for non-linux/non-windows OSes
+//!
+//! This module implements timers with a worker thread, select(), and a lot of
+//! witchcraft that turns out to be horribly inaccurate timers. The unfortunate
+//! part is that I'm at a loss of what else to do one these OSes. This is also
+//! why linux has a specialized timerfd implementation and windows has its own
+//! implementation (they're more accurate than this one).
+//!
+//! The basic idea is that there is a worker thread that's communicated to via a
+//! channel and a pipe, the pipe is used by the worker thread in a select()
+//! syscall with a timeout. The timeout is the "next timer timeout" while the
+//! channel is used to send data over to the worker thread.
+//!
+//! Whenever the call to select() times out, then a channel receives a message.
+//! Whenever the call returns that the file descriptor has information, then the
+//! channel from timers is drained, enqueueing all incoming requests.
+//!
+//! The actual implementation of the helper thread is a sorted array of
+//! timers in terms of target firing date. The target is the absolute time at
+//! which the timer should fire. Timers are then re-enqueued after a firing if
+//! the repeat boolean is set.
+//!
+//! Naturally, all this logic of adding times and keeping track of
+//! relative/absolute time is a little lossy and not quite exact. I've done the
+//! best I could to reduce the amount of calls to 'now()', but there's likely
+//! still inaccuracies trickling in here and there.
+//!
+//! One of the tricky parts of this implementation is that whenever a timer is
+//! acted upon, it must cancel whatever the previous action was (if one is
+//! active) in order to act like the other implementations of this timer. In
+//! order to do this, the timer's inner pointer is transferred to the worker
+//! thread. Whenever the timer is modified, it first takes ownership back from
+//! the worker thread in order to modify the same data structure. This has the
+//! side effect of "cancelling" the previous requests while allowing a
+//! re-enqueueing later on.
+//!
+//! Note that all time units in this file are in *milliseconds*.
+
+use std::comm::Data;
+use std::hashmap::HashMap;
+use std::libc;
+use std::os;
+use std::ptr;
+use std::rt::rtio;
+use std::sync::atomics;
+use std::unstable::intrinsics;
+
+use io::file::FileDesc;
+use io::IoResult;
+use io::timer_helper;
+
+pub struct Timer {
+    priv id: uint,
+    priv inner: Option<~Inner>,
+}
+
+struct Inner {
+    chan: Option<Chan<()>>,
+    interval: u64,
+    repeat: bool,
+    target: u64,
+    id: uint,
+}
+
+pub enum Req {
+    // Add a new timer to the helper thread.
+    NewTimer(~Inner),
+
+    // Remove a timer based on its id and then send it back on the channel
+    // provided
+    RemoveTimer(uint, Chan<~Inner>),
+
+    // Shut down the loop and then ACK this channel once it's shut down
+    Shutdown,
+}
+
+// returns the current time (in milliseconds)
+fn now() -> u64 {
+    unsafe {
+        let mut now: libc::timeval = intrinsics::init();
+        assert_eq!(imp::gettimeofday(&mut now, ptr::null()), 0);
+        return (now.tv_sec as u64) * 1000 + (now.tv_usec as u64) / 1000;
+    }
+}
+
+fn helper(input: libc::c_int, messages: Port<Req>) {
+    let mut set: imp::fd_set = unsafe { intrinsics::init() };
+
+    let mut fd = FileDesc::new(input, true);
+    let mut timeout: libc::timeval = unsafe { intrinsics::init() };
+
+    // active timers are those which are able to be selected upon (and it's a
+    // sorted list, and dead timers are those which have expired, but ownership
+    // hasn't yet been transferred back to the timer itself.
+    let mut active: ~[~Inner] = ~[];
+    let mut dead = HashMap::new();
+
+    // inserts a timer into an array of timers (sorted by firing time)
+    fn insert(t: ~Inner, active: &mut ~[~Inner]) {
+        match active.iter().position(|tm| tm.target > t.target) {
+            Some(pos) => { active.insert(pos, t); }
+            None => { active.push(t); }
+        }
+    }
+
+    // signals the first requests in the queue, possible re-enqueueing it.
+    fn signal(active: &mut ~[~Inner], dead: &mut HashMap<uint, ~Inner>) {
+        let mut timer = match active.shift() {
+            Some(timer) => timer, None => return
+        };
+        let chan = timer.chan.take_unwrap();
+        if chan.try_send(()) && timer.repeat {
+            timer.chan = Some(chan);
+            timer.target += timer.interval;
+            insert(timer, active);
+        } else {
+            drop(chan);
+            dead.insert(timer.id, timer);
+        }
+    }
+
+    'outer: loop {
+        let timeout = match active {
+            // Empty array? no timeout (wait forever for the next request)
+            [] => ptr::null(),
+
+            [~Inner { target, .. }, ..] => {
+                let now = now();
+                // If this request has already expired, then signal it and go
+                // through another iteration
+                if target <= now {
+                    signal(&mut active, &mut dead);
+                    continue;
+                }
+
+                // The actual timeout listed in the requests array is an
+                // absolute date, so here we translate the absolute time to a
+                // relative time.
+                let tm = target - now;
+                timeout.tv_sec = (tm / 1000) as libc::time_t;
+                timeout.tv_usec = ((tm % 1000) * 1000) as libc::suseconds_t;
+                &timeout as *libc::timeval
+            }
+        };
+
+        imp::fd_set(&mut set, input);
+        match unsafe {
+            imp::select(input + 1, &set, ptr::null(), ptr::null(), timeout)
+        } {
+            // timed out
+            0 => signal(&mut active, &mut dead),
+
+            // file descriptor write woke us up, we've got some new requests
+            1 => {
+                loop {
+                    match messages.try_recv() {
+                        Data(Shutdown) => {
+                            assert!(active.len() == 0);
+                            break 'outer;
+                        }
+
+                        Data(NewTimer(timer)) => insert(timer, &mut active),
+
+                        Data(RemoveTimer(id, ack)) => {
+                            match dead.pop(&id) {
+                                Some(i) => { ack.send(i); continue }
+                                None => {}
+                            }
+                            let i = active.iter().position(|i| i.id == id);
+                            let i = i.expect("no timer found");
+                            let t = active.remove(i).unwrap();
+                            ack.send(t);
+                        }
+                        _ => break
+                    }
+                }
+
+                // drain the file descriptor
+                let mut buf = [0];
+                fd.inner_read(buf);
+            }
+
+            -1 if os::errno() == libc::EINTR as int => {}
+            n => fail!("helper thread failed in select() with error: {} ({})",
+                       n, os::last_os_error())
+        }
+    }
+}
+
+impl Timer {
+    pub fn new() -> IoResult<Timer> {
+        timer_helper::boot(helper);
+
+        static mut ID: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
+        let id = unsafe { ID.fetch_add(1, atomics::Relaxed) };
+        Ok(Timer {
+            id: id,
+            inner: Some(~Inner {
+                chan: None,
+                interval: 0,
+                target: 0,
+                repeat: false,
+                id: id,
+            })
+        })
+    }
+
+    pub fn sleep(ms: u64) {
+        unsafe { libc::usleep((ms * 1000) as libc::c_uint); }
+    }
+
+    fn inner(&mut self) -> ~Inner {
+        match self.inner.take() {
+            Some(i) => i,
+            None => {
+                let (p, c) = Chan::new();
+                timer_helper::send(RemoveTimer(self.id, c));
+                p.recv()
+            }
+        }
+    }
+}
+
+impl rtio::RtioTimer for Timer {
+    fn sleep(&mut self, msecs: u64) {
+        let mut inner = self.inner();
+        inner.chan = None; // cancel any previous request
+        self.inner = Some(inner);
+
+        Timer::sleep(msecs);
+    }
+
+    fn oneshot(&mut self, msecs: u64) -> Port<()> {
+        let now = now();
+        let mut inner = self.inner();
+
+        let (p, c) = Chan::new();
+        inner.repeat = false;
+        inner.chan = Some(c);
+        inner.interval = msecs;
+        inner.target = now + msecs;
+
+        timer_helper::send(NewTimer(inner));
+        return p;
+    }
+
+    fn period(&mut self, msecs: u64) -> Port<()> {
+        let now = now();
+        let mut inner = self.inner();
+
+        let (p, c) = Chan::new();
+        inner.repeat = true;
+        inner.chan = Some(c);
+        inner.interval = msecs;
+        inner.target = now + msecs;
+
+        timer_helper::send(NewTimer(inner));
+        return p;
+    }
+}
+
+impl Drop for Timer {
+    fn drop(&mut self) {
+        self.inner = Some(self.inner());
+    }
+}
+
+#[cfg(target_os = "macos")]
+mod imp {
+    use std::libc;
+
+    pub static FD_SETSIZE: uint = 1024;
+
+    pub struct fd_set {
+        fds_bits: [i32, ..(FD_SETSIZE / 32)]
+    }
+
+    pub fn fd_set(set: &mut fd_set, fd: i32) {
+        set.fds_bits[fd / 32] |= 1 << (fd % 32);
+    }
+
+    extern {
+        pub fn select(nfds: libc::c_int,
+                      readfds: *fd_set,
+                      writefds: *fd_set,
+                      errorfds: *fd_set,
+                      timeout: *libc::timeval) -> libc::c_int;
+
+        pub fn gettimeofday(timeval: *mut libc::timeval,
+                            tzp: *libc::c_void) -> libc::c_int;
+    }
+}
+
+#[cfg(target_os = "freebsd")]
+mod imp {
+    use std::libc;
+
+    pub static FD_SETSIZE: uint = 1024;
+
+    pub struct fd_set {
+        fds_bits: [u64, ..(FD_SETSIZE / 64)]
+    }
+
+    pub fn fd_set(set: &mut fd_set, fd: i32) {
+        set.fds_bits[fd / 64] |= (1 << (fd % 64)) as u64;
+    }
+
+    extern {
+        pub fn select(nfds: libc::c_int,
+                      readfds: *fd_set,
+                      writefds: *fd_set,
+                      errorfds: *fd_set,
+                      timeout: *libc::timeval) -> libc::c_int;
+
+        pub fn gettimeofday(timeval: *mut libc::timeval,
+                            tzp: *libc::c_void) -> libc::c_int;
+    }
+}
diff --git a/src/libnative/io/timer_timerfd.rs b/src/libnative/io/timer_timerfd.rs
new file mode 100644
index 00000000000..4912f4f431f
--- /dev/null
+++ b/src/libnative/io/timer_timerfd.rs
@@ -0,0 +1,303 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Timers based on timerfd_create(2)
+//!
+//! On OSes which support timerfd_create, we can use these much more accurate
+//! timers over select() + a timeout (see timer_other.rs). This strategy still
+//! employs a worker thread which does the waiting on the timer fds (to send
+//! messages away).
+//!
+//! The worker thread in this implementation uses epoll(7) to block. It
+//! maintains a working set of *all* native timers in the process, along with a
+//! pipe file descriptor used to communicate that there is data available on the
+//! incoming channel to the worker thread. Timers send requests to update their
+//! timerfd settings to the worker thread (see the comment above 'oneshot' for
+//! why).
+//!
+//! As with timer_other, timers just using sleep() do not use the timerfd at
+//! all. They remove the timerfd from the worker thread and then invoke usleep()
+//! to block the calling thread.
+//!
+//! As with timer_other, all units in this file are in units of millseconds.
+
+use std::comm::Data;
+use std::libc;
+use std::ptr;
+use std::os;
+use std::rt::rtio;
+use std::hashmap::HashMap;
+use std::unstable::intrinsics;
+
+use io::file::FileDesc;
+use io::IoResult;
+use io::timer_helper;
+
+pub struct Timer {
+    priv fd: FileDesc,
+    priv on_worker: bool,
+}
+
+pub enum Req {
+    NewTimer(libc::c_int, Chan<()>, bool, imp::itimerspec),
+    RemoveTimer(libc::c_int, Chan<()>),
+    Shutdown,
+}
+
+fn helper(input: libc::c_int, messages: Port<Req>) {
+    let efd = unsafe { imp::epoll_create(10) };
+    let _fd1 = FileDesc::new(input, true);
+    let _fd2 = FileDesc::new(efd, true);
+
+    fn add(efd: libc::c_int, fd: libc::c_int) {
+        let event = imp::epoll_event {
+            events: imp::EPOLLIN as u32,
+            data: imp::epoll_data_t { fd: fd, pad: 0, }
+        };
+        let ret = unsafe {
+            imp::epoll_ctl(efd, imp::EPOLL_CTL_ADD, fd, &event)
+        };
+        assert_eq!(ret, 0);
+    }
+    fn del(efd: libc::c_int, fd: libc::c_int) {
+        let event = imp::epoll_event {
+            events: 0, data: imp::epoll_data_t { fd: 0, pad: 0, }
+        };
+        let ret = unsafe {
+            imp::epoll_ctl(efd, imp::EPOLL_CTL_DEL, fd, &event)
+        };
+        assert_eq!(ret, 0);
+    }
+
+    add(efd, input);
+    let events: [imp::epoll_event, ..16] = unsafe { intrinsics::init() };
+    let mut map: HashMap<libc::c_int, (Chan<()>, bool)> = HashMap::new();
+    'outer: loop {
+        let n = match unsafe {
+            imp::epoll_wait(efd, events.as_ptr(),
+                            events.len() as libc::c_int, -1)
+        } {
+            0 => fail!("epoll_wait returned immediately!"),
+            -1 => fail!("epoll wait failed: {}", os::last_os_error()),
+            n => n
+        };
+
+        let mut incoming = false;
+        debug!("{} events to process", n);
+        for event in events.slice_to(n as uint).iter() {
+            let fd = event.data.fd;
+            debug!("data on fd {} (input = {})", fd, input);
+            if fd == input {
+                let mut buf = [0, ..1];
+                // drain the input file descriptor of its input
+                FileDesc::new(fd, false).inner_read(buf);
+                incoming = true;
+            } else {
+                let mut bits = [0, ..8];
+                // drain the timerfd of how many times its fired
+                //
+                // XXX: should this perform a send() this number of
+                //      times?
+                FileDesc::new(fd, false).inner_read(bits);
+                let remove = {
+                    match map.find(&fd).expect("fd unregistered") {
+                        &(ref c, oneshot) => !c.try_send(()) || oneshot
+                    }
+                };
+                if remove {
+                    map.remove(&fd);
+                    del(efd, fd);
+                }
+            }
+        }
+
+        while incoming {
+            match messages.try_recv() {
+                Data(NewTimer(fd, chan, one, timeval)) => {
+                    // acknowledge we have the new channel, we will never send
+                    // another message to the old channel
+                    chan.send(());
+
+                    // If we haven't previously seen the file descriptor, then
+                    // we need to add it to the epoll set.
+                    if map.insert(fd, (chan, one)) {
+                        add(efd, fd);
+                    }
+
+                    // Update the timerfd's time value now that we have control
+                    // of the timerfd
+                    let ret = unsafe {
+                        imp::timerfd_settime(fd, 0, &timeval, ptr::null())
+                    };
+                    assert_eq!(ret, 0);
+                }
+
+                Data(RemoveTimer(fd, chan)) => {
+                    if map.remove(&fd) {
+                        del(efd, fd);
+                    }
+                    chan.send(());
+                }
+
+                Data(Shutdown) => {
+                    assert!(map.len() == 0);
+                    break 'outer;
+                }
+
+                _ => break,
+            }
+        }
+    }
+}
+
+impl Timer {
+    pub fn new() -> IoResult<Timer> {
+        timer_helper::boot(helper);
+        match unsafe { imp::timerfd_create(imp::CLOCK_MONOTONIC, 0) } {
+            -1 => Err(super::last_error()),
+            n => Ok(Timer { fd: FileDesc::new(n, true), on_worker: false, }),
+        }
+    }
+
+    pub fn sleep(ms: u64) {
+        unsafe { libc::usleep((ms * 1000) as libc::c_uint); }
+    }
+
+    fn remove(&mut self) {
+        if !self.on_worker { return }
+
+        let (p, c) = Chan::new();
+        timer_helper::send(RemoveTimer(self.fd.fd(), c));
+        p.recv();
+        self.on_worker = false;
+    }
+}
+
+impl rtio::RtioTimer for Timer {
+    fn sleep(&mut self, msecs: u64) {
+        self.remove();
+        Timer::sleep(msecs);
+    }
+
+    // Periodic and oneshot channels are updated by updating the settings on the
+    // corresopnding timerfd. The update is not performed on the thread calling
+    // oneshot or period, but rather the helper epoll thread. The reason for
+    // this is to avoid losing messages and avoid leaking messages across ports.
+    //
+    // By updating the timerfd on the helper thread, we're guaranteed that all
+    // messages for a particular setting of the timer will be received by the
+    // new channel/port pair rather than leaking old messages onto the new port
+    // or leaking new messages onto the old port.
+    //
+    // We also wait for the remote thread to actually receive the new settings
+    // before returning to guarantee the invariant that when oneshot() and
+    // period() return that the old port will never receive any more messages.
+
+    fn oneshot(&mut self, msecs: u64) -> Port<()> {
+        let (p, c) = Chan::new();
+
+        let new_value = imp::itimerspec {
+            it_interval: imp::timespec { tv_sec: 0, tv_nsec: 0 },
+            it_value: imp::timespec {
+                tv_sec: (msecs / 1000) as libc::time_t,
+                tv_nsec: ((msecs % 1000) * 1000000) as libc::c_long,
+            }
+        };
+        timer_helper::send(NewTimer(self.fd.fd(), c, true, new_value));
+        p.recv();
+        self.on_worker = true;
+
+        return p;
+    }
+
+    fn period(&mut self, msecs: u64) -> Port<()> {
+        let (p, c) = Chan::new();
+
+        let spec = imp::timespec {
+            tv_sec: (msecs / 1000) as libc::time_t,
+            tv_nsec: ((msecs % 1000) * 1000000) as libc::c_long,
+        };
+        let new_value = imp::itimerspec { it_interval: spec, it_value: spec, };
+        timer_helper::send(NewTimer(self.fd.fd(), c, false, new_value));
+        p.recv();
+        self.on_worker = true;
+
+        return p;
+    }
+}
+
+impl Drop for Timer {
+    fn drop(&mut self) {
+        // When the timerfd file descriptor is closed, it will be automatically
+        // removed from the epoll set of the worker thread, but we want to make
+        // sure that the associated channel is also removed from the worker's
+        // hash map.
+        self.remove();
+    }
+}
+
+#[allow(dead_code)]
+mod imp {
+    use std::libc;
+
+    pub static CLOCK_MONOTONIC: libc::c_int = 1;
+    pub static EPOLL_CTL_ADD: libc::c_int = 1;
+    pub static EPOLL_CTL_DEL: libc::c_int = 2;
+    pub static EPOLL_CTL_MOD: libc::c_int = 3;
+    pub static EPOLLIN: libc::c_int = 0x001;
+    pub static EPOLLOUT: libc::c_int = 0x004;
+    pub static EPOLLPRI: libc::c_int = 0x002;
+    pub static EPOLLERR: libc::c_int = 0x008;
+    pub static EPOLLRDHUP: libc::c_int = 0x2000;
+    pub static EPOLLET: libc::c_int = 1 << 31;
+    pub static EPOLLHUP: libc::c_int = 0x010;
+    pub static EPOLLONESHOT: libc::c_int = 1 << 30;
+
+    pub struct epoll_event {
+        events: u32,
+        data: epoll_data_t,
+    }
+
+    pub struct epoll_data_t {
+        fd: i32,
+        pad: u32,
+    }
+
+    pub struct timespec {
+        tv_sec: libc::time_t,
+        tv_nsec: libc::c_long,
+    }
+
+    pub struct itimerspec {
+        it_interval: timespec,
+        it_value: timespec,
+    }
+
+    extern {
+        pub fn timerfd_create(clockid: libc::c_int,
+                              flags: libc::c_int) -> libc::c_int;
+        pub fn timerfd_settime(fd: libc::c_int,
+                               flags: libc::c_int,
+                               new_value: *itimerspec,
+                               old_value: *itimerspec) -> libc::c_int;
+        pub fn timerfd_gettime(fd: libc::c_int,
+                               curr_value: *itimerspec) -> libc::c_int;
+
+        pub fn epoll_create(size: libc::c_int) -> libc::c_int;
+        pub fn epoll_ctl(epfd: libc::c_int,
+                         op: libc::c_int,
+                         fd: libc::c_int,
+                         event: *epoll_event) -> libc::c_int;
+        pub fn epoll_wait(epfd: libc::c_int,
+                          events: *epoll_event,
+                          maxevents: libc::c_int,
+                          timeout: libc::c_int) -> libc::c_int;
+    }
+}
diff --git a/src/libnative/io/timer_win32.rs b/src/libnative/io/timer_win32.rs
new file mode 100644
index 00000000000..e359d99eedf
--- /dev/null
+++ b/src/libnative/io/timer_win32.rs
@@ -0,0 +1,203 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Timers based on win32 WaitableTimers
+//!
+//! This implementation is meant to be used solely on windows. As with other
+//! implementations, there is a worker thread which is doing all the waiting on
+//! a large number of timers for all active timers in the system. This worker
+//! thread uses the select() equivalent, WaitForMultipleObjects. One of the
+//! objects being waited on is a signal into the worker thread to notify that
+//! the incoming channel should be looked at.
+//!
+//! Other than that, the implementation is pretty straightforward in terms of
+//! the other two implementations of timers with nothing *that* new showing up.
+
+use std::comm::Data;
+use std::libc;
+use std::ptr;
+use std::rt::rtio;
+
+use io::timer_helper;
+use io::IoResult;
+
+pub struct Timer {
+    priv obj: libc::HANDLE,
+    priv on_worker: bool,
+}
+
+pub enum Req {
+    NewTimer(libc::HANDLE, Chan<()>, bool),
+    RemoveTimer(libc::HANDLE, Chan<()>),
+    Shutdown,
+}
+
+fn helper(input: libc::HANDLE, messages: Port<Req>) {
+    let mut objs = ~[input];
+    let mut chans = ~[];
+
+    'outer: loop {
+        let idx = unsafe {
+            imp::WaitForMultipleObjects(objs.len() as libc::DWORD,
+                                        objs.as_ptr(),
+                                        0 as libc::BOOL,
+                                        libc::INFINITE)
+        };
+
+        if idx == 0 {
+            loop {
+                match messages.try_recv() {
+                    Data(NewTimer(obj, c, one)) => {
+                        objs.push(obj);
+                        chans.push((c, one));
+                    }
+                    Data(RemoveTimer(obj, c)) => {
+                        c.send(());
+                        match objs.iter().position(|&o| o == obj) {
+                            Some(i) => {
+                                objs.remove(i);
+                                chans.remove(i - 1);
+                            }
+                            None => {}
+                        }
+                    }
+                    Data(Shutdown) => {
+                        assert_eq!(objs.len(), 1);
+                        assert_eq!(chans.len(), 0);
+                        break 'outer;
+                    }
+                    _ => break
+                }
+            }
+        } else {
+            let remove = {
+                match &chans[idx - 1] {
+                    &(ref c, oneshot) => !c.try_send(()) || oneshot
+                }
+            };
+            if remove {
+                objs.remove(idx as uint);
+                chans.remove(idx as uint - 1);
+            }
+        }
+    }
+}
+
+impl Timer {
+    pub fn new() -> IoResult<Timer> {
+        timer_helper::boot(helper);
+
+        let obj = unsafe {
+            imp::CreateWaitableTimerA(ptr::mut_null(), 0, ptr::null())
+        };
+        if obj.is_null() {
+            Err(super::last_error())
+        } else {
+            Ok(Timer { obj: obj, on_worker: false, })
+        }
+    }
+
+    pub fn sleep(ms: u64) {
+        use std::rt::rtio::RtioTimer;
+        let mut t = Timer::new().ok().expect("must allocate a timer!");
+        t.sleep(ms);
+    }
+
+    fn remove(&mut self) {
+        if !self.on_worker { return }
+
+        let (p, c) = Chan::new();
+        timer_helper::send(RemoveTimer(self.obj, c));
+        p.recv();
+
+        self.on_worker = false;
+    }
+}
+
+impl rtio::RtioTimer for Timer {
+    fn sleep(&mut self, msecs: u64) {
+        self.remove();
+
+        // there are 10^6 nanoseconds in a millisecond, and the parameter is in
+        // 100ns intervals, so we multiply by 10^4.
+        let due = -(msecs * 10000) as libc::LARGE_INTEGER;
+        assert_eq!(unsafe {
+            imp::SetWaitableTimer(self.obj, &due, 0, ptr::null(),
+                                  ptr::mut_null(), 0)
+        }, 1);
+
+        unsafe { imp::WaitForSingleObject(self.obj, libc::INFINITE); }
+    }
+
+    fn oneshot(&mut self, msecs: u64) -> Port<()> {
+        self.remove();
+        let (p, c) = Chan::new();
+
+        // see above for the calculation
+        let due = -(msecs * 10000) as libc::LARGE_INTEGER;
+        assert_eq!(unsafe {
+            imp::SetWaitableTimer(self.obj, &due, 0, ptr::null(),
+                                  ptr::mut_null(), 0)
+        }, 1);
+
+        timer_helper::send(NewTimer(self.obj, c, true));
+        self.on_worker = true;
+        return p;
+    }
+
+    fn period(&mut self, msecs: u64) -> Port<()> {
+        self.remove();
+        let (p, c) = Chan::new();
+
+        // see above for the calculation
+        let due = -(msecs * 10000) as libc::LARGE_INTEGER;
+        assert_eq!(unsafe {
+            imp::SetWaitableTimer(self.obj, &due, msecs as libc::LONG,
+                                  ptr::null(), ptr::mut_null(), 0)
+        }, 1);
+
+        timer_helper::send(NewTimer(self.obj, c, false));
+        self.on_worker = true;
+
+        return p;
+    }
+}
+
+impl Drop for Timer {
+    fn drop(&mut self) {
+        self.remove();
+        unsafe { libc::CloseHandle(self.obj); }
+    }
+}
+
+mod imp {
+    use std::libc::{LPSECURITY_ATTRIBUTES, BOOL, LPCSTR, HANDLE, LARGE_INTEGER,
+                    LONG, LPVOID, DWORD, c_void};
+
+    pub type PTIMERAPCROUTINE = *c_void;
+
+    extern "system" {
+        pub fn CreateWaitableTimerA(lpTimerAttributes: LPSECURITY_ATTRIBUTES,
+                                    bManualReset: BOOL,
+                                    lpTimerName: LPCSTR) -> HANDLE;
+        pub fn SetWaitableTimer(hTimer: HANDLE,
+                                pDueTime: *LARGE_INTEGER,
+                                lPeriod: LONG,
+                                pfnCompletionRoutine: PTIMERAPCROUTINE,
+                                lpArgToCompletionRoutine: LPVOID,
+                                fResume: BOOL) -> BOOL;
+        pub fn WaitForMultipleObjects(nCount: DWORD,
+                                      lpHandles: *HANDLE,
+                                      bWaitAll: BOOL,
+                                      dwMilliseconds: DWORD) -> DWORD;
+        pub fn WaitForSingleObject(hHandle: HANDLE,
+                                   dwMilliseconds: DWORD) -> DWORD;
+    }
+}
diff --git a/src/libstd/io/test.rs b/src/libstd/io/test.rs
index 92b2cfa8be2..d81de989df7 100644
--- a/src/libstd/io/test.rs
+++ b/src/libstd/io/test.rs
@@ -34,6 +34,7 @@ macro_rules! iotest (
             use io::net::udp::*;
             #[cfg(unix)]
             use io::net::unix::*;
+            use io::timer::*;
             use io::process::*;
             use str;
             use util;
diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs
index d156a7460e1..4bf89a1d559 100644
--- a/src/libstd/io/timer.rs
+++ b/src/libstd/io/timer.rs
@@ -96,61 +96,177 @@ impl Timer {
 
 #[cfg(test)]
 mod test {
-    use prelude::*;
-    use super::*;
-
-    #[test]
-    fn test_io_timer_sleep_simple() {
+    iotest!(fn test_io_timer_sleep_simple() {
         let mut timer = Timer::new().unwrap();
         timer.sleep(1);
-    }
+    })
 
-    #[test]
-    fn test_io_timer_sleep_oneshot() {
+    iotest!(fn test_io_timer_sleep_oneshot() {
         let mut timer = Timer::new().unwrap();
         timer.oneshot(1).recv();
-    }
+    })
 
-    #[test]
-    fn test_io_timer_sleep_oneshot_forget() {
+    iotest!(fn test_io_timer_sleep_oneshot_forget() {
         let mut timer = Timer::new().unwrap();
         timer.oneshot(100000000000);
-    }
+    })
 
-    #[test]
-    fn oneshot_twice() {
+    iotest!(fn oneshot_twice() {
         let mut timer = Timer::new().unwrap();
         let port1 = timer.oneshot(10000);
         let port = timer.oneshot(1);
         port.recv();
-        assert!(port1.recv_opt().is_none());
-    }
+        assert_eq!(port1.recv_opt(), None);
+    })
 
-    #[test]
-    fn test_io_timer_oneshot_then_sleep() {
+    iotest!(fn test_io_timer_oneshot_then_sleep() {
         let mut timer = Timer::new().unwrap();
         let port = timer.oneshot(100000000000);
         timer.sleep(1); // this should invalidate the port
-        assert!(port.recv_opt().is_none());
-    }
 
-    #[test]
-    fn test_io_timer_sleep_periodic() {
+        assert_eq!(port.recv_opt(), None);
+    })
+
+    iotest!(fn test_io_timer_sleep_periodic() {
         let mut timer = Timer::new().unwrap();
         let port = timer.periodic(1);
         port.recv();
         port.recv();
         port.recv();
-    }
+    })
 
-    #[test]
-    fn test_io_timer_sleep_periodic_forget() {
+    iotest!(fn test_io_timer_sleep_periodic_forget() {
         let mut timer = Timer::new().unwrap();
         timer.periodic(100000000000);
-    }
+    })
 
-    #[test]
-    fn test_io_timer_sleep_standalone() {
+    iotest!(fn test_io_timer_sleep_standalone() {
         sleep(1)
-    }
+    })
+
+    iotest!(fn oneshot() {
+        let mut timer = Timer::new().unwrap();
+
+        let port = timer.oneshot(1);
+        port.recv();
+        assert!(port.recv_opt().is_none());
+
+        let port = timer.oneshot(1);
+        port.recv();
+        assert!(port.recv_opt().is_none());
+    })
+
+    iotest!(fn override() {
+        let mut timer = Timer::new().unwrap();
+        let oport = timer.oneshot(100);
+        let pport = timer.periodic(100);
+        timer.sleep(1);
+        assert_eq!(oport.recv_opt(), None);
+        assert_eq!(pport.recv_opt(), None);
+        timer.oneshot(1).recv();
+    })
+
+    iotest!(fn period() {
+        let mut timer = Timer::new().unwrap();
+        let port = timer.periodic(1);
+        port.recv();
+        port.recv();
+        let port2 = timer.periodic(1);
+        port2.recv();
+        port2.recv();
+    })
+
+    iotest!(fn sleep() {
+        let mut timer = Timer::new().unwrap();
+        timer.sleep(1);
+        timer.sleep(1);
+    })
+
+    iotest!(fn oneshot_fail() {
+        let mut timer = Timer::new().unwrap();
+        let _port = timer.oneshot(1);
+        fail!();
+    } #[should_fail])
+
+    iotest!(fn period_fail() {
+        let mut timer = Timer::new().unwrap();
+        let _port = timer.periodic(1);
+        fail!();
+    } #[should_fail])
+
+    iotest!(fn normal_fail() {
+        let _timer = Timer::new().unwrap();
+        fail!();
+    } #[should_fail])
+
+    iotest!(fn closing_channel_during_drop_doesnt_kill_everything() {
+        // see issue #10375
+        let mut timer = Timer::new().unwrap();
+        let timer_port = timer.periodic(1000);
+
+        do spawn {
+            timer_port.recv_opt();
+        }
+
+        // when we drop the TimerWatcher we're going to destroy the channel,
+        // which must wake up the task on the other end
+    })
+
+    iotest!(fn reset_doesnt_switch_tasks() {
+        // similar test to the one above.
+        let mut timer = Timer::new().unwrap();
+        let timer_port = timer.periodic(1000);
+
+        do spawn {
+            timer_port.recv_opt();
+        }
+
+        timer.oneshot(1);
+    })
+
+    iotest!(fn reset_doesnt_switch_tasks2() {
+        // similar test to the one above.
+        let mut timer = Timer::new().unwrap();
+        let timer_port = timer.periodic(1000);
+
+        do spawn {
+            timer_port.recv_opt();
+        }
+
+        timer.sleep(1);
+    })
+
+    iotest!(fn sender_goes_away_oneshot() {
+        let port = {
+            let mut timer = Timer::new().unwrap();
+            timer.oneshot(1000)
+        };
+        assert_eq!(port.recv_opt(), None);
+    })
+
+    iotest!(fn sender_goes_away_period() {
+        let port = {
+            let mut timer = Timer::new().unwrap();
+            timer.periodic(1000)
+        };
+        assert_eq!(port.recv_opt(), None);
+    })
+
+    iotest!(fn receiver_goes_away_oneshot() {
+        let mut timer1 = Timer::new().unwrap();
+        timer1.oneshot(1);
+        let mut timer2 = Timer::new().unwrap();
+        // while sleeping, the prevous timer should fire and not have its
+        // callback do something terrible.
+        timer2.sleep(2);
+    })
+
+    iotest!(fn receiver_goes_away_period() {
+        let mut timer1 = Timer::new().unwrap();
+        timer1.periodic(1);
+        let mut timer2 = Timer::new().unwrap();
+        // while sleeping, the prevous timer should fire and not have its
+        // callback do something terrible.
+        timer2.sleep(2);
+    })
 }
diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs
index 77ac226a7f1..8975c2a7955 100644
--- a/src/libstd/libc.rs
+++ b/src/libstd/libc.rs
@@ -3548,6 +3548,7 @@ pub mod funcs {
                 pub fn setsid() -> pid_t;
                 pub fn setuid(uid: uid_t) -> c_int;
                 pub fn sleep(secs: c_uint) -> c_uint;
+                pub fn usleep(secs: c_uint) -> c_int;
                 pub fn sysconf(name: c_int) -> c_long;
                 pub fn tcgetpgrp(fd: c_int) -> pid_t;
                 pub fn ttyname(fd: c_int) -> *c_char;
diff --git a/src/libstd/rt/at_exit_imp.rs b/src/libstd/rt/at_exit_imp.rs
new file mode 100644
index 00000000000..6f9be64a73d
--- /dev/null
+++ b/src/libstd/rt/at_exit_imp.rs
@@ -0,0 +1,72 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Implementation of running at_exit routines
+//!
+//! Documentation can be found on the `rt::at_exit` function.
+
+use cast;
+use iter::Iterator;
+use option::{Some, None};
+use ptr::RawPtr;
+use unstable::sync::Exclusive;
+use util;
+use vec::OwnedVector;
+
+type Queue = Exclusive<~[proc()]>;
+
+// You'll note that these variables are *not* atomic, and this is done on
+// purpose. This module is designed to have init() called *once* in a
+// single-task context, and then run() is called only once in another
+// single-task context. As a result of this, only the `push` function is
+// thread-safe, and it assumes that the `init` function has run previously.
+static mut QUEUE: *mut Queue = 0 as *mut Queue;
+static mut RUNNING: bool = false;
+
+pub fn init() {
+    unsafe {
+        rtassert!(!RUNNING);
+        rtassert!(QUEUE.is_null());
+        let state: ~Queue = ~Exclusive::new(~[]);
+        QUEUE = cast::transmute(state);
+    }
+}
+
+pub fn push(f: proc()) {
+    unsafe {
+        rtassert!(!RUNNING);
+        rtassert!(!QUEUE.is_null());
+        let state: &mut Queue = cast::transmute(QUEUE);
+        let mut f = Some(f);
+        state.with(|arr|  {
+            arr.push(f.take_unwrap());
+        });
+    }
+}
+
+pub fn run() {
+    let vec = unsafe {
+        rtassert!(!RUNNING);
+        rtassert!(!QUEUE.is_null());
+        RUNNING = true;
+        let state: ~Queue = cast::transmute(QUEUE);
+        QUEUE = 0 as *mut Queue;
+        let mut vec = None;
+        state.with(|arr| {
+            vec = Some(util::replace(arr, ~[]));
+        });
+        vec.take_unwrap()
+    };
+
+
+    for f in vec.move_iter() {
+        f();
+    }
+}
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 40e9a3ec5b2..7aa966802f2 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -127,6 +127,9 @@ mod util;
 // Global command line argument storage
 pub mod args;
 
+// Support for running procedures when a program has exited.
+mod at_exit_imp;
+
 /// The default error code of the rust runtime if the main task fails instead
 /// of exiting cleanly.
 pub static DEFAULT_ERROR_CODE: int = 101;
@@ -171,9 +174,27 @@ pub fn init(argc: int, argv: **u8) {
         env::init();
         logging::init();
         local_ptr::init();
+        at_exit_imp::init();
     }
 }
 
+/// Enqueues a procedure to run when the runtime is cleaned up
+///
+/// The procedure passed to this function will be executed as part of the
+/// runtime cleanup phase. For normal rust programs, this means that it will run
+/// after all other tasks have exited.
+///
+/// The procedure is *not* executed with a local `Task` available to it, so
+/// primitives like logging, I/O, channels, spawning, etc, are *not* available.
+/// This is meant for "bare bones" usage to clean up runtime details, this is
+/// not meant as a general-purpose "let's clean everything up" function.
+///
+/// It is forbidden for procedures to register more `at_exit` handlers when they
+/// are running, and doing so will lead to a process abort.
+pub fn at_exit(f: proc()) {
+    at_exit_imp::push(f);
+}
+
 /// One-time runtime cleanup.
 ///
 /// This function is unsafe because it performs no checks to ensure that the
@@ -184,6 +205,7 @@ pub fn init(argc: int, argv: **u8) {
 /// Invoking cleanup while portions of the runtime are still in use may cause
 /// undefined behavior.
 pub unsafe fn cleanup() {
+    at_exit_imp::run();
     args::cleanup();
     local_ptr::cleanup();
 }