about summary refs log tree commit diff
path: root/src/libnative
diff options
context:
space:
mode:
Diffstat (limited to 'src/libnative')
-rw-r--r--src/libnative/bookkeeping.rs1
-rw-r--r--src/libnative/io/mod.rs18
-rw-r--r--src/libnative/io/timer_helper.rs143
-rw-r--r--src/libnative/io/timer_other.rs328
-rw-r--r--src/libnative/io/timer_timerfd.rs303
-rw-r--r--src/libnative/io/timer_win32.rs203
6 files changed, 995 insertions, 1 deletions
diff --git a/src/libnative/bookkeeping.rs b/src/libnative/bookkeeping.rs
index 6c5f555f401..b07e4271ee4 100644
--- a/src/libnative/bookkeeping.rs
+++ b/src/libnative/bookkeeping.rs
@@ -45,5 +45,6 @@ pub fn wait_for_other_tasks() {
             TASK_LOCK.wait();
         }
         TASK_LOCK.unlock();
+        TASK_LOCK.destroy();
     }
 }
diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs
index f1bec440547..f3aca7820a5 100644
--- a/src/libnative/io/mod.rs
+++ b/src/libnative/io/mod.rs
@@ -46,6 +46,22 @@ pub mod file;
 pub mod process;
 pub mod net;
 
+#[cfg(target_os = "macos")]
+#[cfg(target_os = "freebsd")]
+#[path = "timer_other.rs"]
+pub mod timer;
+
+#[cfg(target_os = "linux")]
+#[cfg(target_os = "android")]
+#[path = "timer_timerfd.rs"]
+pub mod timer;
+
+#[cfg(target_os = "win32")]
+#[path = "timer_win32.rs"]
+pub mod timer;
+
+mod timer_helper;
+
 type IoResult<T> = Result<T, IoError>;
 
 fn unimpl() -> IoError {
@@ -249,7 +265,7 @@ impl rtio::IoFactory for IoFactory {
 
     // misc
     fn timer_init(&mut self) -> IoResult<~RtioTimer> {
-        Err(unimpl())
+        timer::Timer::new().map(|t| ~t as ~RtioTimer)
     }
     fn spawn(&mut self, config: ProcessConfig)
             -> IoResult<(~RtioProcess, ~[Option<~RtioPipe>])> {
diff --git a/src/libnative/io/timer_helper.rs b/src/libnative/io/timer_helper.rs
new file mode 100644
index 00000000000..3c20d073f29
--- /dev/null
+++ b/src/libnative/io/timer_helper.rs
@@ -0,0 +1,143 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Implementation of the helper thread for the timer module
+//!
+//! This module contains the management necessary for the timer worker thread.
+//! This thread is responsible for performing the send()s on channels for timers
+//! that are using channels instead of a blocking call.
+//!
+//! The timer thread is lazily initialized, and it's shut down via the
+//! `shutdown` function provided. It must be maintained as an invariant that
+//! `shutdown` is only called when the entire program is finished. No new timers
+//! can be created in the future and there must be no active timers at that
+//! time.
+
+use std::cast;
+use std::rt;
+use std::unstable::mutex::{Once, ONCE_INIT};
+
+use bookkeeping;
+use io::timer::{Req, Shutdown};
+use task;
+
+// You'll note that these variables are *not* protected by a lock. These
+// variables are initialized with a Once before any Timer is created and are
+// only torn down after everything else has exited. This means that these
+// variables are read-only during use (after initialization) and both of which
+// are safe to use concurrently.
+static mut HELPER_CHAN: *mut SharedChan<Req> = 0 as *mut SharedChan<Req>;
+static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal;
+
+pub fn boot(helper: fn(imp::signal, Port<Req>)) {
+    static mut INIT: Once = ONCE_INIT;
+
+    unsafe {
+        INIT.doit(|| {
+            let (msgp, msgc) = SharedChan::new();
+            HELPER_CHAN = cast::transmute(~msgc);
+            let (receive, send) = imp::new();
+            HELPER_SIGNAL = send;
+
+            do task::spawn {
+                bookkeeping::decrement();
+                helper(receive, msgp);
+            }
+
+            rt::at_exit(proc() { shutdown() });
+        })
+    }
+}
+
+pub fn send(req: Req) {
+    unsafe {
+        assert!(!HELPER_CHAN.is_null());
+        (*HELPER_CHAN).send(req);
+        imp::signal(HELPER_SIGNAL);
+    }
+}
+
+fn shutdown() {
+    // We want to wait for the entire helper task to exit, and in doing so it
+    // will attempt to decrement the global task count. When the helper was
+    // created, it decremented the count so it wouldn't count towards preventing
+    // the program to exit, so here we pair that manual decrement with a manual
+    // increment. We will then wait for the helper thread to exit by calling
+    // wait_for_other_tasks.
+    bookkeeping::increment();
+
+    // Request a shutdown, and then wait for the task to exit
+    send(Shutdown);
+    bookkeeping::wait_for_other_tasks();
+
+    // Clean up after ther helper thread
+    unsafe {
+        imp::close(HELPER_SIGNAL);
+        let _chan: ~SharedChan<Req> = cast::transmute(HELPER_CHAN);
+        HELPER_CHAN = 0 as *mut SharedChan<Req>;
+        HELPER_SIGNAL = 0 as imp::signal;
+    }
+}
+
+#[cfg(unix)]
+mod imp {
+    use std::libc;
+    use std::os;
+
+    use io::file::FileDesc;
+
+    pub type signal = libc::c_int;
+
+    pub fn new() -> (signal, signal) {
+        let pipe = os::pipe();
+        (pipe.input, pipe.out)
+    }
+
+    pub fn signal(fd: libc::c_int) {
+        FileDesc::new(fd, false).inner_write([0]);
+    }
+
+    pub fn close(fd: libc::c_int) {
+        let _fd = FileDesc::new(fd, true);
+    }
+}
+
+#[cfg(windows)]
+mod imp {
+    use std::libc::{BOOL, LPCSTR, HANDLE, LPSECURITY_ATTRIBUTES, CloseHandle};
+    use std::ptr;
+    use std::libc;
+
+    pub type signal = HANDLE;
+
+    pub fn new() -> (HANDLE, HANDLE) {
+        unsafe {
+            let handle = CreateEventA(ptr::mut_null(), libc::FALSE, libc::FALSE,
+                                      ptr::null());
+            (handle, handle)
+        }
+    }
+
+    pub fn signal(handle: HANDLE) {
+        unsafe { SetEvent(handle); }
+    }
+
+    pub fn close(handle: HANDLE) {
+        unsafe { CloseHandle(handle); }
+    }
+
+    extern "system" {
+        fn CreateEventA(lpSecurityAttributes: LPSECURITY_ATTRIBUTES,
+                        bManualReset: BOOL,
+                        bInitialState: BOOL,
+                        lpName: LPCSTR) -> HANDLE;
+        fn SetEvent(hEvent: HANDLE) -> BOOL;
+    }
+}
diff --git a/src/libnative/io/timer_other.rs b/src/libnative/io/timer_other.rs
new file mode 100644
index 00000000000..24ffd7a4147
--- /dev/null
+++ b/src/libnative/io/timer_other.rs
@@ -0,0 +1,328 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Timers for non-linux/non-windows OSes
+//!
+//! This module implements timers with a worker thread, select(), and a lot of
+//! witchcraft that turns out to be horribly inaccurate timers. The unfortunate
+//! part is that I'm at a loss of what else to do one these OSes. This is also
+//! why linux has a specialized timerfd implementation and windows has its own
+//! implementation (they're more accurate than this one).
+//!
+//! The basic idea is that there is a worker thread that's communicated to via a
+//! channel and a pipe, the pipe is used by the worker thread in a select()
+//! syscall with a timeout. The timeout is the "next timer timeout" while the
+//! channel is used to send data over to the worker thread.
+//!
+//! Whenever the call to select() times out, then a channel receives a message.
+//! Whenever the call returns that the file descriptor has information, then the
+//! channel from timers is drained, enqueueing all incoming requests.
+//!
+//! The actual implementation of the helper thread is a sorted array of
+//! timers in terms of target firing date. The target is the absolute time at
+//! which the timer should fire. Timers are then re-enqueued after a firing if
+//! the repeat boolean is set.
+//!
+//! Naturally, all this logic of adding times and keeping track of
+//! relative/absolute time is a little lossy and not quite exact. I've done the
+//! best I could to reduce the amount of calls to 'now()', but there's likely
+//! still inaccuracies trickling in here and there.
+//!
+//! One of the tricky parts of this implementation is that whenever a timer is
+//! acted upon, it must cancel whatever the previous action was (if one is
+//! active) in order to act like the other implementations of this timer. In
+//! order to do this, the timer's inner pointer is transferred to the worker
+//! thread. Whenever the timer is modified, it first takes ownership back from
+//! the worker thread in order to modify the same data structure. This has the
+//! side effect of "cancelling" the previous requests while allowing a
+//! re-enqueueing later on.
+//!
+//! Note that all time units in this file are in *milliseconds*.
+
+use std::comm::Data;
+use std::hashmap::HashMap;
+use std::libc;
+use std::os;
+use std::ptr;
+use std::rt::rtio;
+use std::sync::atomics;
+use std::unstable::intrinsics;
+
+use io::file::FileDesc;
+use io::IoResult;
+use io::timer_helper;
+
+pub struct Timer {
+    priv id: uint,
+    priv inner: Option<~Inner>,
+}
+
+struct Inner {
+    chan: Option<Chan<()>>,
+    interval: u64,
+    repeat: bool,
+    target: u64,
+    id: uint,
+}
+
+pub enum Req {
+    // Add a new timer to the helper thread.
+    NewTimer(~Inner),
+
+    // Remove a timer based on its id and then send it back on the channel
+    // provided
+    RemoveTimer(uint, Chan<~Inner>),
+
+    // Shut down the loop and then ACK this channel once it's shut down
+    Shutdown,
+}
+
+// returns the current time (in milliseconds)
+fn now() -> u64 {
+    unsafe {
+        let mut now: libc::timeval = intrinsics::init();
+        assert_eq!(imp::gettimeofday(&mut now, ptr::null()), 0);
+        return (now.tv_sec as u64) * 1000 + (now.tv_usec as u64) / 1000;
+    }
+}
+
+fn helper(input: libc::c_int, messages: Port<Req>) {
+    let mut set: imp::fd_set = unsafe { intrinsics::init() };
+
+    let mut fd = FileDesc::new(input, true);
+    let mut timeout: libc::timeval = unsafe { intrinsics::init() };
+
+    // active timers are those which are able to be selected upon (and it's a
+    // sorted list, and dead timers are those which have expired, but ownership
+    // hasn't yet been transferred back to the timer itself.
+    let mut active: ~[~Inner] = ~[];
+    let mut dead = HashMap::new();
+
+    // inserts a timer into an array of timers (sorted by firing time)
+    fn insert(t: ~Inner, active: &mut ~[~Inner]) {
+        match active.iter().position(|tm| tm.target > t.target) {
+            Some(pos) => { active.insert(pos, t); }
+            None => { active.push(t); }
+        }
+    }
+
+    // signals the first requests in the queue, possible re-enqueueing it.
+    fn signal(active: &mut ~[~Inner], dead: &mut HashMap<uint, ~Inner>) {
+        let mut timer = match active.shift() {
+            Some(timer) => timer, None => return
+        };
+        let chan = timer.chan.take_unwrap();
+        if chan.try_send(()) && timer.repeat {
+            timer.chan = Some(chan);
+            timer.target += timer.interval;
+            insert(timer, active);
+        } else {
+            drop(chan);
+            dead.insert(timer.id, timer);
+        }
+    }
+
+    'outer: loop {
+        let timeout = match active {
+            // Empty array? no timeout (wait forever for the next request)
+            [] => ptr::null(),
+
+            [~Inner { target, .. }, ..] => {
+                let now = now();
+                // If this request has already expired, then signal it and go
+                // through another iteration
+                if target <= now {
+                    signal(&mut active, &mut dead);
+                    continue;
+                }
+
+                // The actual timeout listed in the requests array is an
+                // absolute date, so here we translate the absolute time to a
+                // relative time.
+                let tm = target - now;
+                timeout.tv_sec = (tm / 1000) as libc::time_t;
+                timeout.tv_usec = ((tm % 1000) * 1000) as libc::suseconds_t;
+                &timeout as *libc::timeval
+            }
+        };
+
+        imp::fd_set(&mut set, input);
+        match unsafe {
+            imp::select(input + 1, &set, ptr::null(), ptr::null(), timeout)
+        } {
+            // timed out
+            0 => signal(&mut active, &mut dead),
+
+            // file descriptor write woke us up, we've got some new requests
+            1 => {
+                loop {
+                    match messages.try_recv() {
+                        Data(Shutdown) => {
+                            assert!(active.len() == 0);
+                            break 'outer;
+                        }
+
+                        Data(NewTimer(timer)) => insert(timer, &mut active),
+
+                        Data(RemoveTimer(id, ack)) => {
+                            match dead.pop(&id) {
+                                Some(i) => { ack.send(i); continue }
+                                None => {}
+                            }
+                            let i = active.iter().position(|i| i.id == id);
+                            let i = i.expect("no timer found");
+                            let t = active.remove(i).unwrap();
+                            ack.send(t);
+                        }
+                        _ => break
+                    }
+                }
+
+                // drain the file descriptor
+                let mut buf = [0];
+                fd.inner_read(buf);
+            }
+
+            -1 if os::errno() == libc::EINTR as int => {}
+            n => fail!("helper thread failed in select() with error: {} ({})",
+                       n, os::last_os_error())
+        }
+    }
+}
+
+impl Timer {
+    pub fn new() -> IoResult<Timer> {
+        timer_helper::boot(helper);
+
+        static mut ID: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
+        let id = unsafe { ID.fetch_add(1, atomics::Relaxed) };
+        Ok(Timer {
+            id: id,
+            inner: Some(~Inner {
+                chan: None,
+                interval: 0,
+                target: 0,
+                repeat: false,
+                id: id,
+            })
+        })
+    }
+
+    pub fn sleep(ms: u64) {
+        unsafe { libc::usleep((ms * 1000) as libc::c_uint); }
+    }
+
+    fn inner(&mut self) -> ~Inner {
+        match self.inner.take() {
+            Some(i) => i,
+            None => {
+                let (p, c) = Chan::new();
+                timer_helper::send(RemoveTimer(self.id, c));
+                p.recv()
+            }
+        }
+    }
+}
+
+impl rtio::RtioTimer for Timer {
+    fn sleep(&mut self, msecs: u64) {
+        let mut inner = self.inner();
+        inner.chan = None; // cancel any previous request
+        self.inner = Some(inner);
+
+        Timer::sleep(msecs);
+    }
+
+    fn oneshot(&mut self, msecs: u64) -> Port<()> {
+        let now = now();
+        let mut inner = self.inner();
+
+        let (p, c) = Chan::new();
+        inner.repeat = false;
+        inner.chan = Some(c);
+        inner.interval = msecs;
+        inner.target = now + msecs;
+
+        timer_helper::send(NewTimer(inner));
+        return p;
+    }
+
+    fn period(&mut self, msecs: u64) -> Port<()> {
+        let now = now();
+        let mut inner = self.inner();
+
+        let (p, c) = Chan::new();
+        inner.repeat = true;
+        inner.chan = Some(c);
+        inner.interval = msecs;
+        inner.target = now + msecs;
+
+        timer_helper::send(NewTimer(inner));
+        return p;
+    }
+}
+
+impl Drop for Timer {
+    fn drop(&mut self) {
+        self.inner = Some(self.inner());
+    }
+}
+
+#[cfg(target_os = "macos")]
+mod imp {
+    use std::libc;
+
+    pub static FD_SETSIZE: uint = 1024;
+
+    pub struct fd_set {
+        fds_bits: [i32, ..(FD_SETSIZE / 32)]
+    }
+
+    pub fn fd_set(set: &mut fd_set, fd: i32) {
+        set.fds_bits[fd / 32] |= 1 << (fd % 32);
+    }
+
+    extern {
+        pub fn select(nfds: libc::c_int,
+                      readfds: *fd_set,
+                      writefds: *fd_set,
+                      errorfds: *fd_set,
+                      timeout: *libc::timeval) -> libc::c_int;
+
+        pub fn gettimeofday(timeval: *mut libc::timeval,
+                            tzp: *libc::c_void) -> libc::c_int;
+    }
+}
+
+#[cfg(target_os = "freebsd")]
+mod imp {
+    use std::libc;
+
+    pub static FD_SETSIZE: uint = 1024;
+
+    pub struct fd_set {
+        fds_bits: [u64, ..(FD_SETSIZE / 64)]
+    }
+
+    pub fn fd_set(set: &mut fd_set, fd: i32) {
+        set.fds_bits[fd / 64] |= (1 << (fd % 64)) as u64;
+    }
+
+    extern {
+        pub fn select(nfds: libc::c_int,
+                      readfds: *fd_set,
+                      writefds: *fd_set,
+                      errorfds: *fd_set,
+                      timeout: *libc::timeval) -> libc::c_int;
+
+        pub fn gettimeofday(timeval: *mut libc::timeval,
+                            tzp: *libc::c_void) -> libc::c_int;
+    }
+}
diff --git a/src/libnative/io/timer_timerfd.rs b/src/libnative/io/timer_timerfd.rs
new file mode 100644
index 00000000000..4912f4f431f
--- /dev/null
+++ b/src/libnative/io/timer_timerfd.rs
@@ -0,0 +1,303 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Timers based on timerfd_create(2)
+//!
+//! On OSes which support timerfd_create, we can use these much more accurate
+//! timers over select() + a timeout (see timer_other.rs). This strategy still
+//! employs a worker thread which does the waiting on the timer fds (to send
+//! messages away).
+//!
+//! The worker thread in this implementation uses epoll(7) to block. It
+//! maintains a working set of *all* native timers in the process, along with a
+//! pipe file descriptor used to communicate that there is data available on the
+//! incoming channel to the worker thread. Timers send requests to update their
+//! timerfd settings to the worker thread (see the comment above 'oneshot' for
+//! why).
+//!
+//! As with timer_other, timers just using sleep() do not use the timerfd at
+//! all. They remove the timerfd from the worker thread and then invoke usleep()
+//! to block the calling thread.
+//!
+//! As with timer_other, all units in this file are in units of millseconds.
+
+use std::comm::Data;
+use std::libc;
+use std::ptr;
+use std::os;
+use std::rt::rtio;
+use std::hashmap::HashMap;
+use std::unstable::intrinsics;
+
+use io::file::FileDesc;
+use io::IoResult;
+use io::timer_helper;
+
+pub struct Timer {
+    priv fd: FileDesc,
+    priv on_worker: bool,
+}
+
+pub enum Req {
+    NewTimer(libc::c_int, Chan<()>, bool, imp::itimerspec),
+    RemoveTimer(libc::c_int, Chan<()>),
+    Shutdown,
+}
+
+fn helper(input: libc::c_int, messages: Port<Req>) {
+    let efd = unsafe { imp::epoll_create(10) };
+    let _fd1 = FileDesc::new(input, true);
+    let _fd2 = FileDesc::new(efd, true);
+
+    fn add(efd: libc::c_int, fd: libc::c_int) {
+        let event = imp::epoll_event {
+            events: imp::EPOLLIN as u32,
+            data: imp::epoll_data_t { fd: fd, pad: 0, }
+        };
+        let ret = unsafe {
+            imp::epoll_ctl(efd, imp::EPOLL_CTL_ADD, fd, &event)
+        };
+        assert_eq!(ret, 0);
+    }
+    fn del(efd: libc::c_int, fd: libc::c_int) {
+        let event = imp::epoll_event {
+            events: 0, data: imp::epoll_data_t { fd: 0, pad: 0, }
+        };
+        let ret = unsafe {
+            imp::epoll_ctl(efd, imp::EPOLL_CTL_DEL, fd, &event)
+        };
+        assert_eq!(ret, 0);
+    }
+
+    add(efd, input);
+    let events: [imp::epoll_event, ..16] = unsafe { intrinsics::init() };
+    let mut map: HashMap<libc::c_int, (Chan<()>, bool)> = HashMap::new();
+    'outer: loop {
+        let n = match unsafe {
+            imp::epoll_wait(efd, events.as_ptr(),
+                            events.len() as libc::c_int, -1)
+        } {
+            0 => fail!("epoll_wait returned immediately!"),
+            -1 => fail!("epoll wait failed: {}", os::last_os_error()),
+            n => n
+        };
+
+        let mut incoming = false;
+        debug!("{} events to process", n);
+        for event in events.slice_to(n as uint).iter() {
+            let fd = event.data.fd;
+            debug!("data on fd {} (input = {})", fd, input);
+            if fd == input {
+                let mut buf = [0, ..1];
+                // drain the input file descriptor of its input
+                FileDesc::new(fd, false).inner_read(buf);
+                incoming = true;
+            } else {
+                let mut bits = [0, ..8];
+                // drain the timerfd of how many times its fired
+                //
+                // XXX: should this perform a send() this number of
+                //      times?
+                FileDesc::new(fd, false).inner_read(bits);
+                let remove = {
+                    match map.find(&fd).expect("fd unregistered") {
+                        &(ref c, oneshot) => !c.try_send(()) || oneshot
+                    }
+                };
+                if remove {
+                    map.remove(&fd);
+                    del(efd, fd);
+                }
+            }
+        }
+
+        while incoming {
+            match messages.try_recv() {
+                Data(NewTimer(fd, chan, one, timeval)) => {
+                    // acknowledge we have the new channel, we will never send
+                    // another message to the old channel
+                    chan.send(());
+
+                    // If we haven't previously seen the file descriptor, then
+                    // we need to add it to the epoll set.
+                    if map.insert(fd, (chan, one)) {
+                        add(efd, fd);
+                    }
+
+                    // Update the timerfd's time value now that we have control
+                    // of the timerfd
+                    let ret = unsafe {
+                        imp::timerfd_settime(fd, 0, &timeval, ptr::null())
+                    };
+                    assert_eq!(ret, 0);
+                }
+
+                Data(RemoveTimer(fd, chan)) => {
+                    if map.remove(&fd) {
+                        del(efd, fd);
+                    }
+                    chan.send(());
+                }
+
+                Data(Shutdown) => {
+                    assert!(map.len() == 0);
+                    break 'outer;
+                }
+
+                _ => break,
+            }
+        }
+    }
+}
+
+impl Timer {
+    pub fn new() -> IoResult<Timer> {
+        timer_helper::boot(helper);
+        match unsafe { imp::timerfd_create(imp::CLOCK_MONOTONIC, 0) } {
+            -1 => Err(super::last_error()),
+            n => Ok(Timer { fd: FileDesc::new(n, true), on_worker: false, }),
+        }
+    }
+
+    pub fn sleep(ms: u64) {
+        unsafe { libc::usleep((ms * 1000) as libc::c_uint); }
+    }
+
+    fn remove(&mut self) {
+        if !self.on_worker { return }
+
+        let (p, c) = Chan::new();
+        timer_helper::send(RemoveTimer(self.fd.fd(), c));
+        p.recv();
+        self.on_worker = false;
+    }
+}
+
+impl rtio::RtioTimer for Timer {
+    fn sleep(&mut self, msecs: u64) {
+        self.remove();
+        Timer::sleep(msecs);
+    }
+
+    // Periodic and oneshot channels are updated by updating the settings on the
+    // corresopnding timerfd. The update is not performed on the thread calling
+    // oneshot or period, but rather the helper epoll thread. The reason for
+    // this is to avoid losing messages and avoid leaking messages across ports.
+    //
+    // By updating the timerfd on the helper thread, we're guaranteed that all
+    // messages for a particular setting of the timer will be received by the
+    // new channel/port pair rather than leaking old messages onto the new port
+    // or leaking new messages onto the old port.
+    //
+    // We also wait for the remote thread to actually receive the new settings
+    // before returning to guarantee the invariant that when oneshot() and
+    // period() return that the old port will never receive any more messages.
+
+    fn oneshot(&mut self, msecs: u64) -> Port<()> {
+        let (p, c) = Chan::new();
+
+        let new_value = imp::itimerspec {
+            it_interval: imp::timespec { tv_sec: 0, tv_nsec: 0 },
+            it_value: imp::timespec {
+                tv_sec: (msecs / 1000) as libc::time_t,
+                tv_nsec: ((msecs % 1000) * 1000000) as libc::c_long,
+            }
+        };
+        timer_helper::send(NewTimer(self.fd.fd(), c, true, new_value));
+        p.recv();
+        self.on_worker = true;
+
+        return p;
+    }
+
+    fn period(&mut self, msecs: u64) -> Port<()> {
+        let (p, c) = Chan::new();
+
+        let spec = imp::timespec {
+            tv_sec: (msecs / 1000) as libc::time_t,
+            tv_nsec: ((msecs % 1000) * 1000000) as libc::c_long,
+        };
+        let new_value = imp::itimerspec { it_interval: spec, it_value: spec, };
+        timer_helper::send(NewTimer(self.fd.fd(), c, false, new_value));
+        p.recv();
+        self.on_worker = true;
+
+        return p;
+    }
+}
+
+impl Drop for Timer {
+    fn drop(&mut self) {
+        // When the timerfd file descriptor is closed, it will be automatically
+        // removed from the epoll set of the worker thread, but we want to make
+        // sure that the associated channel is also removed from the worker's
+        // hash map.
+        self.remove();
+    }
+}
+
+#[allow(dead_code)]
+mod imp {
+    use std::libc;
+
+    pub static CLOCK_MONOTONIC: libc::c_int = 1;
+    pub static EPOLL_CTL_ADD: libc::c_int = 1;
+    pub static EPOLL_CTL_DEL: libc::c_int = 2;
+    pub static EPOLL_CTL_MOD: libc::c_int = 3;
+    pub static EPOLLIN: libc::c_int = 0x001;
+    pub static EPOLLOUT: libc::c_int = 0x004;
+    pub static EPOLLPRI: libc::c_int = 0x002;
+    pub static EPOLLERR: libc::c_int = 0x008;
+    pub static EPOLLRDHUP: libc::c_int = 0x2000;
+    pub static EPOLLET: libc::c_int = 1 << 31;
+    pub static EPOLLHUP: libc::c_int = 0x010;
+    pub static EPOLLONESHOT: libc::c_int = 1 << 30;
+
+    pub struct epoll_event {
+        events: u32,
+        data: epoll_data_t,
+    }
+
+    pub struct epoll_data_t {
+        fd: i32,
+        pad: u32,
+    }
+
+    pub struct timespec {
+        tv_sec: libc::time_t,
+        tv_nsec: libc::c_long,
+    }
+
+    pub struct itimerspec {
+        it_interval: timespec,
+        it_value: timespec,
+    }
+
+    extern {
+        pub fn timerfd_create(clockid: libc::c_int,
+                              flags: libc::c_int) -> libc::c_int;
+        pub fn timerfd_settime(fd: libc::c_int,
+                               flags: libc::c_int,
+                               new_value: *itimerspec,
+                               old_value: *itimerspec) -> libc::c_int;
+        pub fn timerfd_gettime(fd: libc::c_int,
+                               curr_value: *itimerspec) -> libc::c_int;
+
+        pub fn epoll_create(size: libc::c_int) -> libc::c_int;
+        pub fn epoll_ctl(epfd: libc::c_int,
+                         op: libc::c_int,
+                         fd: libc::c_int,
+                         event: *epoll_event) -> libc::c_int;
+        pub fn epoll_wait(epfd: libc::c_int,
+                          events: *epoll_event,
+                          maxevents: libc::c_int,
+                          timeout: libc::c_int) -> libc::c_int;
+    }
+}
diff --git a/src/libnative/io/timer_win32.rs b/src/libnative/io/timer_win32.rs
new file mode 100644
index 00000000000..e359d99eedf
--- /dev/null
+++ b/src/libnative/io/timer_win32.rs
@@ -0,0 +1,203 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Timers based on win32 WaitableTimers
+//!
+//! This implementation is meant to be used solely on windows. As with other
+//! implementations, there is a worker thread which is doing all the waiting on
+//! a large number of timers for all active timers in the system. This worker
+//! thread uses the select() equivalent, WaitForMultipleObjects. One of the
+//! objects being waited on is a signal into the worker thread to notify that
+//! the incoming channel should be looked at.
+//!
+//! Other than that, the implementation is pretty straightforward in terms of
+//! the other two implementations of timers with nothing *that* new showing up.
+
+use std::comm::Data;
+use std::libc;
+use std::ptr;
+use std::rt::rtio;
+
+use io::timer_helper;
+use io::IoResult;
+
+pub struct Timer {
+    priv obj: libc::HANDLE,
+    priv on_worker: bool,
+}
+
+pub enum Req {
+    NewTimer(libc::HANDLE, Chan<()>, bool),
+    RemoveTimer(libc::HANDLE, Chan<()>),
+    Shutdown,
+}
+
+fn helper(input: libc::HANDLE, messages: Port<Req>) {
+    let mut objs = ~[input];
+    let mut chans = ~[];
+
+    'outer: loop {
+        let idx = unsafe {
+            imp::WaitForMultipleObjects(objs.len() as libc::DWORD,
+                                        objs.as_ptr(),
+                                        0 as libc::BOOL,
+                                        libc::INFINITE)
+        };
+
+        if idx == 0 {
+            loop {
+                match messages.try_recv() {
+                    Data(NewTimer(obj, c, one)) => {
+                        objs.push(obj);
+                        chans.push((c, one));
+                    }
+                    Data(RemoveTimer(obj, c)) => {
+                        c.send(());
+                        match objs.iter().position(|&o| o == obj) {
+                            Some(i) => {
+                                objs.remove(i);
+                                chans.remove(i - 1);
+                            }
+                            None => {}
+                        }
+                    }
+                    Data(Shutdown) => {
+                        assert_eq!(objs.len(), 1);
+                        assert_eq!(chans.len(), 0);
+                        break 'outer;
+                    }
+                    _ => break
+                }
+            }
+        } else {
+            let remove = {
+                match &chans[idx - 1] {
+                    &(ref c, oneshot) => !c.try_send(()) || oneshot
+                }
+            };
+            if remove {
+                objs.remove(idx as uint);
+                chans.remove(idx as uint - 1);
+            }
+        }
+    }
+}
+
+impl Timer {
+    pub fn new() -> IoResult<Timer> {
+        timer_helper::boot(helper);
+
+        let obj = unsafe {
+            imp::CreateWaitableTimerA(ptr::mut_null(), 0, ptr::null())
+        };
+        if obj.is_null() {
+            Err(super::last_error())
+        } else {
+            Ok(Timer { obj: obj, on_worker: false, })
+        }
+    }
+
+    pub fn sleep(ms: u64) {
+        use std::rt::rtio::RtioTimer;
+        let mut t = Timer::new().ok().expect("must allocate a timer!");
+        t.sleep(ms);
+    }
+
+    fn remove(&mut self) {
+        if !self.on_worker { return }
+
+        let (p, c) = Chan::new();
+        timer_helper::send(RemoveTimer(self.obj, c));
+        p.recv();
+
+        self.on_worker = false;
+    }
+}
+
+impl rtio::RtioTimer for Timer {
+    fn sleep(&mut self, msecs: u64) {
+        self.remove();
+
+        // there are 10^6 nanoseconds in a millisecond, and the parameter is in
+        // 100ns intervals, so we multiply by 10^4.
+        let due = -(msecs * 10000) as libc::LARGE_INTEGER;
+        assert_eq!(unsafe {
+            imp::SetWaitableTimer(self.obj, &due, 0, ptr::null(),
+                                  ptr::mut_null(), 0)
+        }, 1);
+
+        unsafe { imp::WaitForSingleObject(self.obj, libc::INFINITE); }
+    }
+
+    fn oneshot(&mut self, msecs: u64) -> Port<()> {
+        self.remove();
+        let (p, c) = Chan::new();
+
+        // see above for the calculation
+        let due = -(msecs * 10000) as libc::LARGE_INTEGER;
+        assert_eq!(unsafe {
+            imp::SetWaitableTimer(self.obj, &due, 0, ptr::null(),
+                                  ptr::mut_null(), 0)
+        }, 1);
+
+        timer_helper::send(NewTimer(self.obj, c, true));
+        self.on_worker = true;
+        return p;
+    }
+
+    fn period(&mut self, msecs: u64) -> Port<()> {
+        self.remove();
+        let (p, c) = Chan::new();
+
+        // see above for the calculation
+        let due = -(msecs * 10000) as libc::LARGE_INTEGER;
+        assert_eq!(unsafe {
+            imp::SetWaitableTimer(self.obj, &due, msecs as libc::LONG,
+                                  ptr::null(), ptr::mut_null(), 0)
+        }, 1);
+
+        timer_helper::send(NewTimer(self.obj, c, false));
+        self.on_worker = true;
+
+        return p;
+    }
+}
+
+impl Drop for Timer {
+    fn drop(&mut self) {
+        self.remove();
+        unsafe { libc::CloseHandle(self.obj); }
+    }
+}
+
+mod imp {
+    use std::libc::{LPSECURITY_ATTRIBUTES, BOOL, LPCSTR, HANDLE, LARGE_INTEGER,
+                    LONG, LPVOID, DWORD, c_void};
+
+    pub type PTIMERAPCROUTINE = *c_void;
+
+    extern "system" {
+        pub fn CreateWaitableTimerA(lpTimerAttributes: LPSECURITY_ATTRIBUTES,
+                                    bManualReset: BOOL,
+                                    lpTimerName: LPCSTR) -> HANDLE;
+        pub fn SetWaitableTimer(hTimer: HANDLE,
+                                pDueTime: *LARGE_INTEGER,
+                                lPeriod: LONG,
+                                pfnCompletionRoutine: PTIMERAPCROUTINE,
+                                lpArgToCompletionRoutine: LPVOID,
+                                fResume: BOOL) -> BOOL;
+        pub fn WaitForMultipleObjects(nCount: DWORD,
+                                      lpHandles: *HANDLE,
+                                      bWaitAll: BOOL,
+                                      dwMilliseconds: DWORD) -> DWORD;
+        pub fn WaitForSingleObject(hHandle: HANDLE,
+                                   dwMilliseconds: DWORD) -> DWORD;
+    }
+}