diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-01-26 14:57:58 -0800 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-01-26 14:57:58 -0800 |
| commit | 83ca034d2ed3acf3e9ae3075964763129ab51c23 (patch) | |
| tree | 99784c6f3ab0d7519abc590a630cc1ffacd14a19 /src/libstd | |
| parent | 2372d2c6a81986973ad53d22380ff42d0ef69246 (diff) | |
| parent | 1ef83945c1d76c9f2b9b0d087ceac65963087be7 (diff) | |
| download | rust-83ca034d2ed3acf3e9ae3075964763129ab51c23.tar.gz rust-83ca034d2ed3acf3e9ae3075964763129ab51c23.zip | |
Merge remote-tracking branch 'brson/nocommupstream2'
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/flatpipes.rs | 3 | ||||
| -rw-r--r-- | src/libstd/net_ip.rs | 6 | ||||
| -rw-r--r-- | src/libstd/net_tcp.rs | 712 | ||||
| -rw-r--r-- | src/libstd/timer.rs | 21 | ||||
| -rw-r--r-- | src/libstd/uv_global_loop.rs | 205 | ||||
| -rw-r--r-- | src/libstd/uv_iotask.rs | 114 |
6 files changed, 558 insertions, 503 deletions
diff --git a/src/libstd/flatpipes.rs b/src/libstd/flatpipes.rs index ea7b2442bb9..afc3e72e636 100644 --- a/src/libstd/flatpipes.rs +++ b/src/libstd/flatpipes.rs @@ -792,7 +792,6 @@ mod test { let (finish_port, finish_chan) = pipes::stream(); let addr = ip::v4::parse_addr("127.0.0.1"); - let iotask = uv::global_loop::get(); let begin_connect_chan = Cell(move begin_connect_chan); let accept_chan = Cell(move accept_chan); @@ -800,6 +799,7 @@ mod test { // The server task do task::spawn |copy addr, move begin_connect_chan, move accept_chan| { + let iotask = &uv::global_loop::get(); let begin_connect_chan = begin_connect_chan.take(); let accept_chan = accept_chan.take(); let listen_res = do tcp::listen( @@ -831,6 +831,7 @@ mod test { begin_connect_port.recv(); debug!("connecting"); + let iotask = &uv::global_loop::get(); let connect_result = tcp::connect(copy addr, port, iotask); assert connect_result.is_ok(); let sock = result::unwrap(move connect_result); diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs index 84c3b755649..72e58cbd5d3 100644 --- a/src/libstd/net_ip.rs +++ b/src/libstd/net_ip.rs @@ -114,7 +114,7 @@ enum IpGetAddrErr { * a vector of `ip_addr` results, in the case of success, or an error * object in the case of failure */ -pub fn get_addr(node: &str, iotask: iotask) +pub fn get_addr(node: &str, iotask: &iotask) -> result::Result<~[IpAddr], IpGetAddrErr> { do oldcomm::listen |output_ch| { do str::as_buf(node) |node_ptr, len| { @@ -419,7 +419,7 @@ mod test { #[ignore(reason = "valgrind says it's leaky")] fn test_ip_get_addr() { let localhost_name = ~"localhost"; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let ga_result = get_addr(localhost_name, iotask); if result::is_err(&ga_result) { fail ~"got err result from net::ip::get_addr();" @@ -445,7 +445,7 @@ mod test { #[ignore(reason = "valgrind says it's leaky")] fn test_ip_get_addr_bad_input() { let localhost_name = ~"sjkl234m,./sdf"; - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let ga_result = get_addr(localhost_name, iotask); assert result::is_err(&ga_result); } diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index 3e9a9756a81..24396eebbbe 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -143,7 +143,7 @@ pub enum TcpConnectErrData { * `net::tcp::tcp_connect_err_data` instance will be returned */ pub fn connect(input_ip: ip::IpAddr, port: uint, - iotask: IoTask) + iotask: &IoTask) -> result::Result<TcpSocket, TcpConnectErrData> { unsafe { let result_po = oldcomm::Port::<ConnAttempt>(); @@ -166,106 +166,116 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, ip::Ipv4(_) => { false } ip::Ipv6(_) => { true } }, - iotask: iotask + iotask: iotask.clone() }; let socket_data_ptr = ptr::addr_of(&(*socket_data)); log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch)); // get an unsafe representation of our stream_handle_ptr that // we can send into the interact cb to be handled in libuv.. log(debug, fmt!("stream_handle_ptr outside interact %?", - stream_handle_ptr)); + stream_handle_ptr)); do iotask::interact(iotask) |move input_ip, loop_ptr| { unsafe { log(debug, ~"in interact cb for tcp client connect.."); log(debug, fmt!("stream_handle_ptr in interact %?", - stream_handle_ptr)); + stream_handle_ptr)); match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) { - 0i32 => { - log(debug, ~"tcp_init successful"); - log(debug, ~"dealing w/ ipv4 connection.."); - let connect_req_ptr = - ptr::addr_of(&((*socket_data_ptr).connect_req)); - let addr_str = ip::format_addr(&input_ip); - let connect_result = match input_ip { - ip::Ipv4(ref addr) => { - // have to "recreate" the sockaddr_in/6 - // since the ip_addr discards the port - // info.. should probably add an additional - // rust type that actually is closer to - // what the libuv API expects (ip str + port num) - log(debug, fmt!("addr: %?", addr)); - let in_addr = uv::ll::ip4_addr(addr_str, port as int); - uv::ll::tcp_connect( - connect_req_ptr, - stream_handle_ptr, - ptr::addr_of(&in_addr), - tcp_connect_on_connect_cb) - } - ip::Ipv6(ref addr) => { - log(debug, fmt!("addr: %?", addr)); - let in_addr = uv::ll::ip6_addr(addr_str, port as int); - uv::ll::tcp_connect6( - connect_req_ptr, - stream_handle_ptr, - ptr::addr_of(&in_addr), - tcp_connect_on_connect_cb) - } - }; - match connect_result { - 0i32 => { - log(debug, ~"tcp_connect successful"); - // reusable data that we'll have for the - // duration.. - uv::ll::set_data_for_uv_handle(stream_handle_ptr, - socket_data_ptr as - *libc::c_void); - // just so the connect_cb can send the - // outcome.. - uv::ll::set_data_for_req(connect_req_ptr, - conn_data_ptr); - log(debug, ~"leaving tcp_connect interact cb..."); - // let tcp_connect_on_connect_cb send on - // the result_ch, now.. - } - _ => { - // immediate connect failure.. probably a garbage - // ip or somesuch + 0i32 => { + log(debug, ~"tcp_init successful"); + log(debug, ~"dealing w/ ipv4 connection.."); + let connect_req_ptr = + ptr::addr_of(&((*socket_data_ptr).connect_req)); + let addr_str = ip::format_addr(&input_ip); + let connect_result = match input_ip { + ip::Ipv4(ref addr) => { + // have to "recreate" the + // sockaddr_in/6 since the ip_addr + // discards the port info.. should + // probably add an additional rust + // type that actually is closer to + // what the libuv API expects (ip str + // + port num) + log(debug, fmt!("addr: %?", addr)); + let in_addr = uv::ll::ip4_addr(addr_str, + port as int); + uv::ll::tcp_connect( + connect_req_ptr, + stream_handle_ptr, + ptr::addr_of(&in_addr), + tcp_connect_on_connect_cb) + } + ip::Ipv6(ref addr) => { + log(debug, fmt!("addr: %?", addr)); + let in_addr = uv::ll::ip6_addr(addr_str, + port as int); + uv::ll::tcp_connect6( + connect_req_ptr, + stream_handle_ptr, + ptr::addr_of(&in_addr), + tcp_connect_on_connect_cb) + } + }; + match connect_result { + 0i32 => { + log(debug, ~"tcp_connect successful"); + // reusable data that we'll have for the + // duration.. + uv::ll::set_data_for_uv_handle( + stream_handle_ptr, + socket_data_ptr as + *libc::c_void); + // just so the connect_cb can send the + // outcome.. + uv::ll::set_data_for_req(connect_req_ptr, + conn_data_ptr); + log(debug, + ~"leaving tcp_connect interact cb..."); + // let tcp_connect_on_connect_cb send on + // the result_ch, now.. + } + _ => { + // immediate connect + // failure.. probably a garbage ip or + // somesuch + let err_data = + uv::ll::get_last_err_data(loop_ptr); + oldcomm::send((*conn_data_ptr).result_ch, + ConnFailure(err_data)); + uv::ll::set_data_for_uv_handle( + stream_handle_ptr, + conn_data_ptr); + uv::ll::close(stream_handle_ptr, + stream_error_close_cb); + } + } + } + _ => { + // failure to create a tcp handle let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send((*conn_data_ptr).result_ch, - ConnFailure(err_data)); - uv::ll::set_data_for_uv_handle(stream_handle_ptr, - conn_data_ptr); - uv::ll::close(stream_handle_ptr, - stream_error_close_cb); - } + ConnFailure(err_data)); } - } - _ => { - // failure to create a tcp handle - let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send((*conn_data_ptr).result_ch, - ConnFailure(err_data)); - } } } - }; + } match oldcomm::recv(result_po) { - ConnSuccess => { - log(debug, ~"tcp::connect - received success on result_po"); - result::Ok(TcpSocket(socket_data)) - } - ConnFailure(ref err_data) => { - oldcomm::recv(closed_signal_po); - log(debug, ~"tcp::connect - received failure on result_po"); - // still have to free the malloc'd stream handle.. - rustrt::rust_uv_current_kernel_free(stream_handle_ptr - as *libc::c_void); - let tcp_conn_err = match err_data.err_name { - ~"ECONNREFUSED" => ConnectionRefused, - _ => GenericConnectErr(err_data.err_name, err_data.err_msg) - }; - result::Err(tcp_conn_err) - } + ConnSuccess => { + log(debug, ~"tcp::connect - received success on result_po"); + result::Ok(TcpSocket(socket_data)) + } + ConnFailure(ref err_data) => { + oldcomm::recv(closed_signal_po); + log(debug, ~"tcp::connect - received failure on result_po"); + // still have to free the malloc'd stream handle.. + rustrt::rust_uv_current_kernel_free(stream_handle_ptr + as *libc::c_void); + let tcp_conn_err = match err_data.err_name { + ~"ECONNREFUSED" => ConnectionRefused, + _ => GenericConnectErr(err_data.err_name, + err_data.err_msg) + }; + result::Err(tcp_conn_err) + } } } } @@ -506,71 +516,79 @@ fn read_future(sock: &TcpSocket, timeout_msecs: uint) pub fn accept(new_conn: TcpNewConnection) -> result::Result<TcpSocket, TcpErrData> { unsafe { - match new_conn { - NewTcpConn(server_handle_ptr) => { - let server_data_ptr = uv::ll::get_data_for_uv_handle( - server_handle_ptr) as *TcpListenFcData; - let reader_po = oldcomm::Port(); - let iotask = (*server_data_ptr).iotask; - let stream_handle_ptr = malloc_uv_tcp_t(); - *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); - let client_socket_data = @TcpSocketData { - reader_po: reader_po, - reader_ch: oldcomm::Chan(&reader_po), - stream_handle_ptr : stream_handle_ptr, - connect_req : uv::ll::connect_t(), - write_req : uv::ll::write_t(), - ipv6: (*server_data_ptr).ipv6, - iotask : iotask - }; - let client_socket_data_ptr = ptr::addr_of(&(*client_socket_data)); - let client_stream_handle_ptr = - (*client_socket_data_ptr).stream_handle_ptr; - - let result_po = oldcomm::Port::<Option<TcpErrData>>(); - let result_ch = oldcomm::Chan(&result_po); - - // UNSAFE LIBUV INTERACTION BEGIN - // .. normally this happens within the context of - // a call to uv::hl::interact.. but we're breaking - // the rules here because this always has to be - // called within the context of a listen() new_connect_cb - // callback (or it will likely fail and drown your cat) - log(debug, ~"in interact cb for tcp::accept"); - let loop_ptr = uv::ll::get_loop_for_uv_handle( - server_handle_ptr); - match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) { - 0i32 => { - log(debug, ~"uv_tcp_init successful for client stream"); - match uv::ll::accept( - server_handle_ptr as *libc::c_void, - client_stream_handle_ptr as *libc::c_void) { - 0i32 => { - log(debug, ~"successfully accepted client connection"); - uv::ll::set_data_for_uv_handle(client_stream_handle_ptr, - client_socket_data_ptr - as *libc::c_void); - oldcomm::send(result_ch, None); - } - _ => { - log(debug, ~"failed to accept client conn"); - oldcomm::send(result_ch, Some( - uv::ll::get_last_err_data(loop_ptr).to_tcp_err())); - } + match new_conn{ + NewTcpConn(server_handle_ptr) => { + let server_data_ptr = uv::ll::get_data_for_uv_handle( + server_handle_ptr) as *TcpListenFcData; + let reader_po = oldcomm::Port(); + let iotask = &(*server_data_ptr).iotask; + let stream_handle_ptr = malloc_uv_tcp_t(); + *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = + uv::ll::tcp_t(); + let client_socket_data: @TcpSocketData = @TcpSocketData { + reader_po: reader_po, + reader_ch: oldcomm::Chan(&reader_po), + stream_handle_ptr : stream_handle_ptr, + connect_req : uv::ll::connect_t(), + write_req : uv::ll::write_t(), + ipv6: (*server_data_ptr).ipv6, + iotask : iotask.clone() + }; + let client_socket_data_ptr = ptr::addr_of( + &(*client_socket_data)); + let client_stream_handle_ptr = + (*client_socket_data_ptr).stream_handle_ptr; + + let result_po = oldcomm::Port::<Option<TcpErrData>>(); + let result_ch = oldcomm::Chan(&result_po); + + // UNSAFE LIBUV INTERACTION BEGIN + // .. normally this happens within the context of + // a call to uv::hl::interact.. but we're breaking + // the rules here because this always has to be + // called within the context of a listen() new_connect_cb + // callback (or it will likely fail and drown your cat) + log(debug, ~"in interact cb for tcp::accept"); + let loop_ptr = uv::ll::get_loop_for_uv_handle( + server_handle_ptr); + match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) { + 0i32 => { + log(debug, ~"uv_tcp_init successful for \ + client stream"); + match uv::ll::accept( + server_handle_ptr as *libc::c_void, + client_stream_handle_ptr as *libc::c_void) { + 0i32 => { + log(debug, + ~"successfully accepted client \ + connection"); + uv::ll::set_data_for_uv_handle( + client_stream_handle_ptr, + client_socket_data_ptr + as *libc::c_void); + oldcomm::send(result_ch, None); + } + _ => { + log(debug, ~"failed to accept client conn"); + oldcomm::send(result_ch, Some( + uv::ll::get_last_err_data( + loop_ptr).to_tcp_err())); + } + } + } + _ => { + log(debug, ~"failed to accept client stream"); + oldcomm::send(result_ch, Some( + uv::ll::get_last_err_data( + loop_ptr).to_tcp_err())); + } + } + // UNSAFE LIBUV INTERACTION END + match oldcomm::recv(result_po) { + Some(copy err_data) => result::Err(err_data), + None => result::Ok(TcpSocket(client_socket_data)) } - } - _ => { - log(debug, ~"failed to init client stream"); - oldcomm::send(result_ch, Some( - uv::ll::get_last_err_data(loop_ptr).to_tcp_err())); - } - } - // UNSAFE LIBUV INTERACTION END - match oldcomm::recv(result_po) { - Some(copy err_data) => result::Err(err_data), - None => result::Ok(TcpSocket(client_socket_data)) } - } } } } @@ -604,30 +622,27 @@ pub fn accept(new_conn: TcpNewConnection) * of listen exiting because of an error */ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint, - iotask: IoTask, - on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>), - new_connect_cb: fn~(TcpNewConnection, - oldcomm::Chan<Option<TcpErrData>>)) + iotask: &IoTask, + on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>), + new_connect_cb: fn~(TcpNewConnection, + oldcomm::Chan<Option<TcpErrData>>)) -> result::Result<(), TcpListenErrData> { - unsafe { - do listen_common(move host_ip, port, backlog, iotask, - move on_establish_cb) - // on_connect_cb - |move new_connect_cb, handle| { - unsafe { - let server_data_ptr = - uv::ll::get_data_for_uv_handle(handle) - as *TcpListenFcData; - let new_conn = NewTcpConn(handle); - let kill_ch = (*server_data_ptr).kill_ch; - new_connect_cb(new_conn, kill_ch); - } - } + do listen_common(move host_ip, port, backlog, iotask, + move on_establish_cb) + // on_connect_cb + |move new_connect_cb, handle| { + unsafe { + let server_data_ptr = uv::ll::get_data_for_uv_handle(handle) + as *TcpListenFcData; + let new_conn = NewTcpConn(handle); + let kill_ch = (*server_data_ptr).kill_ch; + new_connect_cb(new_conn, kill_ch); + } } } fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, - iotask: IoTask, + iotask: &IoTask, on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>), on_connect_cb: fn~(*uv::ll::uv_tcp_t)) -> result::Result<(), TcpListenErrData> { @@ -637,12 +652,12 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, let kill_ch = oldcomm::Chan(&kill_po); let server_stream = uv::ll::tcp_t(); let server_stream_ptr = ptr::addr_of(&server_stream); - let server_data = { + let server_data: TcpListenFcData = TcpListenFcData { server_stream_ptr: server_stream_ptr, stream_closed_ch: oldcomm::Chan(&stream_closed_po), kill_ch: kill_ch, on_connect_cb: move on_connect_cb, - iotask: iotask, + iotask: iotask.clone(), ipv6: match &host_ip { &ip::Ipv4(_) => { false } &ip::Ipv6(_) => { true } @@ -662,114 +677,123 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, do iotask::interact(iotask) |move loc_ip, loop_ptr| { unsafe { match uv::ll::tcp_init(loop_ptr, server_stream_ptr) { - 0i32 => { - uv::ll::set_data_for_uv_handle( - server_stream_ptr, - server_data_ptr); - let addr_str = ip::format_addr(&loc_ip); - let bind_result = match loc_ip { - ip::Ipv4(ref addr) => { - log(debug, fmt!("addr: %?", addr)); - let in_addr = uv::ll::ip4_addr(addr_str, - port as int); - uv::ll::tcp_bind(server_stream_ptr, - ptr::addr_of(&in_addr)) - } - ip::Ipv6(ref addr) => { - log(debug, fmt!("addr: %?", addr)); - let in_addr = uv::ll::ip6_addr(addr_str, - port as int); - uv::ll::tcp_bind6(server_stream_ptr, - ptr::addr_of(&in_addr)) - } - }; - match bind_result { - 0i32 => { - match uv::ll::listen(server_stream_ptr, - backlog as libc::c_int, - tcp_lfc_on_connection_cb) { - 0i32 => oldcomm::send(setup_ch, None), - _ => { - log(debug, ~"failure to uv_listen()"); - let err_data = uv::ll::get_last_err_data( - loop_ptr); - oldcomm::send(setup_ch, Some(err_data)); - } + 0i32 => { + uv::ll::set_data_for_uv_handle( + server_stream_ptr, + server_data_ptr); + let addr_str = ip::format_addr(&loc_ip); + let bind_result = match loc_ip { + ip::Ipv4(ref addr) => { + log(debug, fmt!("addr: %?", addr)); + let in_addr = uv::ll::ip4_addr( + addr_str, + port as int); + uv::ll::tcp_bind(server_stream_ptr, + ptr::addr_of(&in_addr)) + } + ip::Ipv6(ref addr) => { + log(debug, fmt!("addr: %?", addr)); + let in_addr = uv::ll::ip6_addr( + addr_str, + port as int); + uv::ll::tcp_bind6(server_stream_ptr, + ptr::addr_of(&in_addr)) + } + }; + match bind_result { + 0i32 => { + match uv::ll::listen( + server_stream_ptr, + backlog as libc::c_int, + tcp_lfc_on_connection_cb) { + 0i32 => oldcomm::send(setup_ch, None), + _ => { + log(debug, + ~"failure to uv_tcp_init"); + let err_data = + uv::ll::get_last_err_data( + loop_ptr); + oldcomm::send(setup_ch, + Some(err_data)); + } + } + } + _ => { + log(debug, ~"failure to uv_tcp_bind"); + let err_data = uv::ll::get_last_err_data( + loop_ptr); + oldcomm::send(setup_ch, Some(err_data)); + } } - } - _ => { + } + _ => { log(debug, ~"failure to uv_tcp_bind"); let err_data = uv::ll::get_last_err_data( loop_ptr); oldcomm::send(setup_ch, Some(err_data)); - } } - } - _ => { - log(debug, ~"failure to uv_tcp_init"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send(setup_ch, Some(err_data)); - } } - }; + } } setup_ch.recv() }; match setup_result { - Some(ref err_data) => { - do iotask::interact(iotask) |loop_ptr| { - unsafe { - log(debug, - fmt!("tcp::listen post-kill recv hl interact %?", - loop_ptr)); - (*server_data_ptr).active = false; - uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); + Some(ref err_data) => { + do iotask::interact(iotask) |loop_ptr| { + unsafe { + log(debug, + fmt!("tcp::listen post-kill recv hl interact %?", + loop_ptr)); + (*server_data_ptr).active = false; + uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); + } + }; + stream_closed_po.recv(); + match err_data.err_name { + ~"EACCES" => { + log(debug, ~"Got EACCES error"); + result::Err(AccessDenied) + } + ~"EADDRINUSE" => { + log(debug, ~"Got EADDRINUSE error"); + result::Err(AddressInUse) + } + _ => { + log(debug, fmt!("Got '%s' '%s' libuv error", + err_data.err_name, err_data.err_msg)); + result::Err( + GenericListenErr(err_data.err_name, + err_data.err_msg)) + } } - }; - stream_closed_po.recv(); - match err_data.err_name { - ~"EACCES" => { - log(debug, ~"Got EACCES error"); - result::Err(AccessDenied) - } - ~"EADDRINUSE" => { - log(debug, ~"Got EADDRINUSE error"); - result::Err(AddressInUse) - } - _ => { - log(debug, fmt!("Got '%s' '%s' libuv error", - err_data.err_name, err_data.err_msg)); - result::Err( - GenericListenErr(err_data.err_name, err_data.err_msg)) - } } - } - None => { - on_establish_cb(kill_ch); - let kill_result = oldcomm::recv(kill_po); - do iotask::interact(iotask) |loop_ptr| { - unsafe { - log(debug, - fmt!("tcp::listen post-kill recv hl interact %?", - loop_ptr)); - (*server_data_ptr).active = false; - uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); + None => { + on_establish_cb(kill_ch); + let kill_result = oldcomm::recv(kill_po); + do iotask::interact(iotask) |loop_ptr| { + unsafe { + log(debug, + fmt!("tcp::listen post-kill recv hl interact %?", + loop_ptr)); + (*server_data_ptr).active = false; + uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); + } + }; + stream_closed_po.recv(); + match kill_result { + // some failure post bind/listen + Some(ref err_data) => result::Err(GenericListenErr( + err_data.err_name, + err_data.err_msg)), + // clean exit + None => result::Ok(()) } - }; - stream_closed_po.recv(); - match kill_result { - // some failure post bind/listen - Some(ref err_data) => result::Err(GenericListenErr( - err_data.err_name, - err_data.err_msg)), - // clean exit - None => result::Ok(()) } - } } } } + /** * Convert a `net::tcp::tcp_socket` to a `net::tcp::tcp_socket_buf`. * @@ -936,11 +960,11 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) { }; let close_data_ptr = ptr::addr_of(&close_data); let stream_handle_ptr = (*socket_data).stream_handle_ptr; - do iotask::interact((*socket_data).iotask) |loop_ptr| { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| { unsafe { log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?", - stream_handle_ptr, loop_ptr)); + stream_handle_ptr, loop_ptr)); uv::ll::set_data_for_uv_handle(stream_handle_ptr, close_data_ptr); uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb); @@ -950,7 +974,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) { //the line below will most likely crash //log(debug, fmt!("about to free socket_data at %?", socket_data)); rustrt::rust_uv_current_kernel_free(stream_handle_ptr - as *libc::c_void); + as *libc::c_void); log(debug, ~"exiting dtor for tcp_socket"); } } @@ -962,7 +986,7 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) use timer; log(debug, ~"starting tcp::read"); - let iotask = (*socket_data).iotask; + let iotask = &(*socket_data).iotask; let rs_result = read_start_common_impl(socket_data); if result::is_err(&rs_result) { let err_data = result::get_err(&rs_result); @@ -972,26 +996,26 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) log(debug, ~"tcp::read before recv_timeout"); let read_result = if timeout_msecs > 0u { timer::recv_timeout( - iotask, timeout_msecs, result::get(&rs_result)) + iotask, timeout_msecs, result::get(&rs_result)) } else { Some(oldcomm::recv(result::get(&rs_result))) }; log(debug, ~"tcp::read after recv_timeout"); match move read_result { - None => { - log(debug, ~"tcp::read: timed out.."); - let err_data = TcpErrData { - err_name: ~"TIMEOUT", - err_msg: ~"req timed out" - }; - read_stop_common_impl(socket_data); - result::Err(err_data) - } - Some(move data_result) => { - log(debug, ~"tcp::read got data"); - read_stop_common_impl(socket_data); - data_result - } + None => { + log(debug, ~"tcp::read: timed out.."); + let err_data = TcpErrData { + err_name: ~"TIMEOUT", + err_msg: ~"req timed out" + }; + read_stop_common_impl(socket_data); + result::Err(err_data) + } + Some(move data_result) => { + log(debug, ~"tcp::read got data"); + read_stop_common_impl(socket_data); + data_result + } } } } @@ -1004,27 +1028,26 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) -> let stream_handle_ptr = (*socket_data).stream_handle_ptr; let stop_po = oldcomm::Port::<Option<TcpErrData>>(); let stop_ch = oldcomm::Chan(&stop_po); - do iotask::interact((*socket_data).iotask) |loop_ptr| { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| { unsafe { log(debug, ~"in interact cb for tcp::read_stop"); - match uv::ll::read_stop(stream_handle_ptr as - *uv::ll::uv_stream_t) { - 0i32 => { - log(debug, ~"successfully called uv_read_stop"); - oldcomm::send(stop_ch, None); - } - _ => { - log(debug, ~"failure in calling uv_read_stop"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send(stop_ch, Some(err_data.to_tcp_err())); - } + match uv::ll::read_stop(stream_handle_ptr + as *uv::ll::uv_stream_t) { + 0i32 => { + log(debug, ~"successfully called uv_read_stop"); + oldcomm::send(stop_ch, None); + } + _ => { + log(debug, ~"failure in calling uv_read_stop"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + oldcomm::send(stop_ch, Some(err_data.to_tcp_err())); + } } } - }; - + } match oldcomm::recv(stop_po) { - Some(move err_data) => Err(err_data), - None => Ok(()) + Some(move err_data) => Err(err_data), + None => Ok(()) } } } @@ -1038,29 +1061,29 @@ fn read_start_common_impl(socket_data: *TcpSocketData) let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>(); let start_ch = oldcomm::Chan(&start_po); log(debug, ~"in tcp::read_start before interact loop"); - do iotask::interact((*socket_data).iotask) |loop_ptr| { + do iotask::interact(&(*socket_data).iotask) |loop_ptr| { unsafe { - log(debug, - fmt!("in tcp::read_start interact cb %?", loop_ptr)); - match uv::ll::read_start(stream_handle_ptr as - *uv::ll::uv_stream_t, + log(debug, fmt!("in tcp::read_start interact cb %?", + loop_ptr)); + match uv::ll::read_start(stream_handle_ptr + as *uv::ll::uv_stream_t, on_alloc_cb, on_tcp_read_cb) { - 0i32 => { - log(debug, ~"success doing uv_read_start"); - oldcomm::send(start_ch, None); - } - _ => { - log(debug, ~"error attempting uv_read_start"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send(start_ch, Some(err_data)); - } + 0i32 => { + log(debug, ~"success doing uv_read_start"); + oldcomm::send(start_ch, None); + } + _ => { + log(debug, ~"error attempting uv_read_start"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + oldcomm::send(start_ch, Some(err_data)); + } } } - }; + } match oldcomm::recv(start_po) { - Some(ref err_data) => result::Err(err_data.to_tcp_err()), - None => result::Ok((*socket_data).reader_po) + Some(ref err_data) => result::Err(err_data.to_tcp_err()), + None => result::Ok((*socket_data).reader_po) } } } @@ -1084,27 +1107,28 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData, result_ch: oldcomm::Chan(&result_po) }; let write_data_ptr = ptr::addr_of(&write_data); - do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| { + do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| { unsafe { log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr)); match uv::ll::write(write_req_ptr, - stream_handle_ptr, - write_buf_vec_ptr, - tcp_write_complete_cb) { - 0i32 => { - log(debug, ~"uv_write() invoked successfully"); - uv::ll::set_data_for_req(write_req_ptr, write_data_ptr); - } - _ => { - log(debug, ~"error invoking uv_write()"); - let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send((*write_data_ptr).result_ch, - TcpWriteError(err_data.to_tcp_err())); - } + stream_handle_ptr, + write_buf_vec_ptr, + tcp_write_complete_cb) { + 0i32 => { + log(debug, ~"uv_write() invoked successfully"); + uv::ll::set_data_for_req(write_req_ptr, + write_data_ptr); + } + _ => { + log(debug, ~"error invoking uv_write()"); + let err_data = uv::ll::get_last_err_data(loop_ptr); + oldcomm::send((*write_data_ptr).result_ch, + TcpWriteError(err_data.to_tcp_err())); + } } } - }; + } // FIXME (#2656): Instead of passing unsafe pointers to local data, // and waiting here for the write to complete, we should transfer // ownership of everything to the I/O task and let it deal with the @@ -1473,7 +1497,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_and_client() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8888u; let expected_req = ~"ping"; @@ -1485,6 +1509,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1493,7 +1518,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1519,7 +1544,7 @@ pub mod test { assert str::contains(actual_resp, expected_resp); } pub fn impl_gl_tcp_ipv4_get_peer_addr() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8887u; let expected_resp = ~"pong"; @@ -1530,6 +1555,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1538,7 +1564,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1549,10 +1575,11 @@ pub mod test { let server_ip_addr = ip::v4::parse_addr(server_ip); let iotask = uv::global_loop::get(); let connect_result = connect(move server_ip_addr, server_port, - iotask); + &iotask); let sock = result::unwrap(move connect_result); + debug!("testing peer address"); // This is what we are actually testing! assert net::ip::format_addr(&sock.get_peer_addr()) == ~"127.0.0.1"; @@ -1561,12 +1588,14 @@ pub mod test { // Fulfill the protocol the test server expects let resp_bytes = str::to_bytes(~"ping"); tcp_write_single(&sock, resp_bytes); + debug!("message sent"); let read_result = sock.read(0u); client_ch.send(str::from_bytes(read_result.get())); + debug!("result read"); }; } pub fn impl_gl_tcp_ipv4_client_error_connection_refused() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8889u; let expected_req = ~"ping"; @@ -1586,7 +1615,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_address_in_use() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8890u; let expected_req = ~"ping"; @@ -1598,6 +1627,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1606,7 +1636,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1637,7 +1667,7 @@ pub mod test { } } pub fn impl_gl_tcp_ipv4_server_access_denied() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 80u; // this one should fail.. @@ -1657,7 +1687,7 @@ pub mod test { } pub fn impl_gl_tcp_ipv4_server_client_reader_writer() { - let iotask = uv::global_loop::get(); + let iotask = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 8891u; let expected_req = ~"ping"; @@ -1669,6 +1699,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let iotask_clone = iotask.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1677,7 +1708,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - iotask) + &iotask_clone) }; server_result_ch.send(actual_req); }; @@ -1708,7 +1739,7 @@ pub mod test { pub fn impl_tcp_socket_impl_reader_handles_eof() { use core::io::{Reader,ReaderUtil}; - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let server_ip = ~"127.0.0.1"; let server_port = 10041u; let expected_req = ~"GET /"; @@ -1720,6 +1751,7 @@ pub mod test { let cont_po = oldcomm::Port::<()>(); let cont_ch = oldcomm::Chan(&cont_po); // server + let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { let actual_req = do oldcomm::listen |server_ch| { run_tcp_test_server( @@ -1728,7 +1760,7 @@ pub mod test { expected_resp, server_ch, cont_ch, - hl_loop) + &hl_loop_clone) }; server_result_ch.send(actual_req); }; @@ -1768,7 +1800,7 @@ pub mod test { fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str, server_ch: oldcomm::Chan<~str>, cont_ch: oldcomm::Chan<()>, - iotask: IoTask) -> ~str { + iotask: &IoTask) -> ~str { let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen(move server_ip_addr, server_port, 128, iotask, @@ -1855,7 +1887,7 @@ pub mod test { } fn run_tcp_test_server_fail(server_ip: &str, server_port: uint, - iotask: IoTask) -> TcpListenErrData { + iotask: &IoTask) -> TcpListenErrData { let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen(move server_ip_addr, server_port, 128, iotask, @@ -1879,7 +1911,7 @@ pub mod test { fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str, client_ch: oldcomm::Chan<~str>, - iotask: IoTask) -> result::Result<~str, + iotask: &IoTask) -> result::Result<~str, TcpConnectErrData> { let server_ip_addr = ip::v4::parse_addr(server_ip); diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index d0ca133b39e..c06d56165c8 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -39,7 +39,7 @@ use core; * * ch - a channel of type T to send a `val` on * * val - a value of type T to send over the provided `ch` */ -pub fn delayed_send<T: Owned>(iotask: IoTask, +pub fn delayed_send<T: Owned>(iotask: &IoTask, msecs: uint, ch: oldcomm::Chan<T>, val: T) { @@ -92,7 +92,7 @@ pub fn delayed_send<T: Owned>(iotask: IoTask, * * `iotask` - a `uv::iotask` that the tcp request will run on * * msecs - an amount of time, in milliseconds, for the current task to block */ -pub fn sleep(iotask: IoTask, msecs: uint) { +pub fn sleep(iotask: &IoTask, msecs: uint) { let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); delayed_send(iotask, msecs, exit_ch, ()); @@ -119,7 +119,7 @@ pub fn sleep(iotask: IoTask, msecs: uint) { * on the provided port in the allotted timeout period, then the result will * be a `Some(T)`. If not, then `None` will be returned. */ -pub fn recv_timeout<T: Copy Owned>(iotask: IoTask, +pub fn recv_timeout<T: Copy Owned>(iotask: &IoTask, msecs: uint, wait_po: oldcomm::Port<T>) -> Option<T> { @@ -183,13 +183,13 @@ mod test { #[test] fn test_gl_timer_simple_sleep_test() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); sleep(hl_loop, 1u); } #[test] fn test_gl_timer_sleep_stress1() { - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); for iter::repeat(50u) { sleep(hl_loop, 1u); } @@ -199,7 +199,7 @@ mod test { fn test_gl_timer_sleep_stress2() { let po = oldcomm::Port(); let ch = oldcomm::Chan(&po); - let hl_loop = uv::global_loop::get(); + let hl_loop = &uv::global_loop::get(); let repeat = 20u; let spec = { @@ -214,11 +214,12 @@ mod test { for spec.each |spec| { let (times, maxms) = *spec; + let hl_loop_clone = hl_loop.clone(); do task::spawn { use rand::*; let rng = Rng(); for iter::repeat(times) { - sleep(hl_loop, rng.next() as uint % maxms); + sleep(&hl_loop_clone, rng.next() as uint % maxms); } oldcomm::send(ch, ()); } @@ -277,12 +278,12 @@ mod test { let expected = rand::Rng().gen_str(16u); let test_po = oldcomm::Port::<~str>(); let test_ch = oldcomm::Chan(&test_po); - + let hl_loop_clone = hl_loop.clone(); do task::spawn() { - delayed_send(hl_loop, 50u, test_ch, expected); + delayed_send(&hl_loop_clone, 50u, test_ch, expected); }; - match recv_timeout(hl_loop, 1u, test_po) { + match recv_timeout(&hl_loop, 1u, test_po) { None => successes += 1, _ => failures += 1 }; diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs index 3a2c3b7c135..8ae3e24abee 100644 --- a/src/libstd/uv_global_loop.rs +++ b/src/libstd/uv_global_loop.rs @@ -19,16 +19,16 @@ use uv_iotask::{IoTask, spawn_iotask}; use core::either::{Left, Right}; use core::libc; -use core::oldcomm::{Port, Chan, select2, listen}; -use core::private::{chan_from_global_ptr, weaken_task}; +use core::pipes::{Port, Chan, SharedChan, select2i}; +use core::private::global::{global_data_clone_create, + global_data_clone}; +use core::private::weak_task::weaken_task; use core::str; -use core::task::TaskBuilder; +use core::task::{task, SingleThreaded, spawn}; use core::task; use core::vec; - -extern mod rustrt { - unsafe fn rust_uv_get_kernel_global_chan_ptr() -> *libc::uintptr_t; -} +use core::clone::Clone; +use core::option::{Some, None}; /** * Race-free helper to get access to a global task where a libuv @@ -48,69 +48,64 @@ pub fn get() -> IoTask { #[doc(hidden)] fn get_monitor_task_gl() -> IoTask { - unsafe { - let monitor_loop_chan_ptr = - rustrt::rust_uv_get_kernel_global_chan_ptr(); - - debug!("ENTERING global_loop::get() loop chan: %?", - monitor_loop_chan_ptr); - - debug!("before priv::chan_from_global_ptr"); - type MonChan = Chan<IoTask>; - - let monitor_ch = - do chan_from_global_ptr::<MonChan>(monitor_loop_chan_ptr, - || { - task::task().sched_mode - (task::SingleThreaded) - .unlinked() - }) |msg_po| { - unsafe { - debug!("global monitor task starting"); - - // As a weak task the runtime will notify us when to exit - do weaken_task() |weak_exit_po| { - debug!("global monitor task is now weak"); - let hl_loop = spawn_loop(); - loop { - debug!("in outer_loop..."); - match select2(weak_exit_po, msg_po) { - Left(weak_exit) => { - // all normal tasks have ended, tell the - // libuv loop to tear_down, then exit - debug!("weak_exit_po recv'd msg: %?", weak_exit); - iotask::exit(hl_loop); - break; - } - Right(fetch_ch) => { - debug!("hl_loop req recv'd: %?", fetch_ch); - fetch_ch.send(hl_loop); - } - } + + type MonChan = Chan<IoTask>; + + struct GlobalIoTask(IoTask); + + impl GlobalIoTask: Clone { + fn clone(&self) -> GlobalIoTask { + GlobalIoTask((**self).clone()) + } + } + + fn key(_: GlobalIoTask) { } + + match unsafe { global_data_clone(key) } { + Some(GlobalIoTask(iotask)) => iotask, + None => { + let iotask: IoTask = spawn_loop(); + let mut installed = false; + let final_iotask = unsafe { + do global_data_clone_create(key) { + installed = true; + ~GlobalIoTask(iotask.clone()) + } + }; + if installed { + do task().unlinked().spawn() { + unsafe { + debug!("global monitor task starting"); + // As a weak task the runtime will notify us + // when to exit + do weaken_task |weak_exit_po| { + debug!("global monitor task is weak"); + weak_exit_po.recv(); + iotask::exit(&iotask); + debug!("global monitor task is unweak"); + }; + debug!("global monitor task exiting"); } - debug!("global monitor task is leaving weakend state"); - }; - debug!("global monitor task exiting"); + } + } else { + iotask::exit(&iotask); } - }; - // once we have a chan to the monitor loop, we ask it for - // the libuv loop's async handle - do listen |fetch_ch| { - monitor_ch.send(fetch_ch); - fetch_ch.recv() + match final_iotask { + GlobalIoTask(iotask) => iotask + } } } } fn spawn_loop() -> IoTask { - let builder = do task::task().add_wrapper |task_body| { + let builder = do task().add_wrapper |task_body| { fn~(move task_body) { // The I/O loop task also needs to be weak so it doesn't keep // the runtime alive unsafe { - do weaken_task |weak_exit_po| { - debug!("global libuv task is now weak %?", weak_exit_po); + do weaken_task |_| { + debug!("global libuv task is now weak"); task_body(); // We don't wait for the exit message on weak_exit_po @@ -122,6 +117,7 @@ fn spawn_loop() -> IoTask { } } }; + let builder = builder.unlinked(); spawn_iotask(move builder) } @@ -135,16 +131,18 @@ mod test { use core::iter; use core::libc; - use core::oldcomm; use core::ptr; use core::task; + use core::cast::transmute; + use core::libc::c_void; + use core::pipes::{stream, SharedChan, Chan}; extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) { unsafe { let exit_ch_ptr = ll::get_data_for_uv_handle( - timer_ptr as *libc::c_void) as *oldcomm::Chan<bool>; - let exit_ch = *exit_ch_ptr; - oldcomm::send(exit_ch, true); + timer_ptr as *libc::c_void); + let exit_ch = transmute::<*c_void, ~Chan<bool>>(exit_ch_ptr); + exit_ch.send(true); log(debug, fmt!("EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?", exit_ch_ptr)); @@ -155,26 +153,25 @@ mod test { unsafe { log(debug, ~"in simple timer cb"); ll::timer_stop(timer_ptr); - let hl_loop = get_gl(); + let hl_loop = &get_gl(); do iotask::interact(hl_loop) |_loop_ptr| { + log(debug, ~"closing timer"); unsafe { - log(debug, ~"closing timer"); ll::close(timer_ptr, simple_timer_close_cb); - log(debug, ~"about to deref exit_ch_ptr"); - log(debug, ~"after msg sent on deref'd exit_ch"); } + log(debug, ~"about to deref exit_ch_ptr"); + log(debug, ~"after msg sent on deref'd exit_ch"); }; log(debug, ~"exiting simple timer cb"); } } - fn impl_uv_hl_simple_timer(iotask: IoTask) { + fn impl_uv_hl_simple_timer(iotask: &IoTask) { unsafe { - let exit_po = oldcomm::Port::<bool>(); - let exit_ch = oldcomm::Chan(&exit_po); - let exit_ch_ptr = ptr::addr_of(&exit_ch); + let (exit_po, exit_ch) = stream::<bool>(); + let exit_ch_ptr: *libc::c_void = transmute(~exit_ch); log(debug, fmt!("EXIT_CH_PTR newly created exit_ch_ptr: %?", - exit_ch_ptr)); + exit_ch_ptr)); let timer_handle = ll::timer_t(); let timer_ptr = ptr::addr_of(&timer_handle); do iotask::interact(iotask) |loop_ptr| { @@ -184,20 +181,22 @@ mod test { if(init_status == 0i32) { ll::set_data_for_uv_handle( timer_ptr as *libc::c_void, - exit_ch_ptr as *libc::c_void); + exit_ch_ptr); let start_status = ll::timer_start(timer_ptr, simple_timer_cb, - 1u, - 0u); - if start_status != 0 { + 1u, 0u); + if(start_status == 0i32) { + } + else { fail ~"failure on ll::timer_start()"; } - } else { + } + else { fail ~"failure on ll::timer_init()"; } } }; - oldcomm::recv(exit_po); + exit_po.recv(); log(debug, ~"global_loop timer test: msg recv on exit_po, done.."); } @@ -205,17 +204,15 @@ mod test { #[test] fn test_gl_uv_global_loop_high_level_global_timer() { - unsafe { - let hl_loop = get_gl(); - let exit_po = oldcomm::Port::<()>(); - let exit_ch = oldcomm::Chan(&exit_po); - task::spawn_sched(task::ManualThreads(1u), || { - impl_uv_hl_simple_timer(hl_loop); - oldcomm::send(exit_ch, ()); - }); + let hl_loop = &get_gl(); + let (exit_po, exit_ch) = stream::<()>(); + task::spawn_sched(task::ManualThreads(1u), || { + let hl_loop = &get_gl(); impl_uv_hl_simple_timer(hl_loop); - oldcomm::recv(exit_po); - } + exit_ch.send(()); + }); + impl_uv_hl_simple_timer(hl_loop); + exit_po.recv(); } // keeping this test ignored until some kind of stress-test-harness @@ -223,23 +220,21 @@ mod test { #[test] #[ignore] fn test_stress_gl_uv_global_loop_high_level_global_timer() { - unsafe { - let hl_loop = get_gl(); - let exit_po = oldcomm::Port::<()>(); - let exit_ch = oldcomm::Chan(&exit_po); - let cycles = 5000u; - for iter::repeat(cycles) { - task::spawn_sched(task::ManualThreads(1u), || { - impl_uv_hl_simple_timer(hl_loop); - oldcomm::send(exit_ch, ()); - }); - }; - for iter::repeat(cycles) { - oldcomm::recv(exit_po); - }; - log(debug, - ~"test_stress_gl_uv_global_loop_high_level_global_timer"+ - ~" exiting sucessfully!"); - } + let (exit_po, exit_ch) = stream::<()>(); + let exit_ch = SharedChan(exit_ch); + let cycles = 5000u; + for iter::repeat(cycles) { + let exit_ch_clone = exit_ch.clone(); + task::spawn_sched(task::ManualThreads(1u), || { + let hl_loop = &get_gl(); + impl_uv_hl_simple_timer(hl_loop); + exit_ch_clone.send(()); + }); + }; + for iter::repeat(cycles) { + exit_po.recv(); + }; + log(debug, ~"test_stress_gl_uv_global_loop_high_level_global_timer"+ + ~" exiting sucessfully!"); } } diff --git a/src/libstd/uv_iotask.rs b/src/libstd/uv_iotask.rs index 0a3d64a02a4..dc0092aadfa 100644 --- a/src/libstd/uv_iotask.rs +++ b/src/libstd/uv_iotask.rs @@ -20,7 +20,7 @@ use ll = uv_ll; use core::libc::c_void; use core::libc; -use core::oldcomm::{Port, Chan, listen}; +use core::pipes::{stream, Port, Chan, SharedChan}; use core::prelude::*; use core::ptr::addr_of; use core::task::TaskBuilder; @@ -30,22 +30,30 @@ use core::task; pub enum IoTask { IoTask_({ async_handle: *ll::uv_async_t, - op_chan: Chan<IoTaskMsg> + op_chan: SharedChan<IoTaskMsg> }) } +impl IoTask: Clone { + fn clone(&self) -> IoTask { + IoTask_({ + async_handle: self.async_handle, + op_chan: self.op_chan.clone() + }) + } +} + pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask { - do listen |iotask_ch| { + let (iotask_port, iotask_chan) = stream(); - do task.sched_mode(task::SingleThreaded).spawn { - debug!("entering libuv task"); - run_loop(iotask_ch); - debug!("libuv task exiting"); - }; + do task.sched_mode(task::SingleThreaded).spawn { + debug!("entering libuv task"); + run_loop(&iotask_chan); + debug!("libuv task exiting"); + }; - iotask_ch.recv() - } + iotask_port.recv() } @@ -71,7 +79,7 @@ pub fn spawn_iotask(task: task::TaskBuilder) -> IoTask { * module. It is not safe to send the `loop_ptr` param to this callback out * via ports/chans. */ -pub unsafe fn interact(iotask: IoTask, +pub unsafe fn interact(iotask: &IoTask, cb: fn~(*c_void)) { send_msg(iotask, Interaction(move cb)); } @@ -83,7 +91,7 @@ pub unsafe fn interact(iotask: IoTask, * async handle and do a sanity check to make sure that all other handles are * closed, causing a failure otherwise. */ -pub fn exit(iotask: IoTask) { +pub fn exit(iotask: &IoTask) { unsafe { send_msg(iotask, TeardownLoop); } @@ -98,8 +106,10 @@ enum IoTaskMsg { } /// Run the loop and begin handling messages -fn run_loop(iotask_ch: Chan<IoTask>) { +fn run_loop(iotask_ch: &Chan<IoTask>) { + unsafe { + debug!("creating loop"); let loop_ptr = ll::loop_new(); // set up the special async handle we'll use to allow multi-task @@ -110,10 +120,12 @@ fn run_loop(iotask_ch: Chan<IoTask>) { // associate the async handle with the loop ll::async_init(loop_ptr, async_handle, wake_up_cb); + let (msg_po, msg_ch) = stream::<IoTaskMsg>(); + // initialize our loop data and store it in the loop - let data = IoTaskLoopData { + let data: IoTaskLoopData = IoTaskLoopData { async_handle: async_handle, - msg_po: Port() + msg_po: msg_po }; ll::set_data_for_uv_handle(async_handle, addr_of(&data)); @@ -121,7 +133,7 @@ fn run_loop(iotask_ch: Chan<IoTask>) { // while we dwell in the I/O loop let iotask = IoTask_({ async_handle: async_handle, - op_chan: data.msg_po.chan() + op_chan: SharedChan(msg_ch) }); iotask_ch.send(iotask); @@ -139,9 +151,10 @@ struct IoTaskLoopData { msg_po: Port<IoTaskMsg>, } -fn send_msg(iotask: IoTask, msg: IoTaskMsg) { +fn send_msg(iotask: &IoTask, + msg: IoTaskMsg) { + iotask.op_chan.send(move msg); unsafe { - iotask.op_chan.send(move msg); ll::async_send(iotask.async_handle); } } @@ -149,19 +162,20 @@ fn send_msg(iotask: IoTask, msg: IoTaskMsg) { /// Dispatch all pending messages extern fn wake_up_cb(async_handle: *ll::uv_async_t, status: int) { - unsafe { - log(debug, fmt!("wake_up_cb extern.. handle: %? status: %?", - async_handle, status)); + log(debug, fmt!("wake_up_cb extern.. handle: %? status: %?", + async_handle, status)); + + unsafe { let loop_ptr = ll::get_loop_for_uv_handle(async_handle); - let data = ll::get_data_for_uv_handle(async_handle) - as *IoTaskLoopData; - let msg_po = (*data).msg_po; + let data = + ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData; + let msg_po = &(*data).msg_po; while msg_po.peek() { match msg_po.recv() { - Interaction(ref cb) => (*cb)(loop_ptr), - TeardownLoop => begin_teardown(data) + Interaction(ref cb) => (*cb)(loop_ptr), + TeardownLoop => begin_teardown(data) } } } @@ -216,27 +230,32 @@ mod test { } struct AhData { iotask: IoTask, - exit_ch: oldcomm::Chan<()>, + exit_ch: oldcomm::Chan<()> } - fn impl_uv_iotask_async(iotask: IoTask) { + fn impl_uv_iotask_async(iotask: &IoTask) { unsafe { let async_handle = ll::async_t(); let ah_ptr = ptr::addr_of(&async_handle); let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); - let ah_data = { - iotask: iotask, + let ah_data = AhData { + iotask: iotask.clone(), exit_ch: exit_ch }; - let ah_data_ptr = ptr::addr_of(&ah_data); + let ah_data_ptr: *AhData = unsafe { + ptr::to_unsafe_ptr(&ah_data) + }; + debug!("about to interact"); do interact(iotask) |loop_ptr| { unsafe { + debug!("interacting"); ll::async_init(loop_ptr, ah_ptr, async_handle_cb); - ll::set_data_for_uv_handle(ah_ptr, - ah_data_ptr as *libc::c_void); + ll::set_data_for_uv_handle( + ah_ptr, ah_data_ptr as *libc::c_void); ll::async_send(ah_ptr); } }; + debug!("waiting for async close"); oldcomm::recv(exit_po); } } @@ -244,13 +263,13 @@ mod test { // this fn documents the bear minimum neccesary to roll your own // high_level_loop unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask { - let iotask_port = oldcomm::Port::<IoTask>(); - let iotask_ch = oldcomm::Chan(&iotask_port); + let (iotask_port, iotask_ch) = stream::<IoTask>(); do task::spawn_sched(task::ManualThreads(1u)) { - run_loop(iotask_ch); + debug!("about to run a test loop"); + run_loop(&iotask_ch); exit_ch.send(()); }; - return oldcomm::recv(iotask_port); + return iotask_port.recv(); } extern fn lifetime_handle_close(handle: *libc::c_void) { @@ -270,23 +289,30 @@ mod test { unsafe { let exit_po = oldcomm::Port::<()>(); let exit_ch = oldcomm::Chan(&exit_po); - let iotask = spawn_test_loop(exit_ch); + let iotask = &spawn_test_loop(exit_ch); + + debug!("spawned iotask"); // using this handle to manage the lifetime of the - // high_level_loop, as it will exit the first time one of the - // impl_uv_hl_async() is cleaned up with no one ref'd handles on - // the loop (Which can happen under race-condition type - // situations.. this ensures that the loop lives until, at least, - // all of the impl_uv_hl_async() runs have been called, at least. + // high_level_loop, as it will exit the first time one of + // the impl_uv_hl_async() is cleaned up with no one ref'd + // handles on the loop (Which can happen under + // race-condition type situations.. this ensures that the + // loop lives until, at least, all of the + // impl_uv_hl_async() runs have been called, at least. let work_exit_po = oldcomm::Port::<()>(); let work_exit_ch = oldcomm::Chan(&work_exit_po); for iter::repeat(7u) { + let iotask_clone = iotask.clone(); do task::spawn_sched(task::ManualThreads(1u)) { - impl_uv_iotask_async(iotask); + debug!("async"); + impl_uv_iotask_async(&iotask_clone); + debug!("done async"); oldcomm::send(work_exit_ch, ()); }; }; for iter::repeat(7u) { + debug!("waiting"); oldcomm::recv(work_exit_po); }; log(debug, ~"sending teardown_loop msg.."); |
