diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-01-25 00:52:50 -0800 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-01-29 19:54:55 -0800 |
| commit | da4b3768971c7c025ba8a85ebf59572fd752dfb6 (patch) | |
| tree | e47abf6553bffeaaa35d682f74f0cc42df5e913e /src | |
| parent | 87acde8826af4dfd8391cbccc48526381796dab3 (diff) | |
| download | rust-da4b3768971c7c025ba8a85ebf59572fd752dfb6.tar.gz rust-da4b3768971c7c025ba8a85ebf59572fd752dfb6.zip | |
std: Stop using oldcomm
Diffstat (limited to 'src')
| -rw-r--r-- | src/libstd/arc.rs | 1 | ||||
| -rw-r--r-- | src/libstd/c_vec.rs | 1 | ||||
| -rw-r--r-- | src/libstd/net_ip.rs | 71 | ||||
| -rw-r--r-- | src/libstd/net_tcp.rs | 579 | ||||
| -rw-r--r-- | src/libstd/test.rs | 42 | ||||
| -rw-r--r-- | src/libstd/timer.rs | 91 | ||||
| -rw-r--r-- | src/libstd/uv_iotask.rs | 34 | ||||
| -rw-r--r-- | src/libstd/uv_ll.rs | 59 |
8 files changed, 429 insertions, 449 deletions
diff --git a/src/libstd/arc.rs b/src/libstd/arc.rs index a45e2b32941..e50245168b1 100644 --- a/src/libstd/arc.rs +++ b/src/libstd/arc.rs @@ -483,7 +483,6 @@ mod tests { use arc::*; use arc; - use core::oldcomm::*; use core::option::{Some, None}; use core::option; use core::pipes; diff --git a/src/libstd/c_vec.rs b/src/libstd/c_vec.rs index 359d3039229..c190d08687a 100644 --- a/src/libstd/c_vec.rs +++ b/src/libstd/c_vec.rs @@ -38,7 +38,6 @@ #[forbid(deprecated_mode)]; use core::libc; -use core::oldcomm; use core::option; use core::prelude::*; use core::ptr; diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs index 72e58cbd5d3..839d0d23a61 100644 --- a/src/libstd/net_ip.rs +++ b/src/libstd/net_ip.rs @@ -12,8 +12,8 @@ #[forbid(deprecated_mode)]; use core::libc; -use core::oldcomm; use core::prelude::*; +use core::pipes::{stream, SharedChan}; use core::ptr; use core::result; use core::str; @@ -113,40 +113,40 @@ enum IpGetAddrErr { * A `result<~[ip_addr], ip_get_addr_err>` instance that will contain * a vector of `ip_addr` results, in the case of success, or an error * object in the case of failure - */ +*/ pub fn get_addr(node: &str, iotask: &iotask) - -> result::Result<~[IpAddr], IpGetAddrErr> { - do oldcomm::listen |output_ch| { - do str::as_buf(node) |node_ptr, len| { - unsafe { - log(debug, fmt!("slice len %?", len)); - let handle = create_uv_getaddrinfo_t(); - let handle_ptr = ptr::addr_of(&handle); - let handle_data = GetAddrData { - output_ch: output_ch - }; - let handle_data_ptr = ptr::addr_of(&handle_data); - do interact(iotask) |loop_ptr| { - unsafe { - let result = uv_getaddrinfo( - loop_ptr, - handle_ptr, - get_addr_cb, - node_ptr, - ptr::null(), - ptr::null()); - match result { - 0i32 => { + -> result::Result<~[IpAddr], IpGetAddrErr> { + let (output_po, output_ch) = stream(); + let output_ch = SharedChan(output_ch); + do str::as_buf(node) |node_ptr, len| { + unsafe { + log(debug, fmt!("slice len %?", len)); + let handle = create_uv_getaddrinfo_t(); + let handle_ptr = ptr::addr_of(&handle); + let handle_data = GetAddrData { + output_ch: output_ch.clone() + }; + let handle_data_ptr = ptr::addr_of(&handle_data); + do interact(iotask) |loop_ptr| { + unsafe { + let result = uv_getaddrinfo( + loop_ptr, + handle_ptr, + get_addr_cb, + node_ptr, + ptr::null(), + ptr::null()); + match result { + 0i32 => { set_data_for_req(handle_ptr, handle_data_ptr); - } - _ => { + } + _ => { output_ch.send(result::Err(GetAddrUnknownError)); - } } } - }; - output_ch.recv() - } + } + }; + output_po.recv() } } } @@ -300,7 +300,7 @@ pub mod v6 { } struct GetAddrData { - output_ch: oldcomm::Chan<result::Result<~[IpAddr],IpGetAddrErr>> + output_ch: SharedChan<result::Result<~[IpAddr],IpGetAddrErr>> } extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int, @@ -309,6 +309,7 @@ extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int, log(debug, ~"in get_addr_cb"); let handle_data = get_data_for_req(handle) as *GetAddrData; + let output_ch = (*handle_data).output_ch.clone(); if status == 0i32 { if res != (ptr::null::<addrinfo>()) { let mut out_vec = ~[]; @@ -326,7 +327,7 @@ extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int, else { log(debug, ~"curr_addr is not of family AF_INET or "+ ~"AF_INET6. Error."); - (*handle_data).output_ch.send( + output_ch.send( result::Err(GetAddrUnknownError)); break; }; @@ -344,17 +345,17 @@ extern fn get_addr_cb(handle: *uv_getaddrinfo_t, status: libc::c_int, } log(debug, fmt!("successful process addrinfo result, len: %?", vec::len(out_vec))); - (*handle_data).output_ch.send(result::Ok(move out_vec)); + output_ch.send(result::Ok(move out_vec)); } else { log(debug, ~"addrinfo pointer is NULL"); - (*handle_data).output_ch.send( + output_ch.send( result::Err(GetAddrUnknownError)); } } else { log(debug, ~"status != 0 error in get_addr_cb"); - (*handle_data).output_ch.send( + output_ch.send( result::Err(GetAddrUnknownError)); } if res != (ptr::null::<addrinfo>()) { diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index 608723a6ca5..d9e4bfc540c 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -23,7 +23,7 @@ use core::io::{Reader, ReaderUtil, Writer}; use core::io; use core::libc::size_t; use core::libc; -use core::oldcomm; +use core::pipes::{stream, Chan, Port, SharedChan}; use core::prelude::*; use core::ptr; use core::result::{Result}; @@ -146,19 +146,22 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, iotask: &IoTask) -> result::Result<TcpSocket, TcpConnectErrData> { unsafe { - let result_po = oldcomm::Port::<ConnAttempt>(); - let closed_signal_po = oldcomm::Port::<()>(); - let conn_data = { - result_ch: oldcomm::Chan(&result_po), - closed_signal_ch: oldcomm::Chan(&closed_signal_po) + let (result_po, result_ch) = stream::<ConnAttempt>(); + let result_ch = SharedChan(result_ch); + let (closed_signal_po, closed_signal_ch) = stream::<()>(); + let closed_signal_ch = SharedChan(closed_signal_ch); + let conn_data = ConnectReqData { + result_ch: result_ch, + closed_signal_ch: closed_signal_ch }; let conn_data_ptr = ptr::addr_of(&conn_data); - let reader_po = oldcomm::Port::<result::Result<~[u8], TcpErrData>>(); + let (reader_po, reader_ch) = stream::<Result<~[u8], TcpErrData>>(); + let reader_ch = SharedChan(reader_ch); let stream_handle_ptr = malloc_uv_tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); let socket_data = @TcpSocketData { - reader_po: reader_po, - reader_ch: oldcomm::Chan(&reader_po), + reader_po: @reader_po, + reader_ch: reader_ch, stream_handle_ptr: stream_handle_ptr, connect_req: uv::ll::connect_t(), write_req: uv::ll::write_t(), @@ -169,7 +172,6 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, iotask: iotask.clone() }; let socket_data_ptr = ptr::addr_of(&(*socket_data)); - debug!("tcp_connect result_ch %?", conn_data.result_ch); // get an unsafe representation of our stream_handle_ptr that // we can send into the interact cb to be handled in libuv.. debug!("stream_handle_ptr outside interact %?", @@ -238,8 +240,9 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, // somesuch let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send((*conn_data_ptr).result_ch, - ConnFailure(err_data)); + let result_ch = (*conn_data_ptr) + .result_ch.clone(); + result_ch.send(ConnFailure(err_data)); uv::ll::set_data_for_uv_handle( stream_handle_ptr, conn_data_ptr); @@ -251,19 +254,19 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, _ => { // failure to create a tcp handle let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send((*conn_data_ptr).result_ch, - ConnFailure(err_data)); + let result_ch = (*conn_data_ptr).result_ch.clone(); + result_ch.send(ConnFailure(err_data)); } } } } - match oldcomm::recv(result_po) { + match result_po.recv() { ConnSuccess => { debug!("tcp::connect - received success on result_po"); result::Ok(TcpSocket(socket_data)) } ConnFailure(ref err_data) => { - oldcomm::recv(closed_signal_po); + closed_signal_po.recv(); debug!("tcp::connect - received failure on result_po"); // still have to free the malloc'd stream handle.. rustrt::rust_uv_current_kernel_free(stream_handle_ptr @@ -359,7 +362,7 @@ pub fn write_future(sock: &TcpSocket, raw_write_data: ~[u8]) * `tcp_err_data` record */ pub fn read_start(sock: &TcpSocket) - -> result::Result<oldcomm::Port< + -> result::Result<@Port< result::Result<~[u8], TcpErrData>>, TcpErrData> { unsafe { let socket_data = ptr::addr_of(&(*(sock.socket_data))); @@ -374,12 +377,9 @@ pub fn read_start(sock: &TcpSocket) * * * `sock` - a `net::tcp::tcp_socket` that you wish to stop reading on */ -pub fn read_stop(sock: &TcpSocket, - read_port: oldcomm::Port<result::Result<~[u8], TcpErrData>>) -> +pub fn read_stop(sock: &TcpSocket) -> result::Result<(), TcpErrData> { unsafe { - debug!( - "taking the read_port out of commission %?", read_port); let socket_data = ptr::addr_of(&(*sock.socket_data)); read_stop_common_impl(socket_data) } @@ -519,14 +519,16 @@ pub fn accept(new_conn: TcpNewConnection) NewTcpConn(server_handle_ptr) => { let server_data_ptr = uv::ll::get_data_for_uv_handle( server_handle_ptr) as *TcpListenFcData; - let reader_po = oldcomm::Port(); + let (reader_po, reader_ch) = stream::< + Result<~[u8], TcpErrData>>(); + let reader_ch = SharedChan(reader_ch); let iotask = &(*server_data_ptr).iotask; let stream_handle_ptr = malloc_uv_tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); let client_socket_data: @TcpSocketData = @TcpSocketData { - reader_po: reader_po, - reader_ch: oldcomm::Chan(&reader_po), + reader_po: @reader_po, + reader_ch: reader_ch, stream_handle_ptr : stream_handle_ptr, connect_req : uv::ll::connect_t(), write_req : uv::ll::write_t(), @@ -538,8 +540,8 @@ pub fn accept(new_conn: TcpNewConnection) let client_stream_handle_ptr = (*client_socket_data_ptr).stream_handle_ptr; - let result_po = oldcomm::Port::<Option<TcpErrData>>(); - let result_ch = oldcomm::Chan(&result_po); + let (result_po, result_ch) = stream::<Option<TcpErrData>>(); + let result_ch = SharedChan(result_ch); // UNSAFE LIBUV INTERACTION BEGIN // .. normally this happens within the context of @@ -565,11 +567,11 @@ pub fn accept(new_conn: TcpNewConnection) client_stream_handle_ptr, client_socket_data_ptr as *libc::c_void); - oldcomm::send(result_ch, None); + result_ch.send(None); } _ => { log(debug, ~"failed to accept client conn"); - oldcomm::send(result_ch, Some( + result_ch.send(Some( uv::ll::get_last_err_data( loop_ptr).to_tcp_err())); } @@ -577,13 +579,13 @@ pub fn accept(new_conn: TcpNewConnection) } _ => { log(debug, ~"failed to accept client stream"); - oldcomm::send(result_ch, Some( + result_ch.send(Some( uv::ll::get_last_err_data( loop_ptr).to_tcp_err())); } } // UNSAFE LIBUV INTERACTION END - match oldcomm::recv(result_po) { + match result_po.recv() { Some(copy err_data) => result::Err(err_data), None => result::Ok(TcpSocket(client_socket_data)) } @@ -622,9 +624,9 @@ pub fn accept(new_conn: TcpNewConnection) */ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint, iotask: &IoTask, - on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>), + on_establish_cb: fn~(SharedChan<Option<TcpErrData>>), new_connect_cb: fn~(TcpNewConnection, - oldcomm::Chan<Option<TcpErrData>>)) + SharedChan<Option<TcpErrData>>)) -> result::Result<(), TcpListenErrData> { do listen_common(move host_ip, port, backlog, iotask, move on_establish_cb) @@ -634,7 +636,7 @@ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint, let server_data_ptr = uv::ll::get_data_for_uv_handle(handle) as *TcpListenFcData; let new_conn = NewTcpConn(handle); - let kill_ch = (*server_data_ptr).kill_ch; + let kill_ch = (*server_data_ptr).kill_ch.clone(); new_connect_cb(new_conn, kill_ch); } } @@ -642,19 +644,20 @@ pub fn listen(host_ip: ip::IpAddr, port: uint, backlog: uint, fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, iotask: &IoTask, - on_establish_cb: fn~(oldcomm::Chan<Option<TcpErrData>>), + on_establish_cb: fn~(SharedChan<Option<TcpErrData>>), on_connect_cb: fn~(*uv::ll::uv_tcp_t)) -> result::Result<(), TcpListenErrData> { unsafe { - let stream_closed_po = oldcomm::Port::<()>(); - let kill_po = oldcomm::Port::<Option<TcpErrData>>(); - let kill_ch = oldcomm::Chan(&kill_po); + let (stream_closed_po, stream_closed_ch) = stream::<()>(); + let stream_closed_ch = SharedChan(stream_closed_ch); + let (kill_po, kill_ch) = stream::<Option<TcpErrData>>(); + let kill_ch = SharedChan(kill_ch); let server_stream = uv::ll::tcp_t(); let server_stream_ptr = ptr::addr_of(&server_stream); let server_data: TcpListenFcData = TcpListenFcData { server_stream_ptr: server_stream_ptr, - stream_closed_ch: oldcomm::Chan(&stream_closed_po), - kill_ch: kill_ch, + stream_closed_ch: stream_closed_ch, + kill_ch: kill_ch.clone(), on_connect_cb: move on_connect_cb, iotask: iotask.clone(), ipv6: match &host_ip { @@ -665,77 +668,78 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, }; let server_data_ptr = ptr::addr_of(&server_data); - let setup_result = do oldcomm::listen |setup_ch| { - // this is to address a compiler warning about - // an implicit copy.. it seems that double nested - // will defeat a move sigil, as is done to the host_ip - // arg above.. this same pattern works w/o complaint in - // tcp::connect (because the iotask::interact cb isn't - // nested within a core::comm::listen block) - let loc_ip = copy(host_ip); - do iotask::interact(iotask) |move loc_ip, loop_ptr| { - unsafe { - match uv::ll::tcp_init(loop_ptr, server_stream_ptr) { - 0i32 => { - uv::ll::set_data_for_uv_handle( - server_stream_ptr, - server_data_ptr); - let addr_str = ip::format_addr(&loc_ip); - let bind_result = match loc_ip { - ip::Ipv4(ref addr) => { - log(debug, fmt!("addr: %?", addr)); - let in_addr = uv::ll::ip4_addr( - addr_str, - port as int); - uv::ll::tcp_bind(server_stream_ptr, - ptr::addr_of(&in_addr)) - } - ip::Ipv6(ref addr) => { - log(debug, fmt!("addr: %?", addr)); - let in_addr = uv::ll::ip6_addr( - addr_str, - port as int); - uv::ll::tcp_bind6(server_stream_ptr, - ptr::addr_of(&in_addr)) - } - }; - match bind_result { - 0i32 => { - match uv::ll::listen( - server_stream_ptr, - backlog as libc::c_int, - tcp_lfc_on_connection_cb) { - 0i32 => oldcomm::send(setup_ch, None), - _ => { - log(debug, - ~"failure to uv_tcp_init"); - let err_data = - uv::ll::get_last_err_data( - loop_ptr); - oldcomm::send(setup_ch, - Some(err_data)); - } + let (setup_po, setup_ch) = stream(); + + // this is to address a compiler warning about + // an implicit copy.. it seems that double nested + // will defeat a move sigil, as is done to the host_ip + // arg above.. this same pattern works w/o complaint in + // tcp::connect (because the iotask::interact cb isn't + // nested within a core::comm::listen block) + let loc_ip = copy(host_ip); + do iotask::interact(iotask) |move loc_ip, loop_ptr| { + unsafe { + match uv::ll::tcp_init(loop_ptr, server_stream_ptr) { + 0i32 => { + uv::ll::set_data_for_uv_handle( + server_stream_ptr, + server_data_ptr); + let addr_str = ip::format_addr(&loc_ip); + let bind_result = match loc_ip { + ip::Ipv4(ref addr) => { + log(debug, fmt!("addr: %?", addr)); + let in_addr = uv::ll::ip4_addr( + addr_str, + port as int); + uv::ll::tcp_bind(server_stream_ptr, + ptr::addr_of(&in_addr)) + } + ip::Ipv6(ref addr) => { + log(debug, fmt!("addr: %?", addr)); + let in_addr = uv::ll::ip6_addr( + addr_str, + port as int); + uv::ll::tcp_bind6(server_stream_ptr, + ptr::addr_of(&in_addr)) + } + }; + match bind_result { + 0i32 => { + match uv::ll::listen( + server_stream_ptr, + backlog as libc::c_int, + tcp_lfc_on_connection_cb) { + 0i32 => setup_ch.send(None), + _ => { + log(debug, + ~"failure to uv_tcp_init"); + let err_data = + uv::ll::get_last_err_data( + loop_ptr); + setup_ch.send(Some(err_data)); } } - _ => { - log(debug, ~"failure to uv_tcp_bind"); - let err_data = uv::ll::get_last_err_data( - loop_ptr); - oldcomm::send(setup_ch, Some(err_data)); - } + } + _ => { + log(debug, ~"failure to uv_tcp_bind"); + let err_data = uv::ll::get_last_err_data( + loop_ptr); + setup_ch.send(Some(err_data)); } } - _ => { - log(debug, ~"failure to uv_tcp_bind"); - let err_data = uv::ll::get_last_err_data( - loop_ptr); - oldcomm::send(setup_ch, Some(err_data)); - } + } + _ => { + log(debug, ~"failure to uv_tcp_bind"); + let err_data = uv::ll::get_last_err_data( + loop_ptr); + setup_ch.send(Some(err_data)); } } } - setup_ch.recv() - }; + } + + let setup_result = setup_po.recv(); + match setup_result { Some(ref err_data) => { do iotask::interact(iotask) |loop_ptr| { @@ -767,8 +771,8 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, } } None => { - on_establish_cb(kill_ch); - let kill_result = oldcomm::recv(kill_po); + on_establish_cb(kill_ch.clone()); + let kill_result = kill_po.recv(); do iotask::interact(iotask) |loop_ptr| { unsafe { log(debug, @@ -816,14 +820,13 @@ pub fn socket_buf(sock: TcpSocket) -> TcpSocketBuf { /// Convenience methods extending `net::tcp::tcp_socket` impl TcpSocket { - pub fn read_start() -> result::Result<oldcomm::Port< + pub fn read_start() -> result::Result<@Port< result::Result<~[u8], TcpErrData>>, TcpErrData> { read_start(&self) } - pub fn read_stop(read_port: - oldcomm::Port<result::Result<~[u8], TcpErrData>>) -> + pub fn read_stop() -> result::Result<(), TcpErrData> { - read_stop(&self, move read_port) + read_stop(&self) } fn read(timeout_msecs: uint) -> result::Result<~[u8], TcpErrData> { @@ -995,9 +998,9 @@ impl TcpSocketBuf: io::Writer { fn tear_down_socket_data(socket_data: @TcpSocketData) { unsafe { - let closed_po = oldcomm::Port::<()>(); - let closed_ch = oldcomm::Chan(&closed_po); - let close_data = { + let (closed_po, closed_ch) = stream::<()>(); + let closed_ch = SharedChan(closed_ch); + let close_data = TcpSocketCloseData { closed_ch: closed_ch }; let close_data_ptr = ptr::addr_of(&close_data); @@ -1012,7 +1015,7 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) { uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb); } }; - oldcomm::recv(closed_po); + closed_po.recv(); //the line below will most likely crash //log(debug, fmt!("about to free socket_data at %?", socket_data)); rustrt::rust_uv_current_kernel_free(stream_handle_ptr @@ -1038,9 +1041,9 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) log(debug, ~"tcp::read before recv_timeout"); let read_result = if timeout_msecs > 0u { timer::recv_timeout( - iotask, timeout_msecs, result::get(&rs_result)) + iotask, timeout_msecs, result::unwrap(rs_result)) } else { - Some(oldcomm::recv(result::get(&rs_result))) + Some(result::get(&rs_result).recv()) }; log(debug, ~"tcp::read after recv_timeout"); match move read_result { @@ -1068,8 +1071,7 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) -> result::Result<(), TcpErrData> { unsafe { let stream_handle_ptr = (*socket_data).stream_handle_ptr; - let stop_po = oldcomm::Port::<Option<TcpErrData>>(); - let stop_ch = oldcomm::Chan(&stop_po); + let (stop_po, stop_ch) = stream::<Option<TcpErrData>>(); do iotask::interact(&(*socket_data).iotask) |loop_ptr| { unsafe { log(debug, ~"in interact cb for tcp::read_stop"); @@ -1077,17 +1079,17 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) -> as *uv::ll::uv_stream_t) { 0i32 => { log(debug, ~"successfully called uv_read_stop"); - oldcomm::send(stop_ch, None); + stop_ch.send(None); } _ => { log(debug, ~"failure in calling uv_read_stop"); let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send(stop_ch, Some(err_data.to_tcp_err())); + stop_ch.send(Some(err_data.to_tcp_err())); } } } } - match oldcomm::recv(stop_po) { + match stop_po.recv() { Some(move err_data) => Err(err_data), None => Ok(()) } @@ -1096,12 +1098,11 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) -> // shared impl for read_start fn read_start_common_impl(socket_data: *TcpSocketData) - -> result::Result<oldcomm::Port< + -> result::Result<@Port< result::Result<~[u8], TcpErrData>>, TcpErrData> { unsafe { let stream_handle_ptr = (*socket_data).stream_handle_ptr; - let start_po = oldcomm::Port::<Option<uv::ll::uv_err_data>>(); - let start_ch = oldcomm::Chan(&start_po); + let (start_po, start_ch) = stream::<Option<uv::ll::uv_err_data>>(); log(debug, ~"in tcp::read_start before interact loop"); do iotask::interact(&(*socket_data).iotask) |loop_ptr| { unsafe { @@ -1113,19 +1114,22 @@ fn read_start_common_impl(socket_data: *TcpSocketData) on_tcp_read_cb) { 0i32 => { log(debug, ~"success doing uv_read_start"); - oldcomm::send(start_ch, None); + start_ch.send(None); } _ => { log(debug, ~"error attempting uv_read_start"); let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send(start_ch, Some(err_data)); + start_ch.send(Some(err_data)); } } } } - match oldcomm::recv(start_po) { - Some(ref err_data) => result::Err(err_data.to_tcp_err()), - None => result::Ok((*socket_data).reader_po) + match start_po.recv() { + Some(ref err_data) => result::Err( + err_data.to_tcp_err()), + None => { + result::Ok((*socket_data).reader_po) + } } } } @@ -1144,9 +1148,10 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData, vec::raw::to_ptr(raw_write_data), vec::len(raw_write_data)) ]; let write_buf_vec_ptr = ptr::addr_of(&write_buf_vec); - let result_po = oldcomm::Port::<TcpWriteResult>(); - let write_data = { - result_ch: oldcomm::Chan(&result_po) + let (result_po, result_ch) = stream::<TcpWriteResult>(); + let result_ch = SharedChan(result_ch); + let write_data = WriteReqData { + result_ch: result_ch }; let write_data_ptr = ptr::addr_of(&write_data); do iotask::interact(&(*socket_data_ptr).iotask) |loop_ptr| { @@ -1165,8 +1170,8 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData, _ => { log(debug, ~"error invoking uv_write()"); let err_data = uv::ll::get_last_err_data(loop_ptr); - oldcomm::send((*write_data_ptr).result_ch, - TcpWriteError(err_data.to_tcp_err())); + let result_ch = (*write_data_ptr).result_ch.clone(); + result_ch.send(TcpWriteError(err_data.to_tcp_err())); } } } @@ -1175,7 +1180,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData, // and waiting here for the write to complete, we should transfer // ownership of everything to the I/O task and let it deal with the // aftermath, so we don't have to sit here blocking. - match oldcomm::recv(result_po) { + match result_po.recv() { TcpWriteSuccess => Ok(()), TcpWriteError(move err_data) => Err(err_data) } @@ -1188,8 +1193,8 @@ enum TcpNewConnection { struct TcpListenFcData { server_stream_ptr: *uv::ll::uv_tcp_t, - stream_closed_ch: oldcomm::Chan<()>, - kill_ch: oldcomm::Chan<Option<TcpErrData>>, + stream_closed_ch: SharedChan<()>, + kill_ch: SharedChan<Option<TcpErrData>>, on_connect_cb: fn~(*uv::ll::uv_tcp_t), iotask: IoTask, ipv6: bool, @@ -1200,7 +1205,8 @@ extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) { unsafe { let server_data_ptr = uv::ll::get_data_for_uv_handle( handle) as *TcpListenFcData; - oldcomm::send((*server_data_ptr).stream_closed_ch, ()); + let stream_closed_ch = (*server_data_ptr).stream_closed_ch.clone(); + stream_closed_ch.send(()); } } @@ -1209,13 +1215,13 @@ extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t, unsafe { let server_data_ptr = uv::ll::get_data_for_uv_handle(handle) as *TcpListenFcData; - let kill_ch = (*server_data_ptr).kill_ch; + let kill_ch = (*server_data_ptr).kill_ch.clone(); if (*server_data_ptr).active { match status { 0i32 => ((*server_data_ptr).on_connect_cb)(handle), _ => { let loop_ptr = uv::ll::get_loop_for_uv_handle(handle); - oldcomm::send(kill_ch, + kill_ch.send( Some(uv::ll::get_last_err_data(loop_ptr) .to_tcp_err())); (*server_data_ptr).active = false; @@ -1243,7 +1249,7 @@ enum TcpWriteResult { } enum TcpReadStartResult { - TcpReadStartSuccess(oldcomm::Port<TcpReadResult>), + TcpReadStartSuccess(Port<TcpReadResult>), TcpReadStartError(TcpErrData) } @@ -1278,8 +1284,8 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err(); log(debug, fmt!("on_tcp_read_cb: incoming err.. name %? msg %?", err_data.err_name, err_data.err_msg)); - let reader_ch = (*socket_data_ptr).reader_ch; - oldcomm::send(reader_ch, result::Err(err_data)); + let reader_ch = &(*socket_data_ptr).reader_ch; + reader_ch.send(result::Err(err_data)); } // do nothing .. unneeded buf 0 => (), @@ -1287,10 +1293,10 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, _ => { // we have data log(debug, fmt!("tcp on_read_cb nread: %d", nread as int)); - let reader_ch = (*socket_data_ptr).reader_ch; + let reader_ch = &(*socket_data_ptr).reader_ch; let buf_base = uv::ll::get_base_from_buf(buf); let new_bytes = vec::from_buf(buf_base, nread as uint); - oldcomm::send(reader_ch, result::Ok(new_bytes)); + reader_ch.send(result::Ok(new_bytes)); } } uv::ll::free_base_of_buf(buf); @@ -1313,15 +1319,15 @@ extern fn on_alloc_cb(handle: *libc::c_void, } struct TcpSocketCloseData { - closed_ch: oldcomm::Chan<()>, + closed_ch: SharedChan<()>, } extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) { unsafe { let data = uv::ll::get_data_for_uv_handle(handle) as *TcpSocketCloseData; - let closed_ch = (*data).closed_ch; - oldcomm::send(closed_ch, ()); + let closed_ch = (*data).closed_ch.clone(); + closed_ch.send(()); log(debug, ~"tcp_socket_dtor_close_cb exiting.."); } } @@ -1333,33 +1339,35 @@ extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t, as *WriteReqData; if status == 0i32 { log(debug, ~"successful write complete"); - oldcomm::send((*write_data_ptr).result_ch, TcpWriteSuccess); + let result_ch = (*write_data_ptr).result_ch.clone(); + result_ch.send(TcpWriteSuccess); } else { let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req( write_req); let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr); let err_data = uv::ll::get_last_err_data(loop_ptr); log(debug, ~"failure to write"); - oldcomm::send((*write_data_ptr).result_ch, - TcpWriteError(err_data.to_tcp_err())); + let result_ch = (*write_data_ptr).result_ch.clone(); + result_ch.send(TcpWriteError(err_data.to_tcp_err())); } } } struct WriteReqData { - result_ch: oldcomm::Chan<TcpWriteResult>, + result_ch: SharedChan<TcpWriteResult>, } struct ConnectReqData { - result_ch: oldcomm::Chan<ConnAttempt>, - closed_signal_ch: oldcomm::Chan<()>, + result_ch: SharedChan<ConnAttempt>, + closed_signal_ch: SharedChan<()>, } extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) { unsafe { let data = uv::ll::get_data_for_uv_handle(handle) as *ConnectReqData; - oldcomm::send((*data).closed_signal_ch, ()); + let closed_signal_ch = (*data).closed_signal_ch.clone(); + closed_signal_ch.send(()); log(debug, fmt!("exiting steam_error_close_cb for %?", handle)); } } @@ -1375,14 +1383,14 @@ extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t, unsafe { let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr) as *ConnectReqData); - let result_ch = (*conn_data_ptr).result_ch; + let result_ch = (*conn_data_ptr).result_ch.clone(); log(debug, fmt!("tcp_connect result_ch %?", result_ch)); let tcp_stream_ptr = uv::ll::get_stream_handle_from_connect_req(connect_req_ptr); match status { 0i32 => { log(debug, ~"successful tcp connection!"); - oldcomm::send(result_ch, ConnSuccess); + result_ch.send(ConnSuccess); } _ => { log(debug, ~"error in tcp_connect_on_connect_cb"); @@ -1390,7 +1398,7 @@ extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t, let err_data = uv::ll::get_last_err_data(loop_ptr); log(debug, fmt!("err_data %? %?", err_data.err_name, err_data.err_msg)); - oldcomm::send(result_ch, ConnFailure(err_data)); + result_ch.send(ConnFailure(err_data)); uv::ll::set_data_for_uv_handle(tcp_stream_ptr, conn_data_ptr); uv::ll::close(tcp_stream_ptr, stream_error_close_cb); @@ -1406,8 +1414,8 @@ enum ConnAttempt { } struct TcpSocketData { - reader_po: oldcomm::Port<result::Result<~[u8], TcpErrData>>, - reader_ch: oldcomm::Chan<result::Result<~[u8], TcpErrData>>, + reader_po: @Port<result::Result<~[u8], TcpErrData>>, + reader_ch: SharedChan<result::Result<~[u8], TcpErrData>>, stream_handle_ptr: *uv::ll::uv_tcp_t, connect_req: uv::ll::uv_connect_t, write_req: uv::ll::uv_write_t, @@ -1431,7 +1439,7 @@ pub mod test { use uv; use core::io; - use core::oldcomm; + use core::pipes::{stream, Chan, Port, SharedChan}; use core::prelude::*; use core::result; use core::str; @@ -1546,39 +1554,33 @@ pub mod test { let expected_req = ~"ping"; let expected_resp = ~"pong"; - let server_result_po = oldcomm::Port::<~str>(); - let server_result_ch = oldcomm::Chan(&server_result_po); + let (server_result_po, server_result_ch) = stream::<~str>(); - let cont_po = oldcomm::Port::<()>(); - let cont_ch = oldcomm::Chan(&cont_po); + let (cont_po, cont_ch) = stream::<()>(); + let cont_ch = SharedChan(cont_ch); // server let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { - let actual_req = do oldcomm::listen |server_ch| { - run_tcp_test_server( - server_ip, - server_port, - expected_resp, - server_ch, - cont_ch, - &hl_loop_clone) - }; + let cont_ch = cont_ch.clone(); + let actual_req = run_tcp_test_server( + server_ip, + server_port, + expected_resp, + cont_ch.clone(), + &hl_loop_clone); server_result_ch.send(actual_req); }; - oldcomm::recv(cont_po); + cont_po.recv(); // client debug!("server started, firing up client.."); - let actual_resp_result = do oldcomm::listen |client_ch| { - run_tcp_test_client( - server_ip, - server_port, - expected_req, - client_ch, - hl_loop) - }; + let actual_resp_result = run_tcp_test_client( + server_ip, + server_port, + expected_req, + hl_loop); assert actual_resp_result.is_ok(); let actual_resp = actual_resp_result.get(); - let actual_req = oldcomm::recv(server_result_po); + let actual_req = server_result_po.recv(); debug!("REQ: expected: '%s' actual: '%s'", expected_req, actual_req); debug!("RESP: expected: '%s' actual: '%s'", @@ -1592,50 +1594,41 @@ pub mod test { let server_port = 8887u; let expected_resp = ~"pong"; - let server_result_po = oldcomm::Port::<~str>(); - let server_result_ch = oldcomm::Chan(&server_result_po); - - let cont_po = oldcomm::Port::<()>(); - let cont_ch = oldcomm::Chan(&cont_po); + let (cont_po, cont_ch) = stream::<()>(); + let cont_ch = SharedChan(cont_ch); // server let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { - let actual_req = do oldcomm::listen |server_ch| { - run_tcp_test_server( - server_ip, - server_port, - expected_resp, - server_ch, - cont_ch, - &hl_loop_clone) - }; - server_result_ch.send(actual_req); + let cont_ch = cont_ch.clone(); + run_tcp_test_server( + server_ip, + server_port, + expected_resp, + cont_ch.clone(), + &hl_loop_clone); }; - oldcomm::recv(cont_po); + cont_po.recv(); // client debug!("server started, firing up client.."); - do oldcomm::listen |client_ch| { - let server_ip_addr = ip::v4::parse_addr(server_ip); - let iotask = uv::global_loop::get(); - let connect_result = connect(move server_ip_addr, server_port, - &iotask); - - let sock = result::unwrap(move connect_result); - - debug!("testing peer address"); - // This is what we are actually testing! - assert net::ip::format_addr(&sock.get_peer_addr()) == - ~"127.0.0.1"; - assert net::ip::get_port(&sock.get_peer_addr()) == 8887; - - // Fulfill the protocol the test server expects - let resp_bytes = str::to_bytes(~"ping"); - tcp_write_single(&sock, resp_bytes); - debug!("message sent"); - let read_result = sock.read(0u); - client_ch.send(str::from_bytes(read_result.get())); - debug!("result read"); - }; + let server_ip_addr = ip::v4::parse_addr(server_ip); + let iotask = uv::global_loop::get(); + let connect_result = connect(move server_ip_addr, server_port, + &iotask); + + let sock = result::unwrap(move connect_result); + + debug!("testing peer address"); + // This is what we are actually testing! + assert net::ip::format_addr(&sock.get_peer_addr()) == + ~"127.0.0.1"; + assert net::ip::get_port(&sock.get_peer_addr()) == 8887; + + // Fulfill the protocol the test server expects + let resp_bytes = str::to_bytes(~"ping"); + tcp_write_single(&sock, resp_bytes); + debug!("message sent"); + sock.read(0u); + debug!("result read"); } pub fn impl_gl_tcp_ipv4_client_error_connection_refused() { let hl_loop = &uv::global_loop::get(); @@ -1644,14 +1637,11 @@ pub mod test { let expected_req = ~"ping"; // client debug!("firing up client.."); - let actual_resp_result = do oldcomm::listen |client_ch| { - run_tcp_test_client( - server_ip, - server_port, - expected_req, - client_ch, - hl_loop) - }; + let actual_resp_result = run_tcp_test_client( + server_ip, + server_port, + expected_req, + hl_loop); match actual_resp_result.get_err() { ConnectionRefused => (), _ => fail ~"unknown error.. expected connection_refused" @@ -1664,26 +1654,20 @@ pub mod test { let expected_req = ~"ping"; let expected_resp = ~"pong"; - let server_result_po = oldcomm::Port::<~str>(); - let server_result_ch = oldcomm::Chan(&server_result_po); - - let cont_po = oldcomm::Port::<()>(); - let cont_ch = oldcomm::Chan(&cont_po); + let (cont_po, cont_ch) = stream::<()>(); + let cont_ch = SharedChan(cont_ch); // server let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { - let actual_req = do oldcomm::listen |server_ch| { - run_tcp_test_server( - server_ip, - server_port, - expected_resp, - server_ch, - cont_ch, - &hl_loop_clone) - }; - server_result_ch.send(actual_req); - }; - oldcomm::recv(cont_po); + let cont_ch = cont_ch.clone(); + run_tcp_test_server( + server_ip, + server_port, + expected_resp, + cont_ch.clone(), + &hl_loop_clone); + } + cont_po.recv(); // this one should fail.. let listen_err = run_tcp_test_server_fail( server_ip, @@ -1691,14 +1675,11 @@ pub mod test { hl_loop); // client.. just doing this so that the first server tears down debug!("server started, firing up client.."); - do oldcomm::listen |client_ch| { - run_tcp_test_client( - server_ip, - server_port, - expected_req, - client_ch, - hl_loop) - }; + run_tcp_test_client( + server_ip, + server_port, + expected_req, + hl_loop); match listen_err { AddressInUse => { assert true; @@ -1736,26 +1717,23 @@ pub mod test { let expected_req = ~"ping"; let expected_resp = ~"pong"; - let server_result_po = oldcomm::Port::<~str>(); - let server_result_ch = oldcomm::Chan(&server_result_po); + let (server_result_po, server_result_ch) = stream::<~str>(); - let cont_po = oldcomm::Port::<()>(); - let cont_ch = oldcomm::Chan(&cont_po); + let (cont_po, cont_ch) = stream::<()>(); + let cont_ch = SharedChan(cont_ch); // server let iotask_clone = iotask.clone(); do task::spawn_sched(task::ManualThreads(1u)) { - let actual_req = do oldcomm::listen |server_ch| { - run_tcp_test_server( - server_ip, - server_port, - expected_resp, - server_ch, - cont_ch, - &iotask_clone) - }; + let cont_ch = cont_ch.clone(); + let actual_req = run_tcp_test_server( + server_ip, + server_port, + expected_resp, + cont_ch.clone(), + &iotask_clone); server_result_ch.send(actual_req); }; - oldcomm::recv(cont_po); + cont_po.recv(); // client let server_addr = ip::v4::parse_addr(server_ip); let conn_result = connect(server_addr, server_port, iotask); @@ -1770,7 +1748,7 @@ pub mod test { buf_read(sock_buf, resp_buf.len()) }; - let actual_req = oldcomm::recv(server_result_po); + let actual_req = server_result_po.recv(); log(debug, fmt!("REQ: expected: '%s' actual: '%s'", expected_req, actual_req)); log(debug, fmt!("RESP: expected: '%s' actual: '%s'", @@ -1788,26 +1766,20 @@ pub mod test { let expected_req = ~"GET /"; let expected_resp = ~"A string\nwith multiple lines\n"; - let server_result_po = oldcomm::Port::<~str>(); - let server_result_ch = oldcomm::Chan(&server_result_po); - - let cont_po = oldcomm::Port::<()>(); - let cont_ch = oldcomm::Chan(&cont_po); + let (cont_po, cont_ch) = stream::<()>(); + let cont_ch = SharedChan(cont_ch); // server let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { - let actual_req = do oldcomm::listen |server_ch| { - run_tcp_test_server( - server_ip, - server_port, - expected_resp, - server_ch, - cont_ch, - &hl_loop_clone) - }; - server_result_ch.send(actual_req); + let cont_ch = cont_ch.clone(); + run_tcp_test_server( + server_ip, + server_port, + expected_resp, + cont_ch.clone(), + &hl_loop_clone); }; - oldcomm::recv(cont_po); + cont_po.recv(); // client debug!("server started, firing up client.."); let server_addr = ip::v4::parse_addr(server_ip); @@ -1841,22 +1813,25 @@ pub mod test { } fn run_tcp_test_server(server_ip: &str, server_port: uint, resp: ~str, - server_ch: oldcomm::Chan<~str>, - cont_ch: oldcomm::Chan<()>, + cont_ch: SharedChan<()>, iotask: &IoTask) -> ~str { + let (server_po, server_ch) = stream::<~str>(); + let server_ch = SharedChan(server_ch); let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen(move server_ip_addr, server_port, 128, iotask, // on_establish_cb -- called when listener is set up |kill_ch| { - debug!("establish_cb %?", kill_ch); - oldcomm::send(cont_ch, ()); + debug!("establish_cb %?", + kill_ch); + cont_ch.send(()); }, // risky to run this on the loop, but some users // will want the POWER |new_conn, kill_ch| { - debug!("SERVER: new connection!"); - do oldcomm::listen |cont_ch| { + debug!("SERVER: new connection!"); + let (cont_po, cont_ch) = stream(); + let server_ch = server_ch.clone(); do task::spawn_sched(task::ManualThreads(1u)) { debug!("SERVER: starting worker for new req"); @@ -1865,8 +1840,9 @@ pub mod test { if result::is_err(&accept_result) { debug!("SERVER: error accept connection"); let err_data = result::get_err(&accept_result); - oldcomm::send(kill_ch, Some(err_data)); - debug!("SERVER/WORKER: send on err cont ch"); + kill_ch.send(Some(err_data)); + debug!( + "SERVER/WORKER: send on err cont ch"); cont_ch.send(()); } else { @@ -1889,12 +1865,12 @@ pub mod test { debug!("SERVER: before write"); tcp_write_single(&sock, str::to_bytes(resp)); debug!("SERVER: after write.. die"); - oldcomm::send(kill_ch, None); + kill_ch.send(None); } result::Err(move err_data) => { debug!("SERVER: error recvd: %s %s", err_data.err_name, err_data.err_msg); - oldcomm::send(kill_ch, Some(err_data)); + kill_ch.send(Some(err_data)); server_ch.send(~""); } } @@ -1902,9 +1878,7 @@ pub mod test { } } debug!("SERVER: waiting to recv on cont_ch"); - cont_ch.recv() - }; - debug!("SERVER: recv'd on cont_ch..leaving listen cb"); + cont_po.recv(); }); // err check on listen_result if result::is_err(&listen_result) { @@ -1921,7 +1895,7 @@ pub mod test { } } } - let ret_val = server_ch.recv(); + let ret_val = server_po.recv(); debug!("SERVER: exited and got return val: '%s'", ret_val); ret_val } @@ -1949,7 +1923,6 @@ pub mod test { } fn run_tcp_test_client(server_ip: &str, server_port: uint, resp: &str, - client_ch: oldcomm::Chan<~str>, iotask: &IoTask) -> result::Result<~str, TcpConnectErrData> { let server_ip_addr = ip::v4::parse_addr(server_ip); @@ -1972,9 +1945,9 @@ pub mod test { Ok(~"") } else { - client_ch.send(str::from_bytes(read_result.get())); - let ret_val = client_ch.recv(); - debug!("CLIENT: after client_ch recv ret: '%s'", ret_val); + let ret_val = str::from_bytes(read_result.get()); + debug!("CLIENT: after client_ch recv ret: '%s'", + ret_val); Ok(ret_val) } } diff --git a/src/libstd/test.rs b/src/libstd/test.rs index 2db1a51e34a..58bc32b71af 100644 --- a/src/libstd/test.rs +++ b/src/libstd/test.rs @@ -27,7 +27,7 @@ use core::either; use core::io::WriterUtil; use core::io; use core::libc::size_t; -use core::oldcomm; +use core::pipes::{stream, Chan, Port, SharedChan}; use core::option; use core::prelude::*; use core::result; @@ -305,8 +305,8 @@ fn run_tests(opts: &TestOpts, let mut wait_idx = 0; let mut done_idx = 0; - let p = oldcomm::Port(); - let ch = oldcomm::Chan(&p); + let (p, ch) = stream(); + let ch = SharedChan(ch); while done_idx < total { while wait_idx < concurrency && run_idx < total { @@ -317,12 +317,12 @@ fn run_tests(opts: &TestOpts, // that hang forever. callback(TeWait(copy test)); } - run_test(move test, ch); + run_test(move test, ch.clone()); wait_idx += 1; run_idx += 1; } - let (test, result) = oldcomm::recv(p); + let (test, result) = p.recv(); if concurrency != 1 { callback(TeWait(copy test)); } @@ -406,9 +406,9 @@ struct TestFuture { wait: fn@() -> TestResult, } -pub fn run_test(test: TestDesc, monitor_ch: oldcomm::Chan<MonitorMsg>) { +pub fn run_test(test: TestDesc, monitor_ch: SharedChan<MonitorMsg>) { if test.ignore { - oldcomm::send(monitor_ch, (copy test, TrIgnored)); + monitor_ch.send((copy test, TrIgnored)); return; } @@ -420,7 +420,7 @@ pub fn run_test(test: TestDesc, monitor_ch: oldcomm::Chan<MonitorMsg>) { }).spawn(move testfn); let task_result = option::unwrap(move result_future).recv(); let test_result = calc_result(&test, task_result == task::Success); - oldcomm::send(monitor_ch, (copy test, test_result)); + monitor_ch.send((copy test, test_result)); }; } @@ -440,7 +440,7 @@ mod tests { use test::{TestOpts, run_test}; use core::either; - use core::oldcomm; + use core::pipes::{stream, SharedChan}; use core::option; use core::vec; @@ -453,10 +453,10 @@ mod tests { ignore: true, should_fail: false }; - let p = oldcomm::Port(); - let ch = oldcomm::Chan(&p); + let (p, ch) = stream(); + let ch = SharedChan(ch); run_test(desc, ch); - let (_, res) = oldcomm::recv(p); + let (_, res) = p.recv(); assert res != TrOk; } @@ -469,10 +469,10 @@ mod tests { ignore: true, should_fail: false }; - let p = oldcomm::Port(); - let ch = oldcomm::Chan(&p); + let (p, ch) = stream(); + let ch = SharedChan(ch); run_test(desc, ch); - let (_, res) = oldcomm::recv(p); + let (_, res) = p.recv(); assert res == TrIgnored; } @@ -486,10 +486,10 @@ mod tests { ignore: false, should_fail: true }; - let p = oldcomm::Port(); - let ch = oldcomm::Chan(&p); + let (p, ch) = stream(); + let ch = SharedChan(ch); run_test(desc, ch); - let (_, res) = oldcomm::recv(p); + let (_, res) = p.recv(); assert res == TrOk; } @@ -502,10 +502,10 @@ mod tests { ignore: false, should_fail: true }; - let p = oldcomm::Port(); - let ch = oldcomm::Chan(&p); + let (p, ch) = stream(); + let ch = SharedChan(ch); run_test(desc, ch); - let (_, res) = oldcomm::recv(p); + let (_, res) = p.recv(); assert res == TrFailed; } diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index d62e2cbf05d..b967f92a22e 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -18,7 +18,9 @@ use uv::iotask::IoTask; use core::either; use core::libc; -use core::oldcomm; +use core::libc::c_void; +use core::cast::transmute; +use core::pipes::{stream, Chan, SharedChan, Port, select2i}; use core::prelude::*; use core::ptr; use core; @@ -41,12 +43,11 @@ use core; */ pub fn delayed_send<T: Owned>(iotask: &IoTask, msecs: uint, - ch: oldcomm::Chan<T>, + ch: &Chan<T>, val: T) { unsafe { - let timer_done_po = oldcomm::Port::<()>(); - let timer_done_ch = oldcomm::Chan(&timer_done_po); - let timer_done_ch_ptr = ptr::addr_of(&timer_done_ch); + let (timer_done_po, timer_done_ch) = stream::<()>(); + let timer_done_ch = SharedChan(timer_done_ch); let timer = uv::ll::timer_t(); let timer_ptr = ptr::addr_of(&timer); do iotask::interact(iotask) |loop_ptr| { @@ -56,9 +57,15 @@ pub fn delayed_send<T: Owned>(iotask: &IoTask, let start_result = uv::ll::timer_start( timer_ptr, delayed_send_cb, msecs, 0u); if (start_result == 0i32) { + // Note: putting the channel into a ~ + // to cast to *c_void + let timer_done_ch_clone = ~timer_done_ch.clone(); + let timer_done_ch_ptr = transmute::< + ~SharedChan<()>, *c_void>( + timer_done_ch_clone); uv::ll::set_data_for_uv_handle( timer_ptr, - timer_done_ch_ptr as *libc::c_void); + timer_done_ch_ptr); } else { let error_msg = uv::ll::get_last_err_info( loop_ptr); @@ -73,11 +80,11 @@ pub fn delayed_send<T: Owned>(iotask: &IoTask, } }; // delayed_send_cb has been processed by libuv - oldcomm::recv(timer_done_po); + timer_done_po.recv(); // notify the caller immediately - oldcomm::send(ch, move(val)); + ch.send(val); // uv_close for this timer has been processed - oldcomm::recv(timer_done_po); + timer_done_po.recv(); }; } @@ -93,10 +100,9 @@ pub fn delayed_send<T: Owned>(iotask: &IoTask, * * msecs - an amount of time, in milliseconds, for the current task to block */ pub fn sleep(iotask: &IoTask, msecs: uint) { - let exit_po = oldcomm::Port::<()>(); - let exit_ch = oldcomm::Chan(&exit_po); - delayed_send(iotask, msecs, exit_ch, ()); - oldcomm::recv(exit_po); + let (exit_po, exit_ch) = stream::<()>(); + delayed_send(iotask, msecs, &exit_ch, ()); + exit_po.recv(); } /** @@ -121,20 +127,17 @@ pub fn sleep(iotask: &IoTask, msecs: uint) { */ pub fn recv_timeout<T: Copy Owned>(iotask: &IoTask, msecs: uint, - wait_po: oldcomm::Port<T>) + wait_po: &Port<T>) -> Option<T> { - let timeout_po = oldcomm::Port::<()>(); - let timeout_ch = oldcomm::Chan(&timeout_po); - delayed_send(iotask, msecs, timeout_ch, ()); + let (timeout_po, timeout_ch) = stream::<()>(); + delayed_send(iotask, msecs, &timeout_ch, ()); // FIXME: This could be written clearer (#2618) either::either( - |left_val| { - log(debug, fmt!("recv_time .. left_val %?", - left_val)); + |_| { None - }, |right_val| { - Some(*right_val) - }, &oldcomm::select2(timeout_po, wait_po) + }, |_| { + Some(wait_po.recv()) + }, &select2i(&timeout_po, wait_po) ) } @@ -144,11 +147,14 @@ extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t, unsafe { log(debug, fmt!("delayed_send_cb handle %? status %?", handle, status)); - let timer_done_ch = - *(uv::ll::get_data_for_uv_handle(handle) as *oldcomm::Chan<()>); + // Faking a borrowed pointer to our ~SharedChan + let timer_done_ch_ptr: &*c_void = &uv::ll::get_data_for_uv_handle( + handle); + let timer_done_ch_ptr = transmute::<&*c_void, &~SharedChan<()>>( + timer_done_ch_ptr); let stop_result = uv::ll::timer_stop(handle); if (stop_result == 0i32) { - oldcomm::send(timer_done_ch, ()); + timer_done_ch_ptr.send(()); uv::ll::close(handle, delayed_send_close_cb); } else { let loop_ptr = uv::ll::get_loop_for_uv_handle(handle); @@ -161,9 +167,10 @@ extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t, extern fn delayed_send_close_cb(handle: *uv::ll::uv_timer_t) { unsafe { log(debug, fmt!("delayed_send_close_cb handle %?", handle)); - let timer_done_ch = - *(uv::ll::get_data_for_uv_handle(handle) as *oldcomm::Chan<()>); - oldcomm::send(timer_done_ch, ()); + let timer_done_ch_ptr = uv::ll::get_data_for_uv_handle(handle); + let timer_done_ch = transmute::<*c_void, ~SharedChan<()>>( + timer_done_ch_ptr); + timer_done_ch.send(()); } } @@ -175,9 +182,9 @@ mod test { use uv; use core::iter; - use core::oldcomm; use core::rand; use core::task; + use core::pipes::{stream, SharedChan}; #[test] pub fn test_gl_timer_simple_sleep_test() { @@ -195,8 +202,8 @@ mod test { #[test] pub fn test_gl_timer_sleep_stress2() { - let po = oldcomm::Port(); - let ch = oldcomm::Chan(&po); + let (po, ch) = stream(); + let ch = SharedChan(ch); let hl_loop = &uv::global_loop::get(); let repeat = 20u; @@ -210,8 +217,10 @@ mod test { for iter::repeat(repeat) { + let ch = ch.clone(); for spec.each |spec| { let (times, maxms) = *spec; + let ch = ch.clone(); let hl_loop_clone = hl_loop.clone(); do task::spawn { use rand::*; @@ -219,13 +228,13 @@ mod test { for iter::repeat(times) { sleep(&hl_loop_clone, rng.next() as uint % maxms); } - oldcomm::send(ch, ()); + ch.send(()); } } } for iter::repeat(repeat * spec.len()) { - oldcomm::recv(po) + po.recv() } } @@ -246,14 +255,13 @@ mod test { task::yield(); let expected = rand::rng().gen_str(16u); - let test_po = core::comm::port::<str>(); - let test_ch = core::comm::chan(test_po); + let (test_po, test_ch) = stream::<~str>(); do task::spawn() { - delayed_send(hl_loop, 1u, test_ch, expected); + delayed_send(hl_loop, 1u, &test_ch, expected); }; - match recv_timeout(hl_loop, 10u, test_po) { + match recv_timeout(hl_loop, 10u, &test_po) { Some(val) => { assert val == expected; successes += 1; @@ -274,14 +282,13 @@ mod test { for iter::repeat(times as uint) { let expected = rand::Rng().gen_str(16u); - let test_po = oldcomm::Port::<~str>(); - let test_ch = oldcomm::Chan(&test_po); + let (test_po, test_ch) = stream::<~str>(); let hl_loop_clone = hl_loop.clone(); do task::spawn() { - delayed_send(&hl_loop_clone, 50u, test_ch, expected); + delayed_send(&hl_loop_clone, 50u, &test_ch, expected); }; - match recv_timeout(&hl_loop, 1u, test_po) { + match recv_timeout(&hl_loop, 1u, &test_po) { None => successes += 1, _ => failures += 1 }; diff --git a/src/libstd/uv_iotask.rs b/src/libstd/uv_iotask.rs index dc0092aadfa..6fcbccf8183 100644 --- a/src/libstd/uv_iotask.rs +++ b/src/libstd/uv_iotask.rs @@ -209,16 +209,17 @@ mod test { use core::iter; use core::libc; - use core::oldcomm; use core::ptr; use core::task; + use core::pipes::{stream, Chan, SharedChan, Port}; extern fn async_close_cb(handle: *ll::uv_async_t) { unsafe { log(debug, fmt!("async_close_cb handle %?", handle)); - let exit_ch = (*(ll::get_data_for_uv_handle(handle) + let exit_ch = &(*(ll::get_data_for_uv_handle(handle) as *AhData)).exit_ch; - oldcomm::send(exit_ch, ()); + let exit_ch = exit_ch.clone(); + exit_ch.send(()); } } extern fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int) { @@ -230,17 +231,16 @@ mod test { } struct AhData { iotask: IoTask, - exit_ch: oldcomm::Chan<()> + exit_ch: SharedChan<()> } fn impl_uv_iotask_async(iotask: &IoTask) { unsafe { let async_handle = ll::async_t(); let ah_ptr = ptr::addr_of(&async_handle); - let exit_po = oldcomm::Port::<()>(); - let exit_ch = oldcomm::Chan(&exit_po); + let (exit_po, exit_ch) = stream::<()>(); let ah_data = AhData { iotask: iotask.clone(), - exit_ch: exit_ch + exit_ch: SharedChan(exit_ch) }; let ah_data_ptr: *AhData = unsafe { ptr::to_unsafe_ptr(&ah_data) @@ -256,13 +256,13 @@ mod test { } }; debug!("waiting for async close"); - oldcomm::recv(exit_po); + exit_po.recv(); } } // this fn documents the bear minimum neccesary to roll your own // high_level_loop - unsafe fn spawn_test_loop(exit_ch: oldcomm::Chan<()>) -> IoTask { + unsafe fn spawn_test_loop(exit_ch: ~Chan<()>) -> IoTask { let (iotask_port, iotask_ch) = stream::<IoTask>(); do task::spawn_sched(task::ManualThreads(1u)) { debug!("about to run a test loop"); @@ -287,9 +287,8 @@ mod test { #[test] fn test_uv_iotask_async() { unsafe { - let exit_po = oldcomm::Port::<()>(); - let exit_ch = oldcomm::Chan(&exit_po); - let iotask = &spawn_test_loop(exit_ch); + let (exit_po, exit_ch) = stream::<()>(); + let iotask = &spawn_test_loop(~exit_ch); debug!("spawned iotask"); @@ -300,24 +299,25 @@ mod test { // race-condition type situations.. this ensures that the // loop lives until, at least, all of the // impl_uv_hl_async() runs have been called, at least. - let work_exit_po = oldcomm::Port::<()>(); - let work_exit_ch = oldcomm::Chan(&work_exit_po); + let (work_exit_po, work_exit_ch) = stream::<()>(); + let work_exit_ch = SharedChan(work_exit_ch); for iter::repeat(7u) { let iotask_clone = iotask.clone(); + let work_exit_ch_clone = work_exit_ch.clone(); do task::spawn_sched(task::ManualThreads(1u)) { debug!("async"); impl_uv_iotask_async(&iotask_clone); debug!("done async"); - oldcomm::send(work_exit_ch, ()); + work_exit_ch_clone.send(()); }; }; for iter::repeat(7u) { debug!("waiting"); - oldcomm::recv(work_exit_po); + work_exit_po.recv(); }; log(debug, ~"sending teardown_loop msg.."); exit(iotask); - oldcomm::recv(exit_po); + exit_po.recv(); log(debug, ~"after recv on exit_po.. exiting.."); } } diff --git a/src/libstd/uv_ll.rs b/src/libstd/uv_ll.rs index 499d36a6613..8bef6eb6c91 100644 --- a/src/libstd/uv_ll.rs +++ b/src/libstd/uv_ll.rs @@ -39,6 +39,7 @@ use core::ptr::to_unsafe_ptr; use core::ptr; use core::str; use core::vec; +use core::pipes::{stream, Chan, SharedChan, Port}; // libuv struct mappings pub struct uv_ip4_addr { @@ -1132,7 +1133,6 @@ pub mod test { use uv_ll::*; use core::libc; - use core::oldcomm; use core::ptr; use core::str; use core::sys; @@ -1148,7 +1148,7 @@ pub mod test { struct request_wrapper { write_req: *uv_write_t, req_buf: *~[uv_buf_t], - read_chan: *oldcomm::Chan<~str>, + read_chan: SharedChan<~str>, } extern fn after_close_cb(handle: *libc::c_void) { @@ -1187,9 +1187,9 @@ pub mod test { let buf_base = get_base_from_buf(buf); let buf_len = get_len_from_buf(buf); let bytes = vec::from_buf(buf_base, buf_len as uint); - let read_chan = *((*client_data).read_chan); + let read_chan = (*client_data).read_chan.clone(); let msg_from_server = str::from_bytes(bytes); - oldcomm::send(read_chan, msg_from_server); + read_chan.send(msg_from_server); close(stream as *libc::c_void, after_close_cb) } else if (nread == -1) { @@ -1257,7 +1257,7 @@ pub mod test { } fn impl_uv_tcp_request(ip: &str, port: int, req_str: &str, - client_chan: *oldcomm::Chan<~str>) { + client_chan: SharedChan<~str>) { unsafe { let test_loop = loop_new(); let tcp_handle = tcp_t(); @@ -1283,9 +1283,11 @@ pub mod test { log(debug, fmt!("tcp req: tcp stream: %d write_handle: %d", tcp_handle_ptr as int, write_handle_ptr as int)); - let client_data = { writer_handle: write_handle_ptr, - req_buf: ptr::addr_of(&req_msg), - read_chan: client_chan }; + let client_data = request_wrapper { + write_req: write_handle_ptr, + req_buf: ptr::addr_of(&req_msg), + read_chan: client_chan + }; let tcp_init_result = tcp_init( test_loop as *libc::c_void, tcp_handle_ptr); @@ -1388,8 +1390,8 @@ pub mod test { log(debug, ~"SERVER: client req contains kill_msg!"); log(debug, ~"SERVER: sending response to client"); read_stop(client_stream_ptr); - let server_chan = *((*client_data).server_chan); - oldcomm::send(server_chan, request_str); + let server_chan = (*client_data).server_chan.clone(); + server_chan.send(request_str); let write_result = write( write_req, client_stream_ptr as *libc::c_void, @@ -1484,12 +1486,12 @@ pub mod test { server: *uv_tcp_t, server_kill_msg: ~str, server_resp_buf: *~[uv_buf_t], - server_chan: *oldcomm::Chan<~str>, + server_chan: SharedChan<~str>, server_write_req: *uv_write_t, } struct async_handle_data { - continue_chan: *oldcomm::Chan<bool>, + continue_chan: SharedChan<bool>, } extern fn async_close_cb(handle: *libc::c_void) { @@ -1506,9 +1508,9 @@ pub mod test { // do its thang let data = get_data_for_uv_handle( async_handle as *libc::c_void) as *async_handle_data; - let continue_chan = *((*data).continue_chan); + let continue_chan = (*data).continue_chan.clone(); let should_continue = status == 0i32; - oldcomm::send(continue_chan, should_continue); + continue_chan.send(should_continue); close(async_handle as *libc::c_void, async_close_cb); } } @@ -1517,8 +1519,8 @@ pub mod test { server_port: int, +kill_server_msg: ~str, +server_resp_msg: ~str, - server_chan: *oldcomm::Chan<~str>, - continue_chan: *oldcomm::Chan<bool>) { + server_chan: SharedChan<~str>, + continue_chan: SharedChan<bool>) { unsafe { let test_loop = loop_new(); let tcp_server = tcp_t(); @@ -1626,36 +1628,35 @@ pub mod test { let port = 8886; let kill_server_msg = ~"does a dog have buddha nature?"; let server_resp_msg = ~"mu!"; - let client_port = oldcomm::Port::<~str>(); - let client_chan = oldcomm::Chan::<~str>(&client_port); - let server_port = oldcomm::Port::<~str>(); - let server_chan = oldcomm::Chan::<~str>(&server_port); + let (client_port, client_chan) = stream::<~str>(); + let client_chan = SharedChan(client_chan); + let (server_port, server_chan) = stream::<~str>(); + let server_chan = SharedChan(server_chan); - let continue_port = oldcomm::Port::<bool>(); - let continue_chan = oldcomm::Chan::<bool>(&continue_port); - let continue_chan_ptr = ptr::addr_of(&continue_chan); + let (continue_port, continue_chan) = stream::<bool>(); + let continue_chan = SharedChan(continue_chan); do task::spawn_sched(task::ManualThreads(1)) { impl_uv_tcp_server(bind_ip, port, kill_server_msg, server_resp_msg, - ptr::addr_of(&server_chan), - continue_chan_ptr); + server_chan.clone(), + continue_chan.clone()); }; // block until the server up is.. possibly a race? log(debug, ~"before receiving on server continue_port"); - oldcomm::recv(continue_port); + continue_port.recv(); log(debug, ~"received on continue port, set up tcp client"); do task::spawn_sched(task::ManualThreads(1u)) { impl_uv_tcp_request(request_ip, port, kill_server_msg, - ptr::addr_of(&client_chan)); + client_chan.clone()); }; - let msg_from_client = oldcomm::recv(server_port); - let msg_from_server = oldcomm::recv(client_port); + let msg_from_client = server_port.recv(); + let msg_from_server = client_port.recv(); assert str::contains(msg_from_client, kill_server_msg); assert str::contains(msg_from_server, server_resp_msg); |
