about summary refs log tree commit diff
path: root/src/libstd/rt
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2013-08-15 14:18:13 -0700
committerAlex Crichton <alex@alexcrichton.com>2013-08-27 20:46:43 -0700
commitb89e1c000e133fb5db3ea5afd0948db6dc088977 (patch)
tree6bd105ae723233519d985e61bdef09f4b0e13603 /src/libstd/rt
parented204257a0c6abc8386879bb631471ec17d8a96a (diff)
downloadrust-b89e1c000e133fb5db3ea5afd0948db6dc088977.tar.gz
rust-b89e1c000e133fb5db3ea5afd0948db6dc088977.zip
Implement process bindings to libuv
Closes #6436
Diffstat (limited to 'src/libstd/rt')
-rw-r--r--src/libstd/rt/io/file.rs3
-rw-r--r--src/libstd/rt/io/mod.rs3
-rw-r--r--src/libstd/rt/io/net/tcp.rs6
-rw-r--r--src/libstd/rt/io/pipe.rs77
-rw-r--r--src/libstd/rt/rtio.rs22
-rw-r--r--src/libstd/rt/uv/async.rs2
-rw-r--r--src/libstd/rt/uv/file.rs25
-rw-r--r--src/libstd/rt/uv/idle.rs4
-rw-r--r--src/libstd/rt/uv/mod.rs18
-rw-r--r--src/libstd/rt/uv/net.rs12
-rw-r--r--src/libstd/rt/uv/pipe.rs66
-rw-r--r--src/libstd/rt/uv/process.rs264
-rw-r--r--src/libstd/rt/uv/timer.rs2
-rw-r--r--src/libstd/rt/uv/uvio.rs294
-rw-r--r--src/libstd/rt/uv/uvll.rs94
15 files changed, 803 insertions, 89 deletions
diff --git a/src/libstd/rt/io/file.rs b/src/libstd/rt/io/file.rs
index f4e9c4d7c11..534e308a1a6 100644
--- a/src/libstd/rt/io/file.rs
+++ b/src/libstd/rt/io/file.rs
@@ -71,9 +71,6 @@ pub struct FileStream {
     last_nread: int,
 }
 
-impl FileStream {
-}
-
 impl Reader for FileStream {
     fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
         match self.fd.read(buf) {
diff --git a/src/libstd/rt/io/mod.rs b/src/libstd/rt/io/mod.rs
index 116d240308a..038fca9a1ad 100644
--- a/src/libstd/rt/io/mod.rs
+++ b/src/libstd/rt/io/mod.rs
@@ -268,6 +268,9 @@ pub use self::extensions::WriterByteConversions;
 /// Synchronous, non-blocking file I/O.
 pub mod file;
 
+/// Synchronous, in-memory I/O.
+pub mod pipe;
+
 /// Synchronous, non-blocking network I/O.
 pub mod net {
     pub mod tcp;
diff --git a/src/libstd/rt/io/net/tcp.rs b/src/libstd/rt/io/net/tcp.rs
index 85fb5d08848..dc7135f4a61 100644
--- a/src/libstd/rt/io/net/tcp.rs
+++ b/src/libstd/rt/io/net/tcp.rs
@@ -16,7 +16,7 @@ use rt::io::{io_error, read_error, EndOfFile};
 use rt::rtio::{IoFactory, IoFactoryObject,
                RtioSocket, RtioTcpListener,
                RtioTcpListenerObject, RtioTcpStream,
-               RtioTcpStreamObject};
+               RtioTcpStreamObject, RtioStream};
 use rt::local::Local;
 
 pub struct TcpStream(~RtioTcpStreamObject);
@@ -69,7 +69,7 @@ impl TcpStream {
 
 impl Reader for TcpStream {
     fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
-        match (**self).read(buf) {
+        match (***self).read(buf) {
             Ok(read) => Some(read),
             Err(ioerr) => {
                 // EOF is indicated by returning None
@@ -86,7 +86,7 @@ impl Reader for TcpStream {
 
 impl Writer for TcpStream {
     fn write(&mut self, buf: &[u8]) {
-        match (**self).write(buf) {
+        match (***self).write(buf) {
             Ok(_) => (),
             Err(ioerr) => io_error::cond.raise(ioerr),
         }
diff --git a/src/libstd/rt/io/pipe.rs b/src/libstd/rt/io/pipe.rs
new file mode 100644
index 00000000000..02b3d0fe57d
--- /dev/null
+++ b/src/libstd/rt/io/pipe.rs
@@ -0,0 +1,77 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Synchronous, in-memory pipes.
+//!
+//! Currently these aren't particularly useful, there only exists bindings
+//! enough so that pipes can be created to child processes.
+
+use prelude::*;
+use super::{Reader, Writer};
+use rt::io::{io_error, read_error, EndOfFile};
+use rt::local::Local;
+use rt::rtio::{RtioPipeObject, RtioStream, IoFactoryObject, IoFactory};
+use rt::uv::pipe;
+
+pub struct PipeStream(~RtioPipeObject);
+
+impl PipeStream {
+    /// Creates a new pipe initialized, but not bound to any particular
+    /// source/destination
+    pub fn new() -> Option<PipeStream> {
+        let pipe = unsafe {
+            let io: *mut IoFactoryObject = Local::unsafe_borrow();
+            (*io).pipe_init(false)
+        };
+        match pipe {
+            Ok(p) => Some(PipeStream(p)),
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+                None
+            }
+        }
+    }
+
+    /// Extracts the underlying libuv pipe to be bound to another source.
+    pub fn uv_pipe(&self) -> pipe::Pipe {
+        // Did someone say multiple layers of indirection?
+        (**self).uv_pipe()
+    }
+}
+
+impl Reader for PipeStream {
+    fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
+        match (***self).read(buf) {
+            Ok(read) => Some(read),
+            Err(ioerr) => {
+                // EOF is indicated by returning None
+                if ioerr.kind != EndOfFile {
+                    read_error::cond.raise(ioerr);
+                }
+                return None;
+            }
+        }
+    }
+
+    fn eof(&mut self) -> bool { fail!() }
+}
+
+impl Writer for PipeStream {
+    fn write(&mut self, buf: &[u8]) {
+        match (***self).write(buf) {
+            Ok(_) => (),
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+            }
+        }
+    }
+
+    fn flush(&mut self) { fail!() }
+}
diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs
index 1788b7a04e3..1a7ef6ea309 100644
--- a/src/libstd/rt/rtio.rs
+++ b/src/libstd/rt/rtio.rs
@@ -8,12 +8,14 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use libc;
 use option::*;
 use result::*;
 use libc::c_int;
 
 use rt::io::IoError;
 use super::io::net::ip::{IpAddr, SocketAddr};
+use rt::uv;
 use rt::uv::uvio;
 use path::Path;
 use super::io::support::PathLike;
@@ -30,6 +32,9 @@ pub type RtioTcpListenerObject = uvio::UvTcpListener;
 pub type RtioUdpSocketObject = uvio::UvUdpSocket;
 pub type RtioTimerObject = uvio::UvTimer;
 pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;
+pub type RtioPipeObject = uvio::UvPipeStream;
+pub type RtioProcessObject = uvio::UvProcess;
+pub type RtioProcessConfig<'self> = uv::process::Config<'self>;
 
 pub trait EventLoop {
     fn run(&mut self);
@@ -72,6 +77,13 @@ pub trait IoFactory {
     fn fs_open<P: PathLike>(&mut self, path: &P, fm: FileMode, fa: FileAccess)
         -> Result<~RtioFileStream, IoError>;
     fn fs_unlink<P: PathLike>(&mut self, path: &P) -> Result<(), IoError>;
+    fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError>;
+    fn spawn(&mut self, config: &RtioProcessConfig) -> Result<~RtioProcessObject, IoError>;
+}
+
+pub trait RtioStream {
+    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
+    fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
 }
 
 pub trait RtioTcpListener : RtioSocket {
@@ -80,9 +92,7 @@ pub trait RtioTcpListener : RtioSocket {
     fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;
 }
 
-pub trait RtioTcpStream : RtioSocket {
-    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
-    fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
+pub trait RtioTcpStream : RtioSocket + RtioStream {
     fn peer_name(&mut self) -> Result<SocketAddr, IoError>;
     fn control_congestion(&mut self) -> Result<(), IoError>;
     fn nodelay(&mut self) -> Result<(), IoError>;
@@ -124,3 +134,9 @@ pub trait RtioFileStream {
     fn tell(&self) -> Result<u64, IoError>;
     fn flush(&mut self) -> Result<(), IoError>;
 }
+
+pub trait RtioProcess {
+    fn id(&self) -> libc::pid_t;
+    fn kill(&mut self, signal: int) -> Result<(), IoError>;
+    fn wait(&mut self) -> int;
+}
diff --git a/src/libstd/rt/uv/async.rs b/src/libstd/rt/uv/async.rs
index d0ca38317cb..ff7bb9dd03a 100644
--- a/src/libstd/rt/uv/async.rs
+++ b/src/libstd/rt/uv/async.rs
@@ -34,7 +34,7 @@ impl AsyncWatcher {
 
         extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
             let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
-            let status = status_to_maybe_uv_error(watcher, status);
+            let status = status_to_maybe_uv_error(status);
             let data = watcher.get_watcher_data();
             let cb = data.async_cb.get_ref();
             (*cb)(watcher, status);
diff --git a/src/libstd/rt/uv/file.rs b/src/libstd/rt/uv/file.rs
index 405dfe0a7f0..5c77181d7eb 100644
--- a/src/libstd/rt/uv/file.rs
+++ b/src/libstd/rt/uv/file.rs
@@ -11,8 +11,8 @@
 use prelude::*;
 use ptr::null;
 use libc::c_void;
-use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf,
-             status_to_maybe_uv_error_with_loop, UvError};
+use rt::uv::{Request, NativeHandle, Loop, FsCallback, Buf, UvError};
+use rt::uv::status_to_maybe_uv_error;
 use rt::uv::uvll;
 use rt::uv::uvll::*;
 use super::super::io::support::PathLike;
@@ -62,7 +62,7 @@ impl FsRequest {
     pub fn open_sync<P: PathLike>(loop_: &Loop, path: &P, flags: int, mode: int)
           -> Result<int, UvError> {
         let result = FsRequest::open_common(loop_, path, flags, mode, None);
-        sync_cleanup(loop_, result)
+        sync_cleanup(result)
     }
 
     fn unlink_common<P: PathLike>(loop_: &Loop, path: &P, cb: Option<FsCallback>) -> int {
@@ -83,11 +83,11 @@ impl FsRequest {
     }
     pub fn unlink<P: PathLike>(loop_: &Loop, path: &P, cb: FsCallback) {
         let result = FsRequest::unlink_common(loop_, path, Some(cb));
-        sync_cleanup(loop_, result);
+        sync_cleanup(result);
     }
     pub fn unlink_sync<P: PathLike>(loop_: &Loop, path: &P) -> Result<int, UvError> {
         let result = FsRequest::unlink_common(loop_, path, None);
-        sync_cleanup(loop_, result)
+        sync_cleanup(result)
     }
 
     pub fn install_req_data(&self, cb: Option<FsCallback>) {
@@ -139,9 +139,8 @@ impl NativeHandle<*uvll::uv_fs_t> for FsRequest {
         match self { &FsRequest(ptr) => ptr }
     }
 }
-    fn sync_cleanup(loop_: &Loop, result: int)
-          -> Result<int, UvError> {
-        match status_to_maybe_uv_error_with_loop(loop_.native_handle(), result as i32) {
+    fn sync_cleanup(result: int) -> Result<int, UvError> {
+        match status_to_maybe_uv_error(result as i32) {
             Some(err) => Err(err),
             None => Ok(result)
         }
@@ -184,7 +183,7 @@ impl FileDescriptor {
     pub fn write_sync(&mut self, loop_: &Loop, buf: Buf, offset: i64)
           -> Result<int, UvError> {
         let result = self.write_common(loop_, buf, offset, None);
-        sync_cleanup(loop_, result)
+        sync_cleanup(result)
     }
 
     fn read_common(&mut self, loop_: &Loop, buf: Buf,
@@ -212,7 +211,7 @@ impl FileDescriptor {
     pub fn read_sync(&mut self, loop_: &Loop, buf: Buf, offset: i64)
           -> Result<int, UvError> {
         let result = self.read_common(loop_, buf, offset, None);
-        sync_cleanup(loop_, result)
+        sync_cleanup(result)
     }
 
     fn close_common(self, loop_: &Loop, cb: Option<FsCallback>) -> int {
@@ -234,12 +233,11 @@ impl FileDescriptor {
     }
     pub fn close_sync(self, loop_: &Loop) -> Result<int, UvError> {
         let result = self.close_common(loop_, None);
-        sync_cleanup(loop_, result)
+        sync_cleanup(result)
     }
 }
 extern fn compl_cb(req: *uv_fs_t) {
     let mut req: FsRequest = NativeHandle::from_native_handle(req);
-    let loop_ = req.get_loop();
     // pull the user cb out of the req data
     let cb = {
         let data = req.get_req_data();
@@ -250,8 +248,7 @@ extern fn compl_cb(req: *uv_fs_t) {
     // in uv_fs_open calls, the result will be the fd in the
     // case of success, otherwise it's -1 indicating an error
     let result = req.get_result();
-    let status = status_to_maybe_uv_error_with_loop(
-        loop_.native_handle(), result);
+    let status = status_to_maybe_uv_error(result);
     // we have a req and status, call the user cb..
     // only giving the user a ref to the FsRequest, as we
     // have to clean it up, afterwards (and they aren't really
diff --git a/src/libstd/rt/uv/idle.rs b/src/libstd/rt/uv/idle.rs
index a21146620ca..8cbcd7b77c0 100644
--- a/src/libstd/rt/uv/idle.rs
+++ b/src/libstd/rt/uv/idle.rs
@@ -43,7 +43,7 @@ impl IdleWatcher {
             let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
             let data = idle_watcher.get_watcher_data();
             let cb: &IdleCallback = data.idle_cb.get_ref();
-            let status = status_to_maybe_uv_error(idle_watcher, status);
+            let status = status_to_maybe_uv_error(status);
             (*cb)(idle_watcher, status);
         }
     }
@@ -57,7 +57,7 @@ impl IdleWatcher {
             let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
             let data = idle_watcher.get_watcher_data();
             let cb: &IdleCallback = data.idle_cb.get_ref();
-            let status = status_to_maybe_uv_error(idle_watcher, status);
+            let status = status_to_maybe_uv_error(status);
             (*cb)(idle_watcher, status);
         }
     }
diff --git a/src/libstd/rt/uv/mod.rs b/src/libstd/rt/uv/mod.rs
index 5a592b92a38..700b80c7398 100644
--- a/src/libstd/rt/uv/mod.rs
+++ b/src/libstd/rt/uv/mod.rs
@@ -58,6 +58,8 @@ pub use self::net::{StreamWatcher, TcpWatcher, UdpWatcher};
 pub use self::idle::IdleWatcher;
 pub use self::timer::TimerWatcher;
 pub use self::async::AsyncWatcher;
+pub use self::process::Process;
+pub use self::pipe::Pipe;
 
 /// The implementation of `rtio` for libuv
 pub mod uvio;
@@ -70,6 +72,8 @@ pub mod net;
 pub mod idle;
 pub mod timer;
 pub mod async;
+pub mod process;
+pub mod pipe;
 
 /// XXX: Loop(*handle) is buggy with destructors. Normal structs
 /// with dtors may not be destructured, but tuple structs can,
@@ -126,6 +130,8 @@ pub type NullCallback = ~fn();
 pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
 pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
 pub type FsCallback = ~fn(&mut FsRequest, Option<UvError>);
+// first int is exit_status, second is term_signal
+pub type ExitCallback = ~fn(Process, int, int, Option<UvError>);
 pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
 pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
 pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>);
@@ -143,7 +149,8 @@ struct WatcherData {
     timer_cb: Option<TimerCallback>,
     async_cb: Option<AsyncCallback>,
     udp_recv_cb: Option<UdpReceiveCallback>,
-    udp_send_cb: Option<UdpSendCallback>
+    udp_send_cb: Option<UdpSendCallback>,
+    exit_cb: Option<ExitCallback>,
 }
 
 pub trait WatcherInterop {
@@ -175,7 +182,8 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
                 timer_cb: None,
                 async_cb: None,
                 udp_recv_cb: None,
-                udp_send_cb: None
+                udp_send_cb: None,
+                exit_cb: None,
             };
             let data = transmute::<~WatcherData, *c_void>(data);
             uvll::set_data_for_uv_handle(self.native_handle(), data);
@@ -273,10 +281,8 @@ pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
     }
 }
 
-/// Given a uv handle, convert a callback status to a UvError
-pub fn status_to_maybe_uv_error<T, U: Watcher + NativeHandle<*T>>(
-    handle: U, status: c_int) -> Option<UvError>
-{
+/// Convert a callback status to a UvError
+pub fn status_to_maybe_uv_error(status: c_int) -> Option<UvError> {
     if status >= 0 {
         None
     } else {
diff --git a/src/libstd/rt/uv/net.rs b/src/libstd/rt/uv/net.rs
index bb3a00ca243..1581b017087 100644
--- a/src/libstd/rt/uv/net.rs
+++ b/src/libstd/rt/uv/net.rs
@@ -136,7 +136,7 @@ impl StreamWatcher {
             rtdebug!("buf len: %d", buf.len as int);
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(stream);
             let cb = stream_watcher.get_watcher_data().read_cb.get_ref();
-            let status = status_to_maybe_uv_error(stream_watcher, nread as c_int);
+            let status = status_to_maybe_uv_error(nread as c_int);
             (*cb)(stream_watcher, nread as int, buf, status);
         }
     }
@@ -166,7 +166,7 @@ impl StreamWatcher {
             let mut stream_watcher = write_request.stream();
             write_request.delete();
             let cb = stream_watcher.get_watcher_data().write_cb.take_unwrap();
-            let status = status_to_maybe_uv_error(stream_watcher, status);
+            let status = status_to_maybe_uv_error(status);
             cb(stream_watcher, status);
         }
     }
@@ -259,7 +259,7 @@ impl TcpWatcher {
                 let mut stream_watcher = connect_request.stream();
                 connect_request.delete();
                 let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
-                let status = status_to_maybe_uv_error(stream_watcher, status);
+                let status = status_to_maybe_uv_error(status);
                 cb(stream_watcher, status);
             }
         }
@@ -282,7 +282,7 @@ impl TcpWatcher {
             rtdebug!("connection_cb");
             let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
             let cb = stream_watcher.get_watcher_data().connect_cb.get_ref();
-            let status = status_to_maybe_uv_error(stream_watcher, status);
+            let status = status_to_maybe_uv_error(status);
             (*cb)(stream_watcher, status);
         }
     }
@@ -359,7 +359,7 @@ impl UdpWatcher {
             rtdebug!("buf len: %d", buf.len as int);
             let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
             let cb = udp_watcher.get_watcher_data().udp_recv_cb.get_ref();
-            let status = status_to_maybe_uv_error(udp_watcher, nread as c_int);
+            let status = status_to_maybe_uv_error(nread as c_int);
             let addr = uv_socket_addr_to_socket_addr(sockaddr_to_UvSocketAddr(addr));
             (*cb)(udp_watcher, nread as int, buf, addr, flags as uint, status);
         }
@@ -394,7 +394,7 @@ impl UdpWatcher {
             let mut udp_watcher = send_request.handle();
             send_request.delete();
             let cb = udp_watcher.get_watcher_data().udp_send_cb.take_unwrap();
-            let status = status_to_maybe_uv_error(udp_watcher, status);
+            let status = status_to_maybe_uv_error(status);
             cb(udp_watcher, status);
         }
     }
diff --git a/src/libstd/rt/uv/pipe.rs b/src/libstd/rt/uv/pipe.rs
new file mode 100644
index 00000000000..1147c731a60
--- /dev/null
+++ b/src/libstd/rt/uv/pipe.rs
@@ -0,0 +1,66 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use prelude::*;
+use libc;
+
+use rt::uv;
+use rt::uv::net;
+use rt::uv::uvll;
+
+pub struct Pipe(*uvll::uv_pipe_t);
+
+impl uv::Watcher for Pipe {}
+
+impl Pipe {
+    pub fn new(loop_: &uv::Loop, ipc: bool) -> Pipe {
+        unsafe {
+            let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
+            assert!(handle.is_not_null());
+            let ipc = ipc as libc::c_int;
+            assert_eq!(uvll::pipe_init(loop_.native_handle(), handle, ipc), 0);
+            let mut ret: Pipe =
+                    uv::NativeHandle::from_native_handle(handle);
+            ret.install_watcher_data();
+            ret
+        }
+    }
+
+    pub fn as_stream(&self) -> net::StreamWatcher {
+        net::StreamWatcher(**self as *uvll::uv_stream_t)
+    }
+
+    pub fn close(self, cb: uv::NullCallback) {
+        {
+            let mut this = self;
+            let data = this.get_watcher_data();
+            assert!(data.close_cb.is_none());
+            data.close_cb = Some(cb);
+        }
+
+        unsafe { uvll::close(self.native_handle(), close_cb); }
+
+        extern fn close_cb(handle: *uvll::uv_pipe_t) {
+            let mut process: Pipe = uv::NativeHandle::from_native_handle(handle);
+            process.get_watcher_data().close_cb.take_unwrap()();
+            process.drop_watcher_data();
+            unsafe { uvll::free_handle(handle as *libc::c_void) }
+        }
+    }
+}
+
+impl uv::NativeHandle<*uvll::uv_pipe_t> for Pipe {
+    fn from_native_handle(handle: *uvll::uv_pipe_t) -> Pipe {
+        Pipe(handle)
+    }
+    fn native_handle(&self) -> *uvll::uv_pipe_t {
+        match self { &Pipe(ptr) => ptr }
+    }
+}
diff --git a/src/libstd/rt/uv/process.rs b/src/libstd/rt/uv/process.rs
new file mode 100644
index 00000000000..a02cf67ec26
--- /dev/null
+++ b/src/libstd/rt/uv/process.rs
@@ -0,0 +1,264 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use prelude::*;
+use libc;
+use ptr;
+use vec;
+use cell::Cell;
+
+use rt::uv;
+use rt::uv::net;
+use rt::uv::pipe;
+use rt::uv::uvll;
+
+/// A process wraps the handle of the underlying uv_process_t.
+pub struct Process(*uvll::uv_process_t);
+
+/// This configuration describes how a new process should be spawned. This is
+/// translated to libuv's own configuration
+pub struct Config<'self> {
+    /// Path to the program to run
+    program: &'self str,
+
+    /// Arguments to pass to the program (doesn't include the program itself)
+    args: &'self [~str],
+
+    /// Optional environment to specify for the program. If this is None, then
+    /// it will inherit the current process's environment.
+    env: Option<&'self [(~str, ~str)]>,
+
+    /// Optional working directory for the new process. If this is None, then
+    /// the current directory of the running process is inherited.
+    cwd: Option<&'self str>,
+
+    /// Any number of streams/file descriptors/pipes may be attached to this
+    /// process. This list enumerates the file descriptors and such for the
+    /// process to be spawned, and the file descriptors inherited will start at
+    /// 0 and go to the length of this array.
+    ///
+    /// Standard file descriptors are:
+    ///
+    ///     0 - stdin
+    ///     1 - stdout
+    ///     2 - stderr
+    io: &'self [StdioContainer]
+}
+
+/// Describes what to do with a standard io stream for a child process.
+pub enum StdioContainer {
+    /// This stream will be ignored. This is the equivalent of attaching the
+    /// stream to `/dev/null`
+    Ignored,
+
+    /// The specified file descriptor is inherited for the stream which it is
+    /// specified for.
+    InheritFd(libc::c_int),
+
+    /// The specified libuv stream is inherited for the corresponding file
+    /// descriptor it is assigned to.
+    InheritStream(net::StreamWatcher),
+
+    /// Creates a pipe for the specified file descriptor which will be directed
+    /// into the previously-initialized pipe passed in.
+    ///
+    /// The first boolean argument is whether the pipe is readable, and the
+    /// second is whether it is writable. These properties are from the view of
+    /// the *child* process, not the parent process.
+    CreatePipe(pipe::Pipe, bool /* readable */, bool /* writable */),
+}
+
+impl uv::Watcher for Process {}
+
+impl Process {
+    /// Creates a new process, ready to spawn inside an event loop
+    pub fn new() -> Process {
+        let handle = unsafe { uvll::malloc_handle(uvll::UV_PROCESS) };
+        assert!(handle.is_not_null());
+        let mut ret: Process = uv::NativeHandle::from_native_handle(handle);
+        ret.install_watcher_data();
+        return ret;
+    }
+
+    /// Spawn a new process inside the specified event loop.
+    ///
+    /// The `config` variable will be passed down to libuv, and the `exit_cb`
+    /// will be run only once, when the process exits.
+    ///
+    /// Returns either the corresponding process object or an error which
+    /// occurred.
+    pub fn spawn(&mut self, loop_: &uv::Loop, config: &Config,
+                 exit_cb: uv::ExitCallback) -> Result<(), uv::UvError> {
+        let cwd = config.cwd.map_move(|s| s.to_c_str());
+
+        extern fn on_exit(p: *uvll::uv_process_t,
+                          exit_status: libc::c_int,
+                          term_signal: libc::c_int) {
+            let mut p: Process = uv::NativeHandle::from_native_handle(p);
+            let err = match exit_status {
+                0 => None,
+                _ => uv::status_to_maybe_uv_error(-1)
+            };
+            p.get_watcher_data().exit_cb.take_unwrap()(p,
+                                                       exit_status as int,
+                                                       term_signal as int,
+                                                       err);
+        }
+
+        let mut stdio = vec::with_capacity::<uvll::uv_stdio_container_t>(
+                                config.io.len());
+        unsafe {
+            vec::raw::set_len(&mut stdio, config.io.len());
+            for (slot, &other) in stdio.iter().zip(config.io.iter()) {
+                set_stdio(slot as *uvll::uv_stdio_container_t, other);
+            }
+        }
+
+        let exit_cb = Cell::new(exit_cb);
+        do with_argv(config.program, config.args) |argv| {
+            do with_env(config.env) |envp| {
+                let options = uvll::uv_process_options_t {
+                    exit_cb: on_exit,
+                    file: unsafe { *argv },
+                    args: argv,
+                    env: envp,
+                    cwd: match cwd {
+                        Some(ref cwd) => cwd.with_ref(|p| p),
+                        None => ptr::null(),
+                    },
+                    flags: 0,
+                    stdio_count: stdio.len() as libc::c_int,
+                    stdio: stdio.as_imm_buf(|p, _| p),
+                    uid: 0,
+                    gid: 0,
+                };
+
+                match unsafe {
+                    uvll::spawn(loop_.native_handle(), **self, options)
+                } {
+                    0 => {
+                        (*self).get_watcher_data().exit_cb = Some(exit_cb.take());
+                        Ok(())
+                    }
+                    err => Err(uv::UvError(err))
+                }
+            }
+        }
+    }
+
+    /// Sends a signal to this process.
+    ///
+    /// This is a wrapper around `uv_process_kill`
+    pub fn kill(&self, signum: int) -> Result<(), uv::UvError> {
+        match unsafe {
+            uvll::process_kill(self.native_handle(), signum as libc::c_int)
+        } {
+            0 => Ok(()),
+            err => Err(uv::UvError(err))
+        }
+    }
+
+    /// Returns the process id of a spawned process
+    pub fn pid(&self) -> libc::pid_t {
+        unsafe { uvll::process_pid(**self) as libc::pid_t }
+    }
+
+    /// Closes this handle, invoking the specified callback once closed
+    pub fn close(self, cb: uv::NullCallback) {
+        {
+            let mut this = self;
+            let data = this.get_watcher_data();
+            assert!(data.close_cb.is_none());
+            data.close_cb = Some(cb);
+        }
+
+        unsafe { uvll::close(self.native_handle(), close_cb); }
+
+        extern fn close_cb(handle: *uvll::uv_process_t) {
+            let mut process: Process = uv::NativeHandle::from_native_handle(handle);
+            process.get_watcher_data().close_cb.take_unwrap()();
+            process.drop_watcher_data();
+            unsafe { uvll::free_handle(handle as *libc::c_void) }
+        }
+    }
+}
+
+unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t, io: StdioContainer) {
+    match io {
+        Ignored => { uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE); }
+        InheritFd(fd) => {
+            uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_FD);
+            uvll::set_stdio_container_fd(dst, fd);
+        }
+        InheritStream(stream) => {
+            uvll::set_stdio_container_flags(dst, uvll::STDIO_INHERIT_STREAM);
+            uvll::set_stdio_container_stream(dst, stream.native_handle());
+        }
+        CreatePipe(pipe, readable, writable) => {
+            let mut flags = uvll::STDIO_CREATE_PIPE as libc::c_int;
+            if readable {
+                flags |= uvll::STDIO_READABLE_PIPE as libc::c_int;
+            }
+            if writable {
+                flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int;
+            }
+            uvll::set_stdio_container_flags(dst, flags);
+            uvll::set_stdio_container_stream(dst,
+                                             pipe.as_stream().native_handle());
+        }
+    }
+}
+
+/// Converts the program and arguments to the argv array expected by libuv
+fn with_argv<T>(prog: &str, args: &[~str], f: &fn(**libc::c_char) -> T) -> T {
+    // First, allocation space to put all the C-strings (we need to have
+    // ownership of them somewhere
+    let mut c_strs = vec::with_capacity(args.len() + 1);
+    c_strs.push(prog.to_c_str());
+    for arg in args.iter() {
+        c_strs.push(arg.to_c_str());
+    }
+
+    // Next, create the char** array
+    let mut c_args = vec::with_capacity(c_strs.len() + 1);
+    for s in c_strs.iter() {
+        c_args.push(s.with_ref(|p| p));
+    }
+    c_args.push(ptr::null());
+    c_args.as_imm_buf(|buf, _| f(buf))
+}
+
+/// Converts the environment to the env array expected by libuv
+fn with_env<T>(env: Option<&[(~str, ~str)]>, f: &fn(**libc::c_char) -> T) -> T {
+    let env = match env {
+        Some(s) => s,
+        None => { return f(ptr::null()); }
+    };
+    // As with argv, create some temporary storage and then the actual array
+    let mut envp = vec::with_capacity(env.len());
+    for &(ref key, ref value) in env.iter() {
+        envp.push(fmt!("%s=%s", *key, *value).to_c_str());
+    }
+    let mut c_envp = vec::with_capacity(envp.len() + 1);
+    for s in envp.iter() {
+        c_envp.push(s.with_ref(|p| p));
+    }
+    c_envp.push(ptr::null());
+    c_envp.as_imm_buf(|buf, _| f(buf))
+}
+
+impl uv::NativeHandle<*uvll::uv_process_t> for Process {
+    fn from_native_handle(handle: *uvll::uv_process_t) -> Process {
+        Process(handle)
+    }
+    fn native_handle(&self) -> *uvll::uv_process_t {
+        match self { &Process(ptr) => ptr }
+    }
+}
diff --git a/src/libstd/rt/uv/timer.rs b/src/libstd/rt/uv/timer.rs
index eaa5e77a6da..7b09cf2eb0e 100644
--- a/src/libstd/rt/uv/timer.rs
+++ b/src/libstd/rt/uv/timer.rs
@@ -43,7 +43,7 @@ impl TimerWatcher {
             let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
             let data = watcher.get_watcher_data();
             let cb = data.timer_cb.get_ref();
-            let status = status_to_maybe_uv_error(watcher, status);
+            let status = status_to_maybe_uv_error(status);
             (*cb)(watcher, status);
         }
     }
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index e620ab274b1..80f1aef37ac 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -13,7 +13,7 @@ use cast::transmute;
 use cast;
 use cell::Cell;
 use clone::Clone;
-use libc::{c_int, c_uint, c_void};
+use libc::{c_int, c_uint, c_void, pid_t};
 use ops::Drop;
 use option::*;
 use ptr;
@@ -22,6 +22,7 @@ use result::*;
 use rt::io::IoError;
 use rt::io::net::ip::{SocketAddr, IpAddr};
 use rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur, SeekEnd};
+use rt::kill::BlockedTask;
 use rt::local::Local;
 use rt::rtio::*;
 use rt::sched::{Scheduler, SchedHandle};
@@ -148,7 +149,7 @@ fn socket_name<T, U: Watcher + NativeHandle<*T>>(sk: SocketNameKind,
     };
 
     if r != 0 {
-        let status = status_to_maybe_uv_error(handle, r);
+        let status = status_to_maybe_uv_error(r);
         return Err(uv_error_to_io_error(status.unwrap()));
     }
 
@@ -591,6 +592,63 @@ impl IoFactory for UvIoFactory {
         assert!(!result_cell.is_empty());
         return result_cell.take();
     }
+
+    fn pipe_init(&mut self, ipc: bool) -> Result<~RtioPipeObject, IoError> {
+        let home = get_handle_to_current_scheduler!();
+        Ok(~UvPipeStream { pipe: Pipe::new(self.uv_loop(), ipc), home: home })
+    }
+
+    fn spawn(&mut self,
+             config: &process::Config) -> Result<~RtioProcessObject, IoError> {
+        // Sadly, we must create the UvProcess before we actually call uv_spawn
+        // so that the exit_cb can close over it and notify it when the process
+        // has exited.
+        let mut ret = ~UvProcess {
+            process: Process::new(),
+            home: None,
+            exit_status: None,
+            term_signal: None,
+            exit_error: None,
+            descheduled: None,
+        };
+        let ret_ptr = unsafe {
+            *cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret)
+        };
+
+        // The purpose of this exit callback is to record the data about the
+        // exit and then wake up the task which may be waiting for the process
+        // to exit. This is all performed in the current io-loop, and the
+        // implementation of UvProcess ensures that reading these fields always
+        // occurs on the current io-loop.
+        let exit_cb: ExitCallback = |_, exit_status, term_signal, error| {
+            unsafe {
+                assert!((*ret_ptr).exit_status.is_none());
+                (*ret_ptr).exit_status = Some(exit_status);
+                (*ret_ptr).term_signal = Some(term_signal);
+                (*ret_ptr).exit_error = error;
+                match (*ret_ptr).descheduled.take() {
+                    Some(task) => {
+                        let scheduler: ~Scheduler = Local::take();
+                        scheduler.resume_blocked_task_immediately(task);
+                    }
+                    None => {}
+                }
+            }
+        };
+
+        match ret.process.spawn(self.uv_loop(), config, exit_cb) {
+            Ok(()) => {
+                // Only now do we actually get a handle to this scheduler.
+                ret.home = Some(get_handle_to_current_scheduler!());
+                Ok(ret)
+            }
+            Err(uverr) => {
+                // We still need to close the process handle we created, but
+                // that's taken care for us in the destructor of UvProcess
+                Err(uv_error_to_io_error(uverr))
+            }
+        }
+    }
 }
 
 pub struct UvTcpListener {
@@ -679,7 +737,7 @@ impl RtioTcpListener for UvTcpListener {
                 uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 1 as c_int)
             };
 
-            match status_to_maybe_uv_error(self_.watcher(), r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -692,7 +750,7 @@ impl RtioTcpListener for UvTcpListener {
                 uvll::tcp_simultaneous_accepts(self_.watcher().native_handle(), 0 as c_int)
             };
 
-            match status_to_maybe_uv_error(self_.watcher(), r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -700,40 +758,15 @@ impl RtioTcpListener for UvTcpListener {
     }
 }
 
-pub struct UvTcpStream {
-    watcher: TcpWatcher,
-    home: SchedHandle,
-}
-
-impl HomingIO for UvTcpStream {
-    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+trait UvStream: HomingIO {
+    fn as_stream(&mut self) -> StreamWatcher;
 }
 
-impl Drop for UvTcpStream {
-    fn drop(&self) {
-        // XXX need mutable finalizer
-        let this = unsafe { transmute::<&UvTcpStream, &mut UvTcpStream>(self) };
-        do this.home_for_io_with_sched |self_, scheduler| {
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let task_cell = Cell::new(task);
-                do self_.watcher.as_stream().close {
-                    let scheduler: ~Scheduler = Local::take();
-                    scheduler.resume_blocked_task_immediately(task_cell.take());
-                }
-            }
-        }
-    }
-}
-
-impl RtioSocket for UvTcpStream {
-    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
-        do self.home_for_io |self_| {
-            socket_name(Tcp, self_.watcher)
-        }
-    }
-}
-
-impl RtioTcpStream for UvTcpStream {
+// FIXME(#3429) I would rather this be `impl<T: UvStream> RtioStream for T` but
+//              that has conflicts with other traits that also have methods
+//              called `read` and `write`
+macro_rules! rtiostream(($t:ident) => {
+impl RtioStream for $t {
     fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
         do self.home_for_io_with_sched |self_, scheduler| {
             let result_cell = Cell::new_empty();
@@ -747,7 +780,7 @@ impl RtioTcpStream for UvTcpStream {
                 let alloc: AllocCallback = |_| unsafe {
                     slice_to_uv_buf(*buf_ptr)
                 };
-                let mut watcher = self_.watcher.as_stream();
+                let mut watcher = self_.as_stream();
                 do watcher.read_start(alloc) |mut watcher, nread, _buf, status| {
 
                     // Stop reading so that no read callbacks are
@@ -783,7 +816,7 @@ impl RtioTcpStream for UvTcpStream {
             do scheduler.deschedule_running_task_and_then |_, task| {
                 let task_cell = Cell::new(task);
                 let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
-                let mut watcher = self_.watcher.as_stream();
+                let mut watcher = self_.as_stream();
                 do watcher.write(buf) |_watcher, status| {
                     let result = if status.is_none() {
                         Ok(())
@@ -802,7 +835,85 @@ impl RtioTcpStream for UvTcpStream {
             result_cell.take()
         }
     }
+}
+})
+
+rtiostream!(UvPipeStream)
+rtiostream!(UvTcpStream)
+
+pub struct UvPipeStream {
+    pipe: Pipe,
+    home: SchedHandle,
+}
+
+impl UvStream for UvPipeStream {
+    fn as_stream(&mut self) -> StreamWatcher { self.pipe.as_stream() }
+}
+
+impl HomingIO for UvPipeStream {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
+
+impl Drop for UvPipeStream {
+    fn drop(&self) {
+        // FIXME(#4330): should not need a transmute
+        let this = unsafe { cast::transmute_mut(self) };
+        do this.home_for_io |self_| {
+            let scheduler: ~Scheduler = Local::take();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do self_.pipe.close {
+                    let scheduler: ~Scheduler = Local::take();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
+            }
+        }
+    }
+}
+
+impl UvPipeStream {
+    pub fn uv_pipe(&self) -> Pipe { self.pipe }
+}
+
+pub struct UvTcpStream {
+    watcher: TcpWatcher,
+    home: SchedHandle,
+}
+
+impl HomingIO for UvTcpStream {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
+}
+
+impl Drop for UvTcpStream {
+    fn drop(&self) {
+        // FIXME(#4330): should not need a transmute
+        let this = unsafe { cast::transmute_mut(self) };
+        do this.home_for_io |self_| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task_cell = Cell::new(task);
+                do self_.watcher.as_stream().close {
+                    let scheduler = Local::take::<Scheduler>();
+                    scheduler.resume_blocked_task_immediately(task_cell.take());
+                }
+            }
+        }
+    }
+}
+
+impl UvStream for UvTcpStream {
+    fn as_stream(&mut self) -> StreamWatcher { self.watcher.as_stream() }
+}
+
+impl RtioSocket for UvTcpStream {
+    fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
+        do self.home_for_io |self_| {
+            socket_name(Tcp, self_.watcher)
+        }
+    }
+}
 
+impl RtioTcpStream for UvTcpStream {
     fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
         do self.home_for_io |self_| {
             socket_name(TcpPeer, self_.watcher)
@@ -813,7 +924,7 @@ impl RtioTcpStream for UvTcpStream {
         do self.home_for_io |self_| {
             let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 0 as c_int) };
 
-            match status_to_maybe_uv_error(self_.watcher, r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -824,7 +935,7 @@ impl RtioTcpStream for UvTcpStream {
         do self.home_for_io |self_| {
             let r = unsafe { uvll::tcp_nodelay(self_.watcher.native_handle(), 1 as c_int) };
 
-            match status_to_maybe_uv_error(self_.watcher, r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -838,7 +949,7 @@ impl RtioTcpStream for UvTcpStream {
                                     delay_in_seconds as c_uint)
             };
 
-            match status_to_maybe_uv_error(self_.watcher, r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -851,7 +962,7 @@ impl RtioTcpStream for UvTcpStream {
                 uvll::tcp_keepalive(self_.watcher.native_handle(), 0 as c_int, 0 as c_uint)
             };
 
-            match status_to_maybe_uv_error(self_.watcher, r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -963,7 +1074,7 @@ impl RtioUdpSocket for UvUdpSocket {
                 }
             };
 
-            match status_to_maybe_uv_error(self_.watcher, r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -979,7 +1090,7 @@ impl RtioUdpSocket for UvUdpSocket {
                 }
             };
 
-            match status_to_maybe_uv_error(self_.watcher, r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -993,7 +1104,7 @@ impl RtioUdpSocket for UvUdpSocket {
                 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 1 as c_int)
             };
 
-            match status_to_maybe_uv_error(self_.watcher, r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -1007,7 +1118,7 @@ impl RtioUdpSocket for UvUdpSocket {
                 uvll::udp_set_multicast_loop(self_.watcher.native_handle(), 0 as c_int)
             };
 
-            match status_to_maybe_uv_error(self_.watcher, r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -1021,7 +1132,7 @@ impl RtioUdpSocket for UvUdpSocket {
                 uvll::udp_set_multicast_ttl(self_.watcher.native_handle(), ttl as c_int)
             };
 
-            match status_to_maybe_uv_error(self_.watcher, r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -1035,7 +1146,7 @@ impl RtioUdpSocket for UvUdpSocket {
                 uvll::udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
             };
 
-            match status_to_maybe_uv_error(self_.watcher, r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -1049,7 +1160,7 @@ impl RtioUdpSocket for UvUdpSocket {
                 uvll::udp_set_broadcast(self_.watcher.native_handle(), 1 as c_int)
             };
 
-            match status_to_maybe_uv_error(self_.watcher, r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -1063,7 +1174,7 @@ impl RtioUdpSocket for UvUdpSocket {
                 uvll::udp_set_broadcast(self_.watcher.native_handle(), 0 as c_int)
             };
 
-            match status_to_maybe_uv_error(self_.watcher, r) {
+            match status_to_maybe_uv_error(r) {
                 Some(err) => Err(uv_error_to_io_error(err)),
                 None => Ok(())
             }
@@ -1250,6 +1361,89 @@ impl RtioFileStream for UvFileStream {
     }
 }
 
+pub struct UvProcess {
+    process: process::Process,
+
+    // Sadly, this structure must be created before we return it, so in that
+    // brief interim the `home` is None.
+    home: Option<SchedHandle>,
+
+    // All None until the process exits (exit_error may stay None)
+    priv exit_status: Option<int>,
+    priv term_signal: Option<int>,
+    priv exit_error: Option<UvError>,
+
+    // Used to store which task to wake up from the exit_cb
+    priv descheduled: Option<BlockedTask>,
+}
+
+impl HomingIO for UvProcess {
+    fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() }
+}
+
+impl Drop for UvProcess {
+    fn drop(&self) {
+        // FIXME(#4330): should not need a transmute
+        let this = unsafe { cast::transmute_mut(self) };
+
+        let close = |self_: &mut UvProcess| {
+            let scheduler = Local::take::<Scheduler>();
+            do scheduler.deschedule_running_task_and_then |_, task| {
+                let task = Cell::new(task);
+                do self_.process.close {
+                    let scheduler: ~Scheduler = Local::take();
+                    scheduler.resume_blocked_task_immediately(task.take());
+                }
+            }
+        };
+
+        // If home is none, then this process never actually successfully
+        // spawned, so there's no need to switch event loops
+        if this.home.is_none() {
+            close(this)
+        } else {
+            this.home_for_io(close)
+        }
+    }
+}
+
+impl RtioProcess for UvProcess {
+    fn id(&self) -> pid_t {
+        self.process.pid()
+    }
+
+    fn kill(&mut self, signal: int) -> Result<(), IoError> {
+        do self.home_for_io |self_| {
+            match self_.process.kill(signal) {
+                Ok(()) => Ok(()),
+                Err(uverr) => Err(uv_error_to_io_error(uverr))
+            }
+        }
+    }
+
+    fn wait(&mut self) -> int {
+        // Make sure (on the home scheduler) that we have an exit status listed
+        do self.home_for_io |self_| {
+            match self_.exit_status {
+                Some(*) => {}
+                None => {
+                    // If there's no exit code previously listed, then the
+                    // process's exit callback has yet to be invoked. We just
+                    // need to deschedule ourselves and wait to be reawoken.
+                    let scheduler: ~Scheduler = Local::take();
+                    do scheduler.deschedule_running_task_and_then |_, task| {
+                        assert!(self_.descheduled.is_none());
+                        self_.descheduled = Some(task);
+                    }
+                    assert!(self_.exit_status.is_some());
+                }
+            }
+        }
+
+        self.exit_status.unwrap()
+    }
+}
+
 #[test]
 fn test_simple_io_no_connect() {
     do run_in_newsched_task {
diff --git a/src/libstd/rt/uv/uvll.rs b/src/libstd/rt/uv/uvll.rs
index 00c2974cb54..24e070ca239 100644
--- a/src/libstd/rt/uv/uvll.rs
+++ b/src/libstd/rt/uv/uvll.rs
@@ -67,11 +67,44 @@ pub mod errors {
     pub static EPIPE: c_int = -libc::EPIPE;
 }
 
+pub static PROCESS_SETUID: c_int = 1 << 0;
+pub static PROCESS_SETGID: c_int = 1 << 1;
+pub static PROCESS_WINDOWS_VERBATIM_ARGUMENTS: c_int = 1 << 2;
+pub static PROCESS_DETACHED: c_int = 1 << 3;
+pub static PROCESS_WINDOWS_HIDE: c_int = 1 << 4;
+
+pub static STDIO_IGNORE: c_int = 0x00;
+pub static STDIO_CREATE_PIPE: c_int = 0x01;
+pub static STDIO_INHERIT_FD: c_int = 0x02;
+pub static STDIO_INHERIT_STREAM: c_int = 0x04;
+pub static STDIO_READABLE_PIPE: c_int = 0x10;
+pub static STDIO_WRITABLE_PIPE: c_int = 0x20;
+
 pub struct uv_buf_t {
     base: *u8,
     len: libc::size_t,
 }
 
+pub struct uv_process_options_t {
+    exit_cb: uv_exit_cb,
+    file: *libc::c_char,
+    args: **libc::c_char,
+    env: **libc::c_char,
+    cwd: *libc::c_char,
+    flags: libc::c_uint,
+    stdio_count: libc::c_int,
+    stdio: *uv_stdio_container_t,
+    uid: uv_uid_t,
+    gid: uv_gid_t,
+}
+
+// These fields are private because they must be interfaced with through the
+// functions below.
+pub struct uv_stdio_container_t {
+    priv flags: libc::c_int,
+    priv stream: *uv_stream_t,
+}
+
 pub type uv_handle_t = c_void;
 pub type uv_loop_t = c_void;
 pub type uv_idle_t = c_void;
@@ -85,6 +118,8 @@ pub type uv_timer_t = c_void;
 pub type uv_stream_t = c_void;
 pub type uv_fs_t = c_void;
 pub type uv_udp_send_t = c_void;
+pub type uv_process_t = c_void;
+pub type uv_pipe_t = c_void;
 
 #[cfg(stage0)]
 pub type uv_idle_cb = *u8;
@@ -110,6 +145,8 @@ pub type uv_connection_cb = *u8;
 pub type uv_timer_cb = *u8;
 #[cfg(stage0)]
 pub type uv_write_cb = *u8;
+#[cfg(stage0)]
+pub type uv_exit_cb = *u8;
 
 #[cfg(not(stage0))]
 pub type uv_idle_cb = extern "C" fn(handle: *uv_idle_t,
@@ -150,12 +187,21 @@ pub type uv_timer_cb = extern "C" fn(handle: *uv_timer_t,
 #[cfg(not(stage0))]
 pub type uv_write_cb = extern "C" fn(handle: *uv_write_t,
                                      status: c_int);
+#[cfg(not(stage0))]
+pub type uv_exit_cb = extern "C" fn(handle: *uv_process_t,
+                                    exit_status: c_int,
+                                    term_signal: c_int);
 
 pub type sockaddr = c_void;
 pub type sockaddr_in = c_void;
 pub type sockaddr_in6 = c_void;
 pub type sockaddr_storage = c_void;
 
+#[cfg(unix)] pub type uv_uid_t = libc::types::os::arch::posix88::uid_t;
+#[cfg(unix)] pub type uv_gid_t = libc::types::os::arch::posix88::gid_t;
+#[cfg(windows)] pub type uv_uid_t = libc::c_uchar;
+#[cfg(windows)] pub type uv_gid_t = libc::c_uchar;
+
 #[deriving(Eq)]
 pub enum uv_handle_type {
     UV_UNKNOWN_HANDLE,
@@ -659,6 +705,45 @@ pub unsafe fn fs_req_cleanup(req: *uv_fs_t) {
     rust_uv_fs_req_cleanup(req);
 }
 
+pub unsafe fn spawn(loop_ptr: *c_void, result: *uv_process_t,
+                    options: uv_process_options_t) -> c_int {
+    #[fixed_stack_segment]; #[inline(never)];
+    return rust_uv_spawn(loop_ptr, result, options);
+}
+
+pub unsafe fn process_kill(p: *uv_process_t, signum: c_int) -> c_int {
+    #[fixed_stack_segment]; #[inline(never)];
+    return rust_uv_process_kill(p, signum);
+}
+
+pub unsafe fn process_pid(p: *uv_process_t) -> c_int {
+    #[fixed_stack_segment]; #[inline(never)];
+    return rust_uv_process_pid(p);
+}
+
+pub unsafe fn set_stdio_container_flags(c: *uv_stdio_container_t,
+                                        flags: libc::c_int) {
+    #[fixed_stack_segment]; #[inline(never)];
+    rust_set_stdio_container_flags(c, flags);
+}
+
+pub unsafe fn set_stdio_container_fd(c: *uv_stdio_container_t,
+                                     fd: libc::c_int) {
+    #[fixed_stack_segment]; #[inline(never)];
+    rust_set_stdio_container_fd(c, fd);
+}
+
+pub unsafe fn set_stdio_container_stream(c: *uv_stdio_container_t,
+                                         stream: *uv_stream_t) {
+    #[fixed_stack_segment]; #[inline(never)];
+    rust_set_stdio_container_stream(c, stream);
+}
+
+pub unsafe fn pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int {
+    #[fixed_stack_segment]; #[inline(never)];
+    rust_uv_pipe_init(loop_ptr, p, ipc)
+}
+
 // data access helpers
 pub unsafe fn get_result_from_fs_req(req: *uv_fs_t) -> c_int {
     #[fixed_stack_segment]; #[inline(never)];
@@ -844,4 +929,13 @@ extern {
     fn rust_uv_set_data_for_req(req: *c_void, data: *c_void);
     fn rust_uv_get_base_from_buf(buf: uv_buf_t) -> *u8;
     fn rust_uv_get_len_from_buf(buf: uv_buf_t) -> size_t;
+    fn rust_uv_spawn(loop_ptr: *c_void, outptr: *uv_process_t,
+                     options: uv_process_options_t) -> c_int;
+    fn rust_uv_process_kill(p: *uv_process_t, signum: c_int) -> c_int;
+    fn rust_uv_process_pid(p: *uv_process_t) -> c_int;
+    fn rust_set_stdio_container_flags(c: *uv_stdio_container_t, flags: c_int);
+    fn rust_set_stdio_container_fd(c: *uv_stdio_container_t, fd: c_int);
+    fn rust_set_stdio_container_stream(c: *uv_stdio_container_t,
+                                       stream: *uv_stream_t);
+    fn rust_uv_pipe_init(loop_ptr: *c_void, p: *uv_pipe_t, ipc: c_int) -> c_int;
 }