diff options
| author | Rob Arnold <robarnold@cs.cmu.edu> | 2011-06-15 22:04:31 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2011-08-05 11:57:06 -0700 |
| commit | f4b87c749fc5dc085cd31ba3b5f91f11d863e0fa (patch) | |
| tree | eda7bb0b2f57582791c7c60800a8c2d98d866ec3 /src/rt/rust_uv.cpp | |
| parent | b64a52df42feeb4fa59d11b19568e2aba5ef9e65 (diff) | |
| download | rust-f4b87c749fc5dc085cd31ba3b5f91f11d863e0fa.tar.gz rust-f4b87c749fc5dc085cd31ba3b5f91f11d863e0fa.zip | |
Basic async IO module using libuv
Diffstat (limited to 'src/rt/rust_uv.cpp')
| -rw-r--r-- | src/rt/rust_uv.cpp | 300 |
1 files changed, 300 insertions, 0 deletions
diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp new file mode 100644 index 00000000000..9e4e15b61aa --- /dev/null +++ b/src/rt/rust_uv.cpp @@ -0,0 +1,300 @@ +#include "rust_internal.h" +#include "rust_upcall.h" +// Disable libev prototypes - they will make inline compatability functions +// which are unused and so trigger a warning in gcc since -Wall is on. +#define EV_PROTOTYPES 0 +#include "libuv/uv.h" + +#ifdef __GNUC__ +#define LOG_CALLBACK_ENTRY(p) \ + LOG(iotask, callback, "> IO CALLBACK %s %p", __FUNCTION__, p) +#else +#define LOG_CALLBACK_ENTRY(p) \ + LOG(iotask, callback, "> IO CALLBACK %s:%d %p", __FILE__, __LINE__, p) +#endif + +// The primary task which is running the event loop. This is used to dispatch +// all the notifications back to rust so we clone all passed in channels to +// this task. +static rust_task *iotask = NULL; + +struct socket_data : public task_owned<socket_data> { + // Either the task that the connection attempt was made from or the task + // that the server was spawned on. + rust_task *task; + // Channel for reporting the status of a connection attempt + // For connections from servers, this is always null + // For server sockets, this is used to send the notification that the server + // was closed. + rust_chan *chan; + // Channel to a port which receives bytes from this socket + rust_chan *reader; + uv_tcp_t socket; + + ~socket_data() { + if (chan) + chan->deref(); + if (reader) + reader->deref(); + } +}; + +struct request : public uv_req_t, public task_owned<request> { + rust_task *task; + // Used for notifying about completion of connections, writes + rust_chan *chan; + request(socket_data *data, rust_chan *chan, + void (*cb)(request *req, int status)) { + uv_req_init(this, (uv_handle_t*)&data->socket, (void*)cb); + this->data = data; + this->task = data->task; + this->chan = chan->clone(iotask); + } + socket_data *socket() { + return (socket_data*)data; + } + void send_result(void *data) { + chan->send(&data); + chan->deref(); + chan = NULL; + } +}; + +extern "C" CDECL void aio_close_socket(rust_task *task, socket_data *); + +static uv_idle_s idle_handler; + +static void idle_callback(uv_handle_t* handle, int status) { + rust_task *task = reinterpret_cast<rust_task*>(handle->data); + task->yield(); +} + +extern "C" CDECL void aio_init(rust_task *task) { + LOG_UPCALL_ENTRY(task); + iotask = task; + uv_init(); + uv_idle_init(&idle_handler); + uv_idle_start(&idle_handler, idle_callback); +} + +extern "C" CDECL void aio_run(rust_task *task) { + LOG_UPCALL_ENTRY(task); + idle_handler.data = task; + uv_run(); +} + +void nop_close(uv_handle_t* handle) {} + +extern "C" CDECL void aio_stop(rust_task *task) { + LOG_UPCALL_ENTRY(task); + uv_close((uv_handle_t*)&idle_handler, nop_close); +} + +static socket_data *make_socket(rust_task *task, rust_chan *chan) { + socket_data *data = new (task, "make_socket") socket_data; + if (!data || + uv_tcp_init(&data->socket)) { + return NULL; + } + data->task = task; + // Connections from servers don't have a channel + if (chan) { + data->chan = chan->clone(iotask); + } else { + data->chan = NULL; + } + data->socket.data = data; + data->reader = NULL; + return data; +} + +// We allocate the requested space + rust_vec but return a pointer at a +// +rust_vec offset so that it writes the bytes to the correct location. +static uv_buf_t alloc_buffer(uv_tcp_t *socket, size_t suggested_size) { + LOG_CALLBACK_ENTRY(socket); + uv_buf_t buf; + size_t actual_size = suggested_size + sizeof (rust_ivec_heap); + socket_data *data = (socket_data*)socket->data; + char *base = + reinterpret_cast<char*>(data->task->kernel->malloc(actual_size, + "read buffer")); + buf.base = base + sizeof (rust_ivec_heap); + buf.len = suggested_size; + return buf; +} + +static void read_progress(uv_tcp_t *socket, ssize_t nread, uv_buf_t buf) { + LOG_CALLBACK_ENTRY(socket); + socket_data *data = (socket_data*)socket->data; + I(data->task->sched, data->reader != NULL); + I(data->task->sched, nread <= ssize_t(buf.len)); + + rust_ivec_heap *base = reinterpret_cast<rust_ivec_heap*>( + reinterpret_cast<char*>(buf.base) - sizeof (rust_ivec_heap)); + rust_ivec v; + v.fill = 0; + v.alloc = buf.len; + v.payload.ptr = base; + + switch (nread) { + case -1: // End of stream + base->fill = 0; + uv_read_stop(socket); + break; + case 0: // Nothing read + data->task->kernel->free(base); + return; + default: // Got nread bytes + base->fill = nread; + break; + } + data->reader->send(&v); +} + +static void new_connection(uv_tcp_t *socket, int status) { + LOG_CALLBACK_ENTRY(socket); + socket_data *server = (socket_data*)socket->data; + I(server->task->sched, socket == &server->socket); + // Connections from servers don't have a channel + socket_data *client = make_socket(server->task, NULL); + if (!client) { + server->task->fail(); + return; + } + if (uv_accept(socket, &client->socket)) { + aio_close_socket(client->task, client); + server->task->fail(); + return; + } + server->chan->send(&client); +} + +extern "C" CDECL socket_data *aio_serve(rust_task *task, const char *ip, + int port, rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + struct sockaddr_in addr = uv_ip4_addr(const_cast<char*>(ip), port); + socket_data *server = make_socket(iotask, chan); + if (!server) + goto oom; + if (uv_bind(&server->socket, addr) || + uv_listen(&server->socket, 128, new_connection)) { + aio_close_socket(task, server); + return NULL; + } + return server; +oom: + task->fail(); + return NULL; +} + +static void free_socket(uv_handle_t *handle) { + LOG_CALLBACK_ENTRY(socket); + uv_tcp_t *socket = (uv_tcp_t*)handle; + socket_data *data = (socket_data*)socket->data; + I(data->task->sched, socket == &data->socket); + // For client sockets, send a 0-size buffer to indicate that we're done + // reading and should send the close notification. + if (data->reader) { + if (data->reader->is_associated()) { + uv_buf_t buf = alloc_buffer(socket, 0); + read_progress(socket, -1, buf); + uv_read_stop(socket); + } + } else { + // This is a server socket + bool closed = true; + I(data->task->sched, data->chan != NULL); + data->task->kill(); + data->chan->send(&closed); + } + delete data; +} + +extern "C" CDECL void aio_close_socket(rust_task *task, socket_data *client) { + LOG_UPCALL_ENTRY(task); + if (uv_close((uv_handle_t*)&client->socket, free_socket)) { + task->fail(); + } +} + +extern "C" CDECL void aio_close_server(rust_task *task, socket_data *server, + rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + // XXX: hax until rust_task::kill + // send null and the receiver knows to call back into native code to check + void* null_client = NULL; + server->chan->send(&null_client); + server->chan->deref(); + server->chan = chan->clone(iotask); + aio_close_socket(task, server); +} + +extern "C" CDECL bool aio_is_null_client(rust_task *task, + socket_data *server) { + LOG_UPCALL_ENTRY(task); + return server == NULL; +} + +static void connection_complete(request *req, int status) { + LOG_CALLBACK_ENTRY(socket); + socket_data *client = req->socket(); + req->send_result(client); + delete req; +} + +extern "C" CDECL void aio_connect(rust_task *task, const char *host, + int port, rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + struct sockaddr_in addr = uv_ip4_addr(const_cast<char*>(host), port); + request *req; + socket_data *client = make_socket(iotask, NULL); + if (!client) { + goto oom_client; + } + req = new (client->task, "connection request") + request(client, chan, connection_complete); + if (!req) { + goto oom_req; + } + if (0 == uv_connect(req, addr)) { + return; + } +oom_req: + aio_close_socket(task, client); +oom_client: + task->fail(); + return; +} + +static void write_complete(request *req, int status) { + LOG_CALLBACK_ENTRY(socket); + bool success = status == 0; + req->send_result(&success); + delete req; +} + +extern "C" CDECL void aio_writedata(rust_task *task, socket_data *data, + char *buf, size_t size, rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + uv_buf_t buffer = { buf, size }; + request *req = new (data->task, "write request") + request(data, chan, write_complete); + if (!req) { + goto fail; + } + if (uv_write(req, &buffer, 1)) { + delete req; + goto fail; + } + return; +fail: + task->fail(); +} + +extern "C" CDECL void aio_read(rust_task *task, socket_data *data, + rust_chan *reader) { + LOG_UPCALL_ENTRY(task); + I(task->sched, data->reader == NULL); + data->reader = reader->clone(iotask); + uv_read_start(&data->socket, alloc_buffer, read_progress); +} |
