diff options
| author | Eric Holk <eholk@mozilla.com> | 2011-08-15 16:54:02 -0700 |
|---|---|---|
| committer | Eric Holk <eholk@mozilla.com> | 2011-08-16 09:36:29 -0700 |
| commit | cf2def46c120d8d6ef8a98571a39bef478c8c2a9 (patch) | |
| tree | 902078db51847e2c3badb941dcbceeb5216d866f /src/lib | |
| parent | e33af7e0b505de6d7c754d2ead26c9ee2bc8974e (diff) | |
| download | rust-cf2def46c120d8d6ef8a98571a39bef478c8c2a9.tar.gz rust-cf2def46c120d8d6ef8a98571a39bef478c8c2a9.zip | |
Removed trans_comm.rs from the compiler. Updating aio/sio to work with the new chan and port system, started on a networking module for the standard library.
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/aio.rs | 154 | ||||
| -rw-r--r-- | src/lib/net.rs | 30 | ||||
| -rw-r--r-- | src/lib/sio.rs | 83 | ||||
| -rw-r--r-- | src/lib/std.rc | 1 | ||||
| -rw-r--r-- | src/lib/task.rs | 11 | ||||
| -rw-r--r-- | src/lib/uint.rs | 4 |
6 files changed, 160 insertions, 123 deletions
diff --git a/src/lib/aio.rs b/src/lib/aio.rs index afc3bcadccd..cfeb73929bb 100644 --- a/src/lib/aio.rs +++ b/src/lib/aio.rs @@ -1,5 +1,13 @@ -import str::sbuf; import task; +import ivec; + +import comm; +import comm::_chan; +import comm::_port; +import comm::mk_port; +import comm::send; + +import net; native "rust" mod rustrt { type socket; @@ -7,19 +15,21 @@ native "rust" mod rustrt { fn aio_init(); fn aio_run(); fn aio_stop(); - fn aio_connect(host: sbuf, port: int, connected: chan[socket]); - fn aio_serve(host: sbuf, port: int, acceptChan: chan[socket]) -> server; - fn aio_writedata(s: socket, buf: *u8, size: uint, status: chan[bool]); - fn aio_read(s: socket, reader: chan[[u8]]); - fn aio_close_server(s: server, status: chan[bool]); + fn aio_connect(host: *u8, port: int, connected: &_chan[socket]); + fn aio_serve(host: *u8, port: int, acceptChan: &_chan[socket]) -> server; + fn aio_writedata(s: socket, buf: *u8, size: uint, status: &_chan[bool]); + fn aio_read(s: socket, reader: &_chan[[u8]]); + fn aio_close_server(s: server, status: &_chan[bool]); fn aio_close_socket(s: socket); fn aio_is_null_client(s: socket) -> bool; } +// FIXME: these should be unsafe pointers or something, but those aren't +// currently in the sendable kind, so we'll unsafely cast between ints. type server = rustrt::server; type client = rustrt::socket; tag pending_connection { - remote(str,int); + remote(net::ip_addr,int); incoming(server); } @@ -30,41 +40,43 @@ tag socket_event { } tag server_event { - pending(chan[chan[socket_event]]); + pending(_chan[_chan[socket_event]]); } tag request { quit; - connect(pending_connection,chan[socket_event]); - serve(str,int,chan[server_event],chan[server]); - write(client,[u8],chan[bool]); - close_server(server, chan[bool]); + connect(pending_connection,_chan[socket_event]); + serve(net::ip_addr,int,_chan[server_event],_chan[server]); + write(client,[u8],_chan[bool]); + close_server(server, _chan[bool]); close_client(client); } -type ctx = chan[request]; +type ctx = _chan[request]; -fn connect_task(ip: str, portnum: int, evt: chan[socket_event]) { - let connecter: port[client] = port(); - rustrt::aio_connect(str::buf(ip), portnum, chan(connecter)); - let client: client; - connecter |> client; +fn ip_to_sbuf(ip: net::ip_addr) -> *u8 { + ivec::to_ptr(str::bytes(net::format_addr(ip))) +} + +fn connect_task(ip: net::ip_addr, portnum: int, evt: _chan[socket_event]) { + let connecter: _port[client] = mk_port(); + rustrt::aio_connect(ip_to_sbuf(ip), portnum, connecter.mk_chan()); + let client = connecter.recv(); new_client(client, evt); } -fn new_client(client: client, evt: chan[socket_event]) { +fn new_client(client: client, evt: _chan[socket_event]) { // Start the read before notifying about the connect. This avoids a race // condition where the receiver can close the socket before we start // reading. - let reader: port[[u8]] = port(); - rustrt::aio_read(client, chan(reader)); + let reader: _port[[u8]] = mk_port(); + rustrt::aio_read(client, reader.mk_chan()); - evt <| connected(client); + send(evt, connected(client)); while (true) { log "waiting for bytes"; - let data: [u8]; - reader |> data; + let data: [u8] = reader.recv(); log "got some bytes"; log ivec::len[u8](data); if (ivec::len[u8](data) == 0u) { @@ -72,33 +84,33 @@ fn new_client(client: client, evt: chan[socket_event]) { break; } log "got non-empty buffer, sending"; - evt <| received(data); + send(evt, received(data)); log "sent non-empty buffer"; } log "done reading"; - evt <| closed; + send(evt, closed); log "close message sent"; } -fn accept_task(client: client, events: chan[server_event]) { +fn accept_task(client: client, events: _chan[server_event]) { log "accept task was spawned"; - let p: port[chan[socket_event]] = port(); - events <| pending(chan(p)); - let evt: chan[socket_event]; - p |> evt; + let p: _port[_chan[socket_event]] = mk_port(); + send(events, pending(p.mk_chan())); + let evt = p.recv(); new_client(client, evt); log "done accepting"; } -fn server_task(ip: str, portnum: int, events: chan[server_event], - server: chan[server]) { - let accepter: port[client] = port(); - server <| rustrt::aio_serve(str::buf(ip), portnum, chan(accepter)); +fn server_task(ip: net::ip_addr, portnum: int, events: _chan[server_event], + server: _chan[server]) { + let accepter: _port[client] = mk_port(); + send(server, rustrt::aio_serve(ip_to_sbuf(ip), portnum, + accepter.mk_chan())); let client: client; while (true) { log "preparing to accept a client"; - accepter |> client; + client = accepter.recv(); if (rustrt::aio_is_null_client(client)) { log "client was actually null, returning"; ret; @@ -108,48 +120,48 @@ fn server_task(ip: str, portnum: int, events: chan[server_event], } } -fn request_task(c: chan[ctx]) { +fn request_task(c: _chan[ctx]) { // Create a port to accept IO requests on - let p: port[request] = port(); + let p: _port[request] = mk_port(); // Hand of its channel to our spawner - c <| chan(p); + send(c, p.mk_chan()); log "uv run task spawned"; // Spin for requests let req: request; while (true) { - p |> req; + req = p.recv(); + log_err req; alt req { - quit. { - log "got quit message"; - - log "stopping libuv"; - rustrt::aio_stop(); - ret; - } - connect(remote(ip,portnum),client) { - task::_spawn(bind connect_task(ip, portnum, client)); - } - serve(ip,portnum,events,server) { - task::_spawn(bind server_task(ip, portnum, events, server)); - } - write(socket,v,status) { - rustrt::aio_writedata(socket, - ivec::to_ptr[u8](v), ivec::len[u8](v), - status); - } - close_server(server,status) { - log "closing server"; - rustrt::aio_close_server(server,status); - } - close_client(client) { - log "closing client"; - rustrt::aio_close_socket(client); - } + quit. { + log "got quit message"; + log "stopping libuv"; + rustrt::aio_stop(); + ret; + } + connect(remote(ip,portnum),client) { + task::_spawn(bind connect_task(ip, portnum, client)); + } + serve(ip,portnum,events,server) { + task::_spawn(bind server_task(ip, portnum, events, server)); + } + write(socket,v,status) { + rustrt::aio_writedata(socket, + ivec::to_ptr[u8](v), ivec::len[u8](v), + status); + } + close_server(server,status) { + log "closing server"; + rustrt::aio_close_server(server,status); + } + close_client(client) { + log "closing client"; + rustrt::aio_close_socket(client); + } } } } -fn iotask(c: chan[ctx]) { +fn iotask(c: _chan[ctx]) { log "io task spawned"; // Initialize before accepting requests rustrt::aio_init(); @@ -167,11 +179,9 @@ fn iotask(c: chan[ctx]) { } fn new() -> ctx { - let p: port[ctx] = port(); - let t = task::_spawn(bind iotask(chan(p))); - let cx: ctx; - p |> cx; - ret cx; + let p: _port[ctx] = mk_port(); + task::_spawn(bind iotask(p.mk_chan())); + ret p.recv(); } // Local Variables: diff --git a/src/lib/net.rs b/src/lib/net.rs new file mode 100644 index 00000000000..6a3a6c409c4 --- /dev/null +++ b/src/lib/net.rs @@ -0,0 +1,30 @@ +import str; +import ivec; +import uint; + +tag ip_addr { + ipv4(u8, u8, u8, u8); +} + +fn format_addr(ip : ip_addr) -> str { + alt(ip) { + ipv4(a, b, c, d) { + #fmt("%u.%u.%u.%u", + a as uint, + b as uint, + c as uint, + d as uint) + } + _ { fail "Unsupported address type"; } + } +} + +fn parse_addr(ip : str) -> ip_addr { + let parts = ivec::map(uint::from_str, str::split(ip, ".".(0))); + if ivec::len(parts) != 4u { fail "Too many dots in IP address"; } + for i in parts { if i > 255u { fail "Invalid IP Address part."; } } + ipv4(parts.(0) as u8, + parts.(1) as u8, + parts.(2) as u8, + parts.(3) as u8) +} diff --git a/src/lib/sio.rs b/src/lib/sio.rs index fbeef737a10..7f8e56516e3 100644 --- a/src/lib/sio.rs +++ b/src/lib/sio.rs @@ -1,18 +1,27 @@ +import comm::_port; +import comm::_chan; +import comm::mk_port; +import comm::send; + +import str; +import net; + type ctx = aio::ctx; -type client = { ctx: ctx, client: aio::client, evt: port[aio::socket_event] }; -type server = { ctx: ctx, server: aio::server, evt: port[aio::server_event] }; +type client = { ctx: ctx, client: aio::client, + evt: _port[aio::socket_event] }; +type server = { ctx: ctx, server: aio::server, + evt: _port[aio::server_event] }; fn new() -> ctx { ret aio::new(); } fn destroy(ctx: ctx) { - ctx <| aio::quit; + send(ctx, aio::quit); } -fn make_socket(ctx: ctx, p: port[aio::socket_event]) -> client { - let evt: aio::socket_event; - p |> evt; +fn make_socket(ctx: ctx, p: _port[aio::socket_event]) -> client { + let evt: aio::socket_event = p.recv(); alt evt { aio::connected(client) { ret { ctx: ctx, client: client, evt: p }; @@ -21,16 +30,14 @@ fn make_socket(ctx: ctx, p: port[aio::socket_event]) -> client { } } -fn connect_to(ctx: ctx, ip: str, portnum: int) -> client { - let p: port[aio::socket_event] = port(); - ctx <| aio::connect(aio::remote(ip, portnum), chan(p)); +fn connect_to(ctx: ctx, ip: net::ip_addr, portnum: int) -> client { + let p: _port[aio::socket_event] = mk_port(); + send(ctx, aio::connect(aio::remote(ip, portnum), p.mk_chan())); ret make_socket(ctx, p); } fn read(c: client) -> [u8] { - let evt: aio::socket_event; - c.evt |> evt; - alt evt { + alt c.evt.recv() { aio::closed. { ret ~[]; } @@ -40,55 +47,51 @@ fn read(c: client) -> [u8] { } } -fn create_server(ctx: ctx, ip: str, portnum: int) -> server { - let evt: port[aio::server_event] = port(); - let p: port[aio::server] = port(); - ctx <| aio::serve(ip, portnum, chan(evt), chan(p)); - let srv: aio::server; - p |> srv; +fn create_server(ctx: ctx, ip: net::ip_addr, portnum: int) -> server { + let evt: _port[aio::server_event] = mk_port(); + let p: _port[aio::server] = mk_port(); + send(ctx, aio::serve(ip, portnum, + evt.mk_chan(), p.mk_chan())); + let srv: aio::server = p.recv(); ret { ctx: ctx, server: srv, evt: evt }; } fn accept_from(server: server) -> client { - let evt: aio::server_event; - server.evt |> evt; + let evt: aio::server_event = server.evt.recv(); alt evt { - aio::pending(callback) { - let p: port[aio::socket_event] = port(); - callback <| chan(p); - ret make_socket(server.ctx, p); - } + aio::pending(callback) { + let p: _port[aio::socket_event] = mk_port(); + send(callback, p.mk_chan()); + ret make_socket(server.ctx, p); + } } } fn write_data(c: client, data: [u8]) -> bool { - let p: port[bool] = port(); - c.ctx <| aio::write(c.client, data, chan(p)); - let success: bool; - p |> success; - ret success; + let p: _port[bool] = mk_port(); + send(c.ctx, aio::write(c.client, data, p.mk_chan())); + ret p.recv(); } fn close_server(server: server) { // TODO: make this unit once we learn to send those from native code - let p: port[bool] = port(); - server.ctx <| aio::close_server(server.server, chan(p)); - let success: bool; + let p: _port[bool] = mk_port(); + send(server.ctx, aio::close_server(server.server, p.mk_chan())); log "Waiting for close"; - p |> success; + p.recv(); log "Got close"; } fn close_client(client: client) { - client.ctx <| aio::close_client(client.client); + send(client.ctx, aio::close_client(client.client)); let evt: aio::socket_event; do { - client.evt |> evt; + evt = client.evt.recv(); alt evt { - aio::closed. { - ret; - } - _ {} + aio::closed. { + ret; + } + _ {} } } while (true); } diff --git a/src/lib/std.rc b/src/lib/std.rc index f23d0b74711..715de750da4 100644 --- a/src/lib/std.rc +++ b/src/lib/std.rc @@ -70,6 +70,7 @@ mod run = "run_program.rs"; mod fs; mod aio; mod sio; +mod net; // FIXME: parametric mod map; diff --git a/src/lib/task.rs b/src/lib/task.rs index 9a4f1cf52e1..935eda0e60b 100644 --- a/src/lib/task.rs +++ b/src/lib/task.rs @@ -8,7 +8,6 @@ native "rust" mod rustrt { fn pin_task(); fn unpin_task(); fn get_task_id() -> task_id; - fn clone_chan(c: *rust_chan) -> *rust_chan; type rust_chan; type rust_task; @@ -61,16 +60,6 @@ fn pin() { rustrt::pin_task(); } fn unpin() { rustrt::unpin_task(); } -// FIXME: remove this -fn clone_chan[T](c: chan[T]) -> chan[T] { - let cloned = rustrt::clone_chan(unsafe::reinterpret_cast(c)); - ret unsafe::reinterpret_cast(cloned); -} - -fn send[T](c: chan[T], v: &T) { c <| v; } - -fn recv[T](p: port[T]) -> T { let v; p |> v; v } - fn set_min_stack(stack_size : uint) { rustrt::set_min_stack(stack_size); } diff --git a/src/lib/uint.rs b/src/lib/uint.rs index 84eceb3b6d6..acbaee48ac1 100644 --- a/src/lib/uint.rs +++ b/src/lib/uint.rs @@ -56,6 +56,10 @@ fn parse_buf(buf: &[u8], radix: uint) -> uint { fail; } +fn from_str(s : &str) -> uint { + parse_buf(str::bytes(s), 10u) +} + fn to_str(num: uint, radix: uint) -> str { let n = num; assert (0u < radix && radix <= 16u); |
