diff options
| author | Brian Anderson <banderson@mozilla.com> | 2012-05-24 23:42:12 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2012-05-25 11:03:29 -0700 |
| commit | 81b8e20f31dbc5b2e985ba4109bf86fe6e06f2e2 (patch) | |
| tree | 1afaadbaffc5a56a596db88cbd3464eaa38e7254 /src/libstd | |
| parent | 59262dfc6280b81f56129aaa6deaf2f74c63efa6 (diff) | |
| download | rust-81b8e20f31dbc5b2e985ba4109bf86fe6e06f2e2.tar.gz rust-81b8e20f31dbc5b2e985ba4109bf86fe6e06f2e2.zip | |
std: Rename uv::hl to uv::iotask. Additional cleanup
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/net_tcp.rs | 76 | ||||
| -rw-r--r-- | src/libstd/std.rc | 4 | ||||
| -rw-r--r-- | src/libstd/timer.rs | 18 | ||||
| -rw-r--r-- | src/libstd/uv.rs | 4 | ||||
| -rw-r--r-- | src/libstd/uv_global_loop.rs | 21 | ||||
| -rw-r--r-- | src/libstd/uv_iotask.rs (renamed from src/libstd/uv_hl.rs) | 191 |
6 files changed, 156 insertions, 158 deletions
diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index 0f29b0c391e..b59217fcea3 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -4,6 +4,8 @@ High-level interface to libuv's TCP functionality // FIXME: Fewer import *'s import ip = net_ip; +import uv::iotask; +import uv::iotask::iotask; import comm::*; import result::*; import str::*; @@ -44,7 +46,7 @@ resource tcp_socket(socket_data: @tcp_socket_data) }; let close_data_ptr = ptr::addr_of(close_data); let stream_handle_ptr = (*socket_data).stream_handle_ptr; - uv::hl::interact((*socket_data).hl_loop) {|loop_ptr| + iotask::interact((*socket_data).iotask) {|loop_ptr| log(debug, #fmt("interact dtor for tcp_socket stream %? loop %?", stream_handle_ptr, loop_ptr)); uv::ll::set_data_for_uv_handle(stream_handle_ptr, @@ -62,8 +64,8 @@ resource tcp_conn_port(conn_data: @tcp_conn_port_data) unsafe { let conn_data_ptr = ptr::addr_of(*conn_data); let server_stream_ptr = ptr::addr_of((*conn_data_ptr).server_stream); let stream_closed_po = (*conn_data).stream_closed_po; - let hl_loop = (*conn_data_ptr).hl_loop; - uv::hl::interact(hl_loop) {|loop_ptr| + let iotask = (*conn_data_ptr).iotask; + iotask::interact(iotask) {|loop_ptr| log(debug, #fmt("dtor for tcp_conn_port loop: %?", loop_ptr)); uv::ll::close(server_stream_ptr, tcp_nl_close_cb); @@ -86,7 +88,7 @@ Initiate a client connection over TCP/IP * `ip` - The IP address (versions 4 or 6) of the remote host * `port` - the unsigned integer of the desired remote host port -* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on +* `iotask` - a `uv::iotask` that the tcp request will run on # Returns @@ -95,7 +97,7 @@ can be used to send and receive data to/from the remote host. In the event of failure, a `tcp_err_data` will be returned "] fn connect(input_ip: ip::ip_addr, port: uint, - hl_loop: uv::hl::high_level_loop) + iotask: iotask) -> result::result<tcp_socket, tcp_err_data> unsafe { let result_po = comm::port::<conn_attempt>(); let closed_signal_po = comm::port::<()>(); @@ -113,7 +115,7 @@ fn connect(input_ip: ip::ip_addr, port: uint, stream_handle_ptr: stream_handle_ptr, connect_req: uv::ll::connect_t(), write_req: uv::ll::write_t(), - hl_loop: hl_loop + iotask: iotask }; let socket_data_ptr = ptr::addr_of(*socket_data); log(debug, #fmt("tcp_connect result_ch %?", conn_data.result_ch)); @@ -121,7 +123,7 @@ fn connect(input_ip: ip::ip_addr, port: uint, // we can send into the interact cb to be handled in libuv.. log(debug, #fmt("stream_handle_ptr outside interact %?", stream_handle_ptr)); - uv::hl::interact(hl_loop) {|loop_ptr| + iotask::interact(iotask) {|loop_ptr| log(debug, "in interact cb for tcp client connect.."); log(debug, #fmt("stream_handle_ptr in interact %?", stream_handle_ptr)); @@ -354,7 +356,7 @@ to listen for, and accept, new connections, or a `tcp_err_data` if failure to create the tcp listener occurs "] fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint, - hl_loop: uv::hl::high_level_loop) + iotask: iotask) -> result::result<tcp_conn_port, tcp_err_data> unsafe { let stream_closed_po = comm::port::<()>(); let stream_closed_ch = comm::chan(stream_closed_po); @@ -367,7 +369,7 @@ fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint, server_stream: uv::ll::tcp_t(), stream_closed_po: stream_closed_po, stream_closed_ch: stream_closed_ch, - hl_loop: hl_loop, + iotask: iotask, new_conn_po: new_conn_po, new_conn_ch: new_conn_ch }; @@ -377,7 +379,7 @@ fn new_listener(host_ip: ip::ip_addr, port: uint, backlog: uint, let setup_po = comm::port::<option<tcp_err_data>>(); let setup_ch = comm::chan(setup_po); - uv::hl::interact(hl_loop) {|loop_ptr| + iotask::interact(iotask) {|loop_ptr| let tcp_addr = ipv4_ip_addr_to_sockaddr_in(host_ip, port); alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) { @@ -445,11 +447,11 @@ variant fn conn_recv(server_port: tcp_conn_port) -> result::result<tcp_socket, tcp_err_data> { let new_conn_po = (**server_port).new_conn_po; - let hl_loop = (**server_port).hl_loop; + let iotask = (**server_port).iotask; let new_conn_result = comm::recv(new_conn_po); alt new_conn_result { ok(client_stream_ptr) { - conn_port_new_tcp_socket(client_stream_ptr, hl_loop) + conn_port_new_tcp_socket(client_stream_ptr, iotask) } err(err_data) { result::err(err_data) @@ -476,12 +478,12 @@ once a new connection is recv'd. Its parameter: fn conn_recv_spawn(server_port: tcp_conn_port, cb: fn~(result::result<tcp_socket, tcp_err_data>)) { let new_conn_po = (**server_port).new_conn_po; - let hl_loop = (**server_port).hl_loop; + let iotask = (**server_port).iotask; let new_conn_result = comm::recv(new_conn_po); task::spawn {|| let sock_create_result = alt new_conn_result { ok(client_stream_ptr) { - conn_port_new_tcp_socket(client_stream_ptr, hl_loop) + conn_port_new_tcp_socket(client_stream_ptr, iotask) } err(err_data) { result::err(err_data) @@ -582,7 +584,7 @@ fn accept(new_conn: tcp_new_connection) let server_data_ptr = uv::ll::get_data_for_uv_handle( server_handle_ptr) as *tcp_listen_fc_data; let reader_po = comm::port::<result::result<[u8], tcp_err_data>>(); - let hl_loop = (*server_data_ptr).hl_loop; + let iotask = (*server_data_ptr).iotask; let stream_handle_ptr = malloc_uv_tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); let client_socket_data = @{ @@ -591,7 +593,7 @@ fn accept(new_conn: tcp_new_connection) stream_handle_ptr : stream_handle_ptr, connect_req : uv::ll::connect_t(), write_req : uv::ll::write_t(), - hl_loop: hl_loop + iotask : iotask }; let client_socket_data_ptr = ptr::addr_of(*client_socket_data); let client_stream_handle_ptr = @@ -677,7 +679,7 @@ successful/normal shutdown, and a `tcp_err_data` record in the event of listen exiting because of an error "] fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint, - hl_loop: uv::hl::high_level_loop, + iotask: iotask, on_establish_cb: fn~(comm::chan<option<tcp_err_data>>), new_connect_cb: fn~(tcp_new_connection, comm::chan<option<tcp_err_data>>)) @@ -692,14 +694,14 @@ fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint, stream_closed_ch: comm::chan(stream_closed_po), kill_ch: kill_ch, new_connect_cb: new_connect_cb, - hl_loop: hl_loop, + iotask: iotask, mut active: true }; let server_data_ptr = ptr::addr_of(server_data); let setup_po = comm::port::<option<tcp_err_data>>(); let setup_ch = comm::chan(setup_po); - uv::hl::interact(hl_loop) {|loop_ptr| + iotask::interact(iotask) {|loop_ptr| let tcp_addr = ipv4_ip_addr_to_sockaddr_in(host_ip, port); alt uv::ll::tcp_init(loop_ptr, server_stream_ptr) { @@ -745,7 +747,7 @@ fn listen_for_conn(host_ip: ip::ip_addr, port: uint, backlog: uint, none { on_establish_cb(kill_ch); let kill_result = comm::recv(kill_po); - uv::hl::interact(hl_loop) {|loop_ptr| + iotask::interact(iotask) {|loop_ptr| log(debug, #fmt("tcp::listen post-kill recv hl interact %?", loop_ptr)); (*server_data_ptr).active = false; @@ -811,7 +813,7 @@ impl sock_methods for tcp_socket { fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint) -> result::result<[u8],tcp_err_data> unsafe { log(debug, "starting tcp::read"); - let hl_loop = (*socket_data).hl_loop; + let iotask = (*socket_data).iotask; let rs_result = read_start_common_impl(socket_data); if result::is_failure(rs_result) { let err_data = result::get_err(rs_result); @@ -821,7 +823,7 @@ fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint) log(debug, "tcp::read before recv_timeout"); let read_result = if timeout_msecs > 0u { timer::recv_timeout( - hl_loop, timeout_msecs, result::get(rs_result)) + iotask, timeout_msecs, result::get(rs_result)) } else { some(comm::recv(result::get(rs_result))) }; @@ -851,7 +853,7 @@ fn read_stop_common_impl(socket_data: *tcp_socket_data) -> let stream_handle_ptr = (*socket_data).stream_handle_ptr; let stop_po = comm::port::<option<tcp_err_data>>(); let stop_ch = comm::chan(stop_po); - uv::hl::interact((*socket_data).hl_loop) {|loop_ptr| + iotask::interact((*socket_data).iotask) {|loop_ptr| log(debug, "in interact cb for tcp::read_stop"); alt uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) { 0i32 { @@ -883,7 +885,7 @@ fn read_start_common_impl(socket_data: *tcp_socket_data) let start_po = comm::port::<option<uv::ll::uv_err_data>>(); let start_ch = comm::chan(start_po); log(debug, "in tcp::read_start before interact loop"); - uv::hl::interact((*socket_data).hl_loop) {|loop_ptr| + iotask::interact((*socket_data).iotask) {|loop_ptr| log(debug, #fmt("in tcp::read_start interact cb %?", loop_ptr)); alt uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t, on_alloc_cb, @@ -925,7 +927,7 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data, result_ch: comm::chan(result_po) }; let write_data_ptr = ptr::addr_of(write_data); - uv::hl::interact((*socket_data_ptr).hl_loop) {|loop_ptr| + iotask::interact((*socket_data_ptr).iotask) {|loop_ptr| log(debug, #fmt("in interact cb for tcp::write %?", loop_ptr)); alt uv::ll::write(write_req_ptr, stream_handle_ptr, @@ -956,7 +958,7 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data, // various recv_* can use a tcp_conn_port can re-use this.. fn conn_port_new_tcp_socket( stream_handle_ptr: *uv::ll::uv_tcp_t, - hl_loop: uv::hl::high_level_loop) + iotask: iotask) -> result::result<tcp_socket,tcp_err_data> unsafe { // tcp_nl_on_connection_cb let reader_po = comm::port::<result::result<[u8], tcp_err_data>>(); @@ -966,11 +968,11 @@ fn conn_port_new_tcp_socket( stream_handle_ptr : stream_handle_ptr, connect_req : uv::ll::connect_t(), write_req : uv::ll::write_t(), - hl_loop : hl_loop + iotask : iotask }; let client_socket_data_ptr = ptr::addr_of(*client_socket_data); comm::listen {|cont_ch| - uv::hl::interact(hl_loop) {|loop_ptr| + iotask::interact(iotask) {|loop_ptr| log(debug, #fmt("in interact cb 4 conn_port_new_tcp.. loop %?", loop_ptr)); uv::ll::set_data_for_uv_handle(stream_handle_ptr, @@ -990,7 +992,7 @@ type tcp_conn_port_data = { server_stream: uv::ll::uv_tcp_t, stream_closed_po: comm::port<()>, stream_closed_ch: comm::chan<()>, - hl_loop: uv::hl::high_level_loop, + iotask: iotask, new_conn_po: comm::port<result::result<*uv::ll::uv_tcp_t, tcp_err_data>>, new_conn_ch: comm::chan<result::result<*uv::ll::uv_tcp_t, @@ -1003,7 +1005,7 @@ type tcp_listen_fc_data = { kill_ch: comm::chan<option<tcp_err_data>>, new_connect_cb: fn~(tcp_new_connection, comm::chan<option<tcp_err_data>>), - hl_loop: uv::hl::high_level_loop, + iotask: iotask, mut active: bool }; @@ -1264,7 +1266,7 @@ type tcp_socket_data = { stream_handle_ptr: *uv::ll::uv_tcp_t, connect_req: uv::ll::uv_connect_t, write_req: uv::ll::uv_write_t, - hl_loop: uv::hl::high_level_loop + iotask: iotask }; // convert rust ip_addr to libuv's native representation @@ -1405,13 +1407,13 @@ mod test { fn run_tcp_test_server(server_ip: str, server_port: uint, resp: str, server_ch: comm::chan<str>, cont_ch: comm::chan<()>, - hl_loop: uv::hl::high_level_loop) -> str { + iotask: iotask) -> str { task::spawn_sched(task::manual_threads(1u)) {|| let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen_for_conn(server_ip_addr, server_port, 128u, - hl_loop, + iotask, // on_establish_cb -- called when listener is set up {|kill_ch| log(debug, #fmt("establish_cb %?", @@ -1484,12 +1486,12 @@ mod test { server_port: uint, resp: str, server_ch: comm::chan<str>, cont_ch: comm::chan<()>, - hl_loop: uv::hl::high_level_loop) -> str { + iotask: iotask) -> str { task::spawn_sched(task::manual_threads(1u)) {|| let server_ip_addr = ip::v4::parse_addr(server_ip); let new_listener_result = - new_listener(server_ip_addr, server_port, 128u, hl_loop); + new_listener(server_ip_addr, server_port, 128u, iotask); if result::is_failure(new_listener_result) { let err_data = result::get_err(new_listener_result); log(debug, #fmt("SERVER: exited abnormally name %s msg %s", @@ -1533,12 +1535,12 @@ mod test { fn run_tcp_test_client(server_ip: str, server_port: uint, resp: str, client_ch: comm::chan<str>, - hl_loop: uv::hl::high_level_loop) -> str { + iotask: iotask) -> str { let server_ip_addr = ip::v4::parse_addr(server_ip); log(debug, "CLIENT: starting.."); - let connect_result = connect(server_ip_addr, server_port, hl_loop); + let connect_result = connect(server_ip_addr, server_port, iotask); if result::is_failure(connect_result) { log(debug, "CLIENT: failed to connect"); let err_data = result::get_err(connect_result); diff --git a/src/libstd/std.rc b/src/libstd/std.rc index bcf0574a7f3..c8355548598 100644 --- a/src/libstd/std.rc +++ b/src/libstd/std.rc @@ -14,7 +14,7 @@ use core(vers = "0.2"); import core::*; export net, net_tcp; -export uv, uv_ll, uv_hl, uv_global_loop; +export uv, uv_ll, uv_iotask, uv_global_loop; export c_vec, util, timer; export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap; export rope, arena, arc; @@ -30,7 +30,7 @@ mod net_tcp; // libuv modules mod uv; mod uv_ll; -mod uv_hl; +mod uv_iotask; mod uv_global_loop; diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index 6c5c4b350e7..9fdf6c8550a 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -3,6 +3,8 @@ Utilities that leverage libuv's `uv_timer_*` API "]; import uv = uv; +import uv::iotask; +import iotask::iotask; export delayed_send, sleep, recv_timeout; #[doc = " @@ -21,7 +23,7 @@ for *at least* that period of time. * ch - a channel of type T to send a `val` on * val - a value of type T to send over the provided `ch` "] -fn delayed_send<T: copy send>(hl_loop: uv::hl::high_level_loop, +fn delayed_send<T: copy send>(iotask: iotask, msecs: uint, ch: comm::chan<T>, val: T) { // FIME: Looks like we don't need to spawn here task::spawn() {|| @@ -31,7 +33,7 @@ fn delayed_send<T: copy send>(hl_loop: uv::hl::high_level_loop, let timer_done_ch_ptr = ptr::addr_of(timer_done_ch); let timer = uv::ll::timer_t(); let timer_ptr = ptr::addr_of(timer); - uv::hl::interact(hl_loop) {|loop_ptr| + iotask::interact(iotask) {|loop_ptr| let init_result = uv::ll::timer_init(loop_ptr, timer_ptr); if (init_result == 0i32) { let start_result = uv::ll::timer_start( @@ -69,13 +71,13 @@ for *at least* that period of time. # Arguments -* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on +* `iotask` - a `uv::iotask` that the tcp request will run on * msecs - an amount of time, in milliseconds, for the current task to block "] -fn sleep(hl_loop: uv::hl::high_level_loop, msecs: uint) { +fn sleep(iotask: iotask, msecs: uint) { let exit_po = comm::port::<()>(); let exit_ch = comm::chan(exit_po); - delayed_send(hl_loop, msecs, exit_ch, ()); + delayed_send(iotask, msecs, exit_ch, ()); comm::recv(exit_po); } @@ -88,7 +90,7 @@ timeout. Depending on whether the provided port receives in that time period, # Arguments -* `hl_loop` - a `uv::hl::high_level_loop` that the tcp request will run on +* `iotask' - `uv::iotask` that the tcp request will run on * msecs - an mount of time, in milliseconds, to wait to receive * wait_port - a `comm::port<T>` to receive on @@ -98,12 +100,12 @@ An `option<T>` representing the outcome of the call. If the call `recv`'d on the provided port in the allotted timeout period, then the result will be a `some(T)`. If not, then `none` will be returned. "] -fn recv_timeout<T: copy send>(hl_loop: uv::hl::high_level_loop, +fn recv_timeout<T: copy send>(iotask: iotask, msecs: uint, wait_po: comm::port<T>) -> option<T> { let timeout_po = comm::port::<()>(); let timeout_ch = comm::chan(timeout_po); - delayed_send(hl_loop, msecs, timeout_ch, ()); + delayed_send(iotask, msecs, timeout_ch, ()); // FIXME: This could be written clearer either::either( {|left_val| diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index 29eb90a1a68..c2e4c55d61e 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -26,8 +26,8 @@ facilities. import ll = uv_ll; export ll; -import hl = uv_hl; -export hl; +import iotask = uv_iotask; +export iotask; import global_loop = uv_global_loop; export global_loop; diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs index a350967802f..806b14c717d 100644 --- a/src/libstd/uv_global_loop.rs +++ b/src/libstd/uv_global_loop.rs @@ -5,8 +5,9 @@ A process-wide libuv event loop for library use. export get, get_monitor_task_gl; import ll = uv_ll; -import hl = uv_hl; +import iotask = uv_iotask; import get_gl = get; +import iotask::{iotask, spawn_iotask}; import priv::{chan_from_global_ptr, weaken_task}; import comm::{port, chan, methods, select2, listen}; import either::{left, right}; @@ -27,12 +28,12 @@ loop that this function returns. * A `hl::high_level_loop` that encapsulates communication with the global loop. "] -fn get() -> hl::high_level_loop { +fn get() -> iotask { ret get_monitor_task_gl(); } #[doc(hidden)] -fn get_monitor_task_gl() -> hl::high_level_loop unsafe { +fn get_monitor_task_gl() -> iotask unsafe { let monitor_loop_chan_ptr = rustrt::rust_uv_get_kernel_global_chan_ptr(); @@ -53,7 +54,7 @@ fn get_monitor_task_gl() -> hl::high_level_loop unsafe { }; #debug("before priv::chan_from_global_ptr"); - type monchan = chan<hl::high_level_loop>; + type monchan = chan<iotask>; let monitor_ch = chan_from_global_ptr::<monchan>(monitor_loop_chan_ptr, builder_fn) {|msg_po| @@ -70,7 +71,7 @@ fn get_monitor_task_gl() -> hl::high_level_loop unsafe { // all normal tasks have ended, tell the // libuv loop to tear_down, then exit #debug("weak_exit_po recv'd msg: %?", weak_exit); - hl::exit(hl_loop); + iotask::exit(hl_loop); break; } right(fetch_ch) { @@ -92,7 +93,7 @@ fn get_monitor_task_gl() -> hl::high_level_loop unsafe { } } -fn spawn_loop() -> hl::high_level_loop unsafe { +fn spawn_loop() -> iotask unsafe { let builder = task::builder(); task::add_wrapper(builder) {|task_body| fn~(move task_body) { @@ -110,7 +111,7 @@ fn spawn_loop() -> hl::high_level_loop unsafe { } } } - hl::spawn_high_level_loop(builder) + spawn_iotask(builder) } #[cfg(test)] @@ -128,7 +129,7 @@ mod test { log(debug, "in simple timer cb"); ll::timer_stop(timer_ptr); let hl_loop = get_gl(); - hl::interact(hl_loop) {|_loop_ptr| + iotask::interact(hl_loop) {|_loop_ptr| log(debug, "closing timer"); ll::close(timer_ptr, simple_timer_close_cb); log(debug, "about to deref exit_ch_ptr"); @@ -137,7 +138,7 @@ mod test { log(debug, "exiting simple timer cb"); } - fn impl_uv_hl_simple_timer(hl_loop: hl::high_level_loop) unsafe { + fn impl_uv_hl_simple_timer(iotask: iotask) unsafe { let exit_po = comm::port::<bool>(); let exit_ch = comm::chan(exit_po); let exit_ch_ptr = ptr::addr_of(exit_ch); @@ -145,7 +146,7 @@ mod test { exit_ch_ptr)); let timer_handle = ll::timer_t(); let timer_ptr = ptr::addr_of(timer_handle); - hl::interact(hl_loop) {|loop_ptr| + iotask::interact(iotask) {|loop_ptr| log(debug, "user code inside interact loop!!!"); let init_status = ll::timer_init(loop_ptr, timer_ptr); if(init_status == 0i32) { diff --git a/src/libstd/uv_hl.rs b/src/libstd/uv_iotask.rs index e873f2ed16f..df27f325494 100644 --- a/src/libstd/uv_hl.rs +++ b/src/libstd/uv_iotask.rs @@ -1,13 +1,14 @@ #[doc = " -High-level bindings to work with the libuv library. -This module is geared towards library developers who want to -provide a high-level, abstracted interface to some set of -libuv functionality. +A task-based interface to the uv loop + +The I/O task runs in its own single-threaded scheduler. By using the +`interact` function you can execute code in a uv callback. + "]; -export high_level_loop; -export spawn_high_level_loop; +export iotask::{}; +export spawn_iotask; export interact; export exit; @@ -19,18 +20,19 @@ import ll = uv_ll; #[doc = " Used to abstract-away direct interaction with a libuv loop. "] -enum high_level_loop = { - async_handle: *ll::uv_async_t, - op_chan: chan<high_level_msg> -}; +enum iotask { + iotask_({ + async_handle: *ll::uv_async_t, + op_chan: chan<iotask_msg> + }) +} -fn spawn_high_level_loop(-builder: task::builder - ) -> high_level_loop unsafe { +fn spawn_iotask(-builder: task::builder) -> iotask { import task::{set_opts, get_opts, single_threaded, run}; - let hll_po = port::<high_level_loop>(); - let hll_ch = hll_po.chan(); + let iotask_po = port::<iotask>(); + let iotask_ch = iotask_po.chan(); set_opts(builder, { sched: some({ @@ -42,45 +44,75 @@ fn spawn_high_level_loop(-builder: task::builder run(builder) {|| #debug("entering libuv task"); - run_high_level_loop(hll_ch); + run_loop(iotask_ch); #debug("libuv task exiting"); }; - hll_po.recv() + iotask_po.recv() } -#[doc=" -Represents the range of interactions with a `high_level_loop` + +#[doc = " +Provide a callback to be processed by `iotask` + +The primary way to do operations again a running `iotask` that +doesn't involve creating a uv handle via `safe_handle` + +# Warning + +This function is the only safe way to interact with _any_ `iotask`. +Using functions in the `uv::ll` module outside of the `cb` passed into +this function is _very dangerous_. + +# Arguments + +* iotask - a uv I/O task that you want to do operations against +* cb - a function callback to be processed on the running loop's +thread. The only parameter passed in is an opaque pointer representing the +running `uv_loop_t*`. In the context of this callback, it is safe to use +this pointer to do various uv_* API calls contained within the `uv::ll` +module. It is not safe to send the `loop_ptr` param to this callback out +via ports/chans. "] -enum high_level_msg { - interaction (fn~(*libc::c_void)), - #[doc=" -For use in libraries that roll their own `high_level_loop` (like -`std::uv::global_loop`) +unsafe fn interact(iotask: iotask, + -cb: fn~(*c_void)) { + send_msg(iotask, interaction(cb)); +} + +#[doc=" +Shut down the I/O task Is used to signal to the loop that it should close the internally-held async handle and do a sanity check to make sure that all other handles are -closed, causing a failure otherwise. This should not be sent/used from -'normal' user code. - "] +closed, causing a failure otherwise. +"] +fn exit(iotask: iotask) unsafe { + send_msg(iotask, teardown_loop); +} + + +// INTERNAL API + +enum iotask_msg { + interaction (fn~(*libc::c_void)), teardown_loop } #[doc = " -Useful for anyone who wants to roll their own `high_level_loop`. +Run the loop and begin handling messages "] -unsafe fn run_high_level_loop(hll_ch: chan<high_level_loop>) { - let msg_po = port::<high_level_msg>(); +fn run_loop(iotask_ch: chan<iotask>) unsafe { + let msg_po = port::<iotask_msg>(); let loop_ptr = ll::loop_new(); // set up the special async handle we'll use to allow multi-task // communication with this loop let async = ll::async_t(); let async_handle = addr_of(async); // associate the async handle with the loop - ll::async_init(loop_ptr, async_handle, high_level_wake_up_cb); + ll::async_init(loop_ptr, async_handle, wake_up_cb); // initialize our loop data and store it in the loop - let data: hl_loop_data = { + let data: iotask_loop_data = { async_handle: async_handle, mut active: true, msg_po_ptr: addr_of(msg_po) @@ -89,63 +121,30 @@ unsafe fn run_high_level_loop(hll_ch: chan<high_level_loop>) { // Send out a handle through which folks can talk to us // while we dwell in the I/O loop - let hll = high_level_loop({ + let iotask = iotask_({ async_handle: async_handle, op_chan: msg_po.chan() }); - hll_ch.send(hll); + iotask_ch.send(iotask); - log(debug, "about to run high level loop"); + log(debug, "about to run uv loop"); // enter the loop... this blocks until the loop is done.. ll::run(loop_ptr); - log(debug, "high-level loop ended"); + log(debug, "uv loop ended"); ll::loop_delete(loop_ptr); } -#[doc = " -Provide a callback to be processed by `a_loop` - -The primary way to do operations again a running `high_level_loop` that -doesn't involve creating a uv handle via `safe_handle` - -# Warning - -This function is the only safe way to interact with _any_ `high_level_loop`. -Using functions in the `uv::ll` module outside of the `cb` passed into -this function is _very dangerous_. - -# Arguments - -* hl_loop - a `uv::hl::high_level_loop` that you want to do operations against -* cb - a function callback to be processed on the running loop's -thread. The only parameter passed in is an opaque pointer representing the -running `uv_loop_t*`. In the context of this callback, it is safe to use -this pointer to do various uv_* API calls contained within the `uv::ll` -module. It is not safe to send the `loop_ptr` param to this callback out -via ports/chans. -"] -unsafe fn interact(hl_loop: high_level_loop, - -cb: fn~(*c_void)) { - send_high_level_msg(hl_loop, interaction(cb)); -} - -fn exit(hl_loop: high_level_loop) unsafe { - send_high_level_msg(hl_loop, teardown_loop); -} - -// INTERNAL API - // data that lives for the lifetime of the high-evel oo -type hl_loop_data = { +type iotask_loop_data = { async_handle: *ll::uv_async_t, mut active: bool, - msg_po_ptr: *port<high_level_msg> + msg_po_ptr: *port<iotask_msg> }; -unsafe fn send_high_level_msg(hl_loop: high_level_loop, - -msg: high_level_msg) { - comm::send(hl_loop.op_chan, msg); - ll::async_send(hl_loop.async_handle); +fn send_msg(iotask: iotask, + -msg: iotask_msg) unsafe { + iotask.op_chan.send(msg); + ll::async_send(iotask.async_handle); } // this will be invoked by a call to uv::hl::interact() with @@ -153,12 +152,12 @@ unsafe fn send_high_level_msg(hl_loop: high_level_loop, // simply check if the loop is active and, if so, invoke the // user-supplied on_wake callback that is stored in the loop's // data member -crust fn high_level_wake_up_cb(async_handle: *ll::uv_async_t, - status: int) unsafe { - log(debug, #fmt("high_level_wake_up_cb crust.. handle: %? status: %?", +crust fn wake_up_cb(async_handle: *ll::uv_async_t, + status: int) unsafe { + log(debug, #fmt("wake_up_cb crust.. handle: %? status: %?", async_handle, status)); let loop_ptr = ll::get_loop_for_uv_handle(async_handle); - let data = ll::get_data_for_uv_handle(async_handle) as *hl_loop_data; + let data = ll::get_data_for_uv_handle(async_handle) as *iotask_loop_data; // FIXME: What is this checking? if (*data).active { let msg_po = *((*data).msg_po_ptr); @@ -190,8 +189,8 @@ crust fn tear_down_close_cb(handle: *ll::uv_async_t) unsafe { assert loop_refs == 1i32; } -fn begin_teardown(data: *hl_loop_data) unsafe { - log(debug, "high_level_tear_down() called, close async_handle"); +fn begin_teardown(data: *iotask_loop_data) unsafe { + log(debug, "iotask begin_teardown() called, close async_handle"); // call user-suppled before_tear_down cb let async_handle = (*data).async_handle; ll::close(async_handle as *c_void, tear_down_close_cb); @@ -211,20 +210,20 @@ mod test { ll::close(handle, async_close_cb); } type ah_data = { - hl_loop: high_level_loop, + iotask: iotask, exit_ch: comm::chan<()> }; - fn impl_uv_hl_async(hl_loop: high_level_loop) unsafe { + fn impl_uv_iotask_async(iotask: iotask) unsafe { let async_handle = ll::async_t(); let ah_ptr = ptr::addr_of(async_handle); let exit_po = comm::port::<()>(); let exit_ch = comm::chan(exit_po); let ah_data = { - hl_loop: hl_loop, + iotask: iotask, exit_ch: exit_ch }; let ah_data_ptr = ptr::addr_of(ah_data); - interact(hl_loop) {|loop_ptr| + interact(iotask) {|loop_ptr| ll::async_init(loop_ptr, ah_ptr, async_handle_cb); ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void); ll::async_send(ah_ptr); @@ -234,14 +233,14 @@ mod test { // this fn documents the bear minimum neccesary to roll your own // high_level_loop - unsafe fn spawn_test_loop(exit_ch: comm::chan<()>) -> high_level_loop { - let hl_loop_port = comm::port::<high_level_loop>(); - let hl_loop_ch = comm::chan(hl_loop_port); + unsafe fn spawn_test_loop(exit_ch: comm::chan<()>) -> iotask { + let iotask_port = comm::port::<iotask>(); + let iotask_ch = comm::chan(iotask_port); task::spawn_sched(task::manual_threads(1u)) {|| - run_high_level_loop(hl_loop_ch); + run_loop(iotask_ch); exit_ch.send(()); }; - ret comm::recv(hl_loop_port); + ret comm::recv(iotask_port); } crust fn lifetime_handle_close(handle: *libc::c_void) unsafe { @@ -255,10 +254,10 @@ mod test { } #[test] - fn test_uv_hl_async() unsafe { + fn test_uv_iotask_async() unsafe { let exit_po = comm::port::<()>(); let exit_ch = comm::chan(exit_po); - let hl_loop = spawn_test_loop(exit_ch); + let iotask = spawn_test_loop(exit_ch); // using this handle to manage the lifetime of the high_level_loop, // as it will exit the first time one of the impl_uv_hl_async() is @@ -270,7 +269,7 @@ mod test { let work_exit_ch = comm::chan(work_exit_po); iter::repeat(7u) {|| task::spawn_sched(task::manual_threads(1u), {|| - impl_uv_hl_async(hl_loop); + impl_uv_iotask_async(iotask); comm::send(work_exit_ch, ()); }); }; @@ -278,13 +277,7 @@ mod test { comm::recv(work_exit_po); }; log(debug, "sending teardown_loop msg.."); - // the teardown msg usually comes, in the case of the global loop, - // as a result of receiving a msg on the weaken_task port. but, - // anyone rolling their own high_level_loop can decide when to - // send the msg. it's assert and barf, though, if all of your - // handles aren't uv_close'd first - comm::send(hl_loop.op_chan, teardown_loop); - ll::async_send(hl_loop.async_handle); + exit(iotask); comm::recv(exit_po); log(debug, "after recv on exit_po.. exiting.."); } |
