about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libstd/uvtmp.rs156
-rw-r--r--src/rt/rust_uvtmp.cpp40
2 files changed, 154 insertions, 42 deletions
diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs
index 18d4904c224..8a61df37982 100644
--- a/src/libstd/uvtmp.rs
+++ b/src/libstd/uvtmp.rs
@@ -2,20 +2,25 @@
 
 // UV2
 enum uv_operation {
-    op_hw()
+    op_async_init([u8])
 }
 
+type uv_async = {
+    id: [u8],
+    loop: uv_loop
+};
+
 enum uv_msg {
     // requests from library users
     msg_run(comm::chan<bool>),
     msg_run_in_bg(),
     msg_loop_delete(),
-    msg_async_init([u8], fn~()),
+    msg_async_init(fn~(uv_async), fn~(uv_async)),
     msg_async_send([u8]),
-    msg_hw(),
 
     // dispatches from libuv
-    uv_hw()
+    uv_async_init([u8], *ctypes::void),
+    uv_async_send([u8])
 }
 
 type uv_loop_data = {
@@ -25,10 +30,6 @@ type uv_loop_data = {
 
 type uv_loop = comm::chan<uv_msg>;
 
-enum uv_handle {
-    handle([u8], *ctypes::void)
-}
-
 #[nolink]
 native mod rustrt {
     fn rust_uvtmp_create_thread() -> thread;
@@ -66,10 +67,14 @@ native mod rustrt {
     fn rust_uvtmp_uv_bind_op_cb(loop: *ctypes::void, cb: *u8) -> *ctypes::void;
     fn rust_uvtmp_uv_run(loop_handle: *ctypes::void);
     fn rust_uvtmp_uv_async_send(handle: *ctypes::void);
+    fn rust_uvtmp_uv_async_init(
+        loop_handle: *ctypes::void,
+        cb: *u8,
+        id: *u8) -> *ctypes::void;
 }
 
 mod uv {
-    export loop_new, run, run_in_bg, hw;
+    export loop_new, run, run_in_bg, async_init, async_send;
 
     // public functions
     fn loop_new() -> uv_loop unsafe {
@@ -78,7 +83,9 @@ mod uv {
         let ret_recv_chan: comm::chan<uv_loop> =
             comm::chan(ret_recv_port);
 
-        task::spawn_sched(3u) {||
+        let num_threads = 4u; // would be cool to tie this to
+                              // the number of logical procs
+        task::spawn_sched(num_threads) {||
             // our beloved uv_loop_t ptr
             let loop_handle = rustrt::
                 rust_uvtmp_uv_loop_new();
@@ -115,12 +122,17 @@ mod uv {
                                           // to libuv, this will be
                                           // in the process_operation
                                           // crust fn
-            let async_handle = rustrt::rust_uvtmp_uv_bind_op_cb(
+            let op_handle = rustrt::rust_uvtmp_uv_bind_op_cb(
                 loop_handle,
                 process_operation);
 
             // all state goes here
-            let handles: map::map<[u8], uv_handle> =
+            let handles: map::map<[u8], *ctypes::void> =
+                map::new_bytes_hash();
+            let async_cbs: map::map<[u8], fn~(uv_async)> =
+                map::new_bytes_hash();
+            let async_init_after_cbs: map::map<[u8],
+                                               fn~(uv_async)> =
                 map::new_bytes_hash();
 
             // the main loop that this task blocks on.
@@ -143,36 +155,51 @@ mod uv {
                         comm::send(end_chan, true);
                     };
                   }
+                  
                   msg_run_in_bg {
                     task::spawn_sched(1u) {||
                         // this call blocks
                         rustrt::rust_uvtmp_uv_run(loop_handle);
                     };
                   }
-                  msg_hw() {
-                    comm::send(operation_chan, op_hw);
-                    io::println("CALLING ASYNC_SEND FOR HW");
-                    rustrt::rust_uvtmp_uv_async_send(async_handle);
-                  }
-                  uv_hw() {
-                    io::println("HELLO WORLD!!!");
-                  }
-
-                  ////// STUBS ///////
-                  msg_loop_delete {
-                    // delete the event loop's c ptr
-                    // this will of course stop any
-                    // further processing
-                  }
-                  msg_async_init(id, callback) {
+                  
+                  msg_async_init(callback, after_cb) {
                     // create a new async handle
                     // with the id as the handle's
                     // data and save the callback for
                     // invocation on msg_async_send
+                    let id = gen_handle_id();
+                    async_cbs.insert(id, callback);
+                    async_init_after_cbs.insert(id, after_cb);
+                    let op = op_async_init(id);
+                    comm::send(operation_chan, op);
+                    rustrt::rust_uvtmp_uv_async_send(op_handle);
+                    io::println("MSG_ASYNC_INIT");
                   }
+                  uv_async_init(id, async_handle) {
+                    // libuv created a handle, which is
+                    // passed back to us. save it and
+                    // then invoke the supplied callback
+                    // for after completion
+                    handles.insert(id, async_handle);
+                    let after_cb = async_init_after_cbs.get(id);
+                    async_init_after_cbs.remove(id);
+                    task::spawn {||
+                        let async: uv_async = {
+                            id: id,
+                            loop: rust_loop_chan
+                        };
+                        after_cb(async);
+                    };
+                  }
+
                   msg_async_send(id) {
-                    // get the callback matching the
-                    // supplied id and invoke it
+                    let async_handle = handles.get(id);
+                    rustrt::rust_uvtmp_uv_async_send(async_handle);
+                  }
+                  uv_async_send(id) {
+                    let async_cb = async_cbs.get(id);
+                    async_cb({id: id, loop: rust_loop_chan});
                   }
 
                   _ { fail "unknown form of uv_msg received"; }
@@ -193,37 +220,88 @@ mod uv {
         comm::send(loop, msg_run_in_bg);
     }
 
-    fn hw(loop: uv_loop) {
-        comm::send(loop, msg_hw);
+    fn async_init (
+        loop: uv_loop,
+        async_cb: fn~(uv_async),
+        after_cb: fn~(uv_async)) {
+        let msg = msg_async_init(async_cb, after_cb);
+        comm::send(loop, msg);
+    }
+
+    fn async_send(async: uv_async) {
+        comm::send(async.loop, msg_async_send(async.id));
     }
 
     // internal functions
+    fn gen_handle_id() -> [u8] {
+        ret rand::mk_rng().gen_bytes(16u);
+    }
+    fn get_handle_id_from(buf: *u8) -> [u8] unsafe {
+        ret vec::unsafe::from_buf(buf, 16u); 
+    }
+
+    fn get_loop_chan_from(data: *uv_loop_data)
+            -> comm::chan<uv_msg> unsafe {
+        ret (*data).rust_loop_chan;
+    }
 
     // crust
-    crust fn process_operation(data: *uv_loop_data) unsafe {
+    crust fn process_operation(
+            loop: *ctypes::void,
+            data: *uv_loop_data) unsafe {
         io::println("IN PROCESS_OPERATION");
         let op_port = (*data).operation_port;
-        let loop_chan = (*data).rust_loop_chan;
+        let loop_chan = get_loop_chan_from(data);
         let op_pending = comm::peek(op_port);
         while(op_pending) {
             io::println("OPERATION PENDING!");
             alt comm::recv(op_port) {
-              op_hw() {
-                io::println("GOT OP_HW IN CRUST");
-                comm::send(loop_chan, uv_hw);
+              op_async_init(id) {
+                io::println("OP_ASYNC_INIT");
+                let id_ptr = vec::unsafe::to_ptr(id);
+                let async_handle = rustrt::rust_uvtmp_uv_async_init(
+                    loop,
+                    process_async_send,
+                    id_ptr);
+                comm::send(loop_chan, uv_async_init(
+                    id,
+                    async_handle));
               }
+              
               _ { fail "unknown form of uv_operation received"; }
             }
             op_pending = comm::peek(op_port);
         }
         io::println("NO MORE OPERATIONS PENDING!");
     }
+
+    crust fn process_async_send(id_buf: *u8, data: *uv_loop_data)
+            unsafe {
+        let handle_id = get_handle_id_from(id_buf);
+        let loop_chan = get_loop_chan_from(data);
+        comm::send(loop_chan, uv_async_send(handle_id));
+    }
+
+    
+}
+
+#[test]
+fn test_uvtmp_uv_new_loop_no_handles() {
+    let test_loop = uv::loop_new();
+    uv::run(test_loop); // this should return immediately
+                        // since there aren't any handles..
 }
 
 #[test]
-fn uvtmp_uv_test_hello_world() {
+fn test_uvtmp_uv_simple_async() {
     let test_loop = uv::loop_new();
-    uv::hw(test_loop);
+    let cb: fn~(uv_async) = fn~(h: uv_async) {
+        io::println("HELLO FROM ASYNC CALLBACK!");
+    };
+    uv::async_init(test_loop, cb) {|new_async|
+        io::println("NEW_ASYNC CREATED!");
+        uv::async_send(new_async);
+    };
     uv::run(test_loop);
 }
 
diff --git a/src/rt/rust_uvtmp.cpp b/src/rt/rust_uvtmp.cpp
index 4a8df1fa886..18a30fb6404 100644
--- a/src/rt/rust_uvtmp.cpp
+++ b/src/rt/rust_uvtmp.cpp
@@ -57,6 +57,9 @@ struct timer_start_data {
 
 // UVTMP REWORK
 
+typedef void (*async_op_cb)(uv_loop_t* loop, void* data);
+typedef void (*rust_async_cb)(uint8_t* id_buf, void* loop_data);
+
 static void*
 current_kernel_malloc(size_t size, const char* tag) {
   return rust_task_thread::get_task()->malloc(size, tag);
@@ -68,6 +71,11 @@ current_kernel_free(void* ptr) {
   rust_task_thread::get_task()->free(ptr);
 }
 */
+#define RUST_UV_HANDLE_LEN 16
+struct async_data {
+	uint8_t id_buf[RUST_UV_HANDLE_LEN];
+	rust_async_cb cb;
+};
 
 extern "C" void*
 rust_uvtmp_uv_loop_new() {
@@ -79,11 +87,11 @@ rust_uvtmp_uv_loop_set_data(uv_loop_t* loop, void* data) {
     loop->data = data;
 }
 
-typedef void (*async_op_cb)(void* data);
-void native_async_op_cb(uv_async_t* handle, int status) {
+static void
+native_async_op_cb(uv_async_t* handle, int status) {
     async_op_cb cb = (async_op_cb)handle->data;
 	void* loop_data = handle->loop->data;
-	cb(loop_data);
+	cb(handle->loop, loop_data);
 }
 
 extern "C" void*
@@ -92,6 +100,8 @@ rust_uvtmp_uv_bind_op_cb(uv_loop_t* loop, async_op_cb cb) {
 		sizeof(uv_async_t),
 		"uv_async_t");
 	uv_async_init(loop, async, native_async_op_cb);
+	// decrement the ref count, so that our async bind
+	// does count towards keeping the loop alive
 	async->data = (void*)cb;
 	return async;
 }
@@ -105,6 +115,30 @@ rust_uvtmp_uv_async_send(uv_async_t* handle) {
     uv_async_send(handle);
 }
 
+static void
+native_async_cb(uv_async_t* handle, int status) {
+	async_data* handle_data = (async_data*)handle->data;
+	void* loop_data = handle->loop->data;
+	handle_data->cb(handle_data->id_buf, loop_data);
+}
+
+extern "C" void*
+rust_uvtmp_uv_async_init(uv_loop_t* loop, rust_async_cb cb,
+						 uint8_t* buf) {
+    uv_async_t* async = (uv_async_t*)current_kernel_malloc(
+		sizeof(uv_async_t),
+		"uv_async_t");
+	uv_async_init(loop, async, native_async_cb);
+	async_data* data = (async_data*)current_kernel_malloc(
+		sizeof(async_data),
+		"async_data");
+	memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN);
+	data->cb = cb;
+	async->data = data;
+
+	return async;
+}
+
 // UVTMP REWORK
 
 // FIXME: Copied from rust_builtins.cpp. Could bitrot easily