about summary refs log tree commit diff
path: root/src/rt/rust_uv.cpp
diff options
context:
space:
mode:
authorRob Arnold <robarnold@cs.cmu.edu>2011-06-15 22:04:31 -0700
committerBrian Anderson <banderson@mozilla.com>2011-08-05 11:57:06 -0700
commitf4b87c749fc5dc085cd31ba3b5f91f11d863e0fa (patch)
treeeda7bb0b2f57582791c7c60800a8c2d98d866ec3 /src/rt/rust_uv.cpp
parentb64a52df42feeb4fa59d11b19568e2aba5ef9e65 (diff)
downloadrust-f4b87c749fc5dc085cd31ba3b5f91f11d863e0fa.tar.gz
rust-f4b87c749fc5dc085cd31ba3b5f91f11d863e0fa.zip
Basic async IO module using libuv
Diffstat (limited to 'src/rt/rust_uv.cpp')
-rw-r--r--src/rt/rust_uv.cpp300
1 files changed, 300 insertions, 0 deletions
diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp
new file mode 100644
index 00000000000..9e4e15b61aa
--- /dev/null
+++ b/src/rt/rust_uv.cpp
@@ -0,0 +1,300 @@
+#include "rust_internal.h"
+#include "rust_upcall.h"
+// Disable libev prototypes - they will make inline compatability functions
+// which are unused and so trigger a warning in gcc since -Wall is on.
+#define EV_PROTOTYPES 0
+#include "libuv/uv.h"
+
+#ifdef __GNUC__
+#define LOG_CALLBACK_ENTRY(p) \
+    LOG(iotask, callback, "> IO CALLBACK %s %p", __FUNCTION__, p)
+#else
+#define LOG_CALLBACK_ENTRY(p) \
+    LOG(iotask, callback, "> IO CALLBACK %s:%d %p", __FILE__, __LINE__, p)
+#endif
+
+// The primary task which is running the event loop. This is used to dispatch
+// all the notifications back to rust so we clone all passed in channels to
+// this task.
+static rust_task *iotask = NULL;
+
+struct socket_data : public task_owned<socket_data> {
+  // Either the task that the connection attempt was made from or the task
+  // that the server was spawned on.
+  rust_task *task;
+  // Channel for reporting the status of a connection attempt
+  // For connections from servers, this is always null
+  // For server sockets, this is used to send the notification that the server
+  // was closed.
+  rust_chan *chan;
+  // Channel to a port which receives bytes from this socket
+  rust_chan *reader;
+  uv_tcp_t socket;
+
+  ~socket_data() {
+    if (chan)
+      chan->deref();
+    if (reader)
+      reader->deref();
+  }
+};
+
+struct request : public uv_req_t, public task_owned<request> {
+  rust_task *task;
+  // Used for notifying about completion of connections, writes
+  rust_chan *chan;
+  request(socket_data *data, rust_chan *chan,
+          void (*cb)(request *req, int status)) {
+    uv_req_init(this, (uv_handle_t*)&data->socket, (void*)cb);
+    this->data = data;
+    this->task = data->task;
+    this->chan = chan->clone(iotask);
+  }
+  socket_data *socket() {
+    return (socket_data*)data;
+  }
+  void send_result(void *data) {
+    chan->send(&data);
+    chan->deref();
+    chan = NULL;
+  }
+};
+
+extern "C" CDECL void aio_close_socket(rust_task *task, socket_data *);
+
+static uv_idle_s idle_handler;
+
+static void idle_callback(uv_handle_t* handle, int status) {
+  rust_task *task = reinterpret_cast<rust_task*>(handle->data);
+  task->yield();
+}
+
+extern "C" CDECL void aio_init(rust_task *task) {
+  LOG_UPCALL_ENTRY(task);
+  iotask = task;
+  uv_init();
+  uv_idle_init(&idle_handler);
+  uv_idle_start(&idle_handler, idle_callback);
+}
+
+extern "C" CDECL void aio_run(rust_task *task) {
+  LOG_UPCALL_ENTRY(task);
+  idle_handler.data = task;
+  uv_run();
+}
+
+void nop_close(uv_handle_t* handle) {}
+
+extern "C" CDECL void aio_stop(rust_task *task) {
+  LOG_UPCALL_ENTRY(task);
+  uv_close((uv_handle_t*)&idle_handler, nop_close);
+}
+
+static socket_data *make_socket(rust_task *task, rust_chan *chan) {
+  socket_data *data = new (task, "make_socket") socket_data;
+  if (!data ||
+      uv_tcp_init(&data->socket)) {
+    return NULL;
+  }
+  data->task = task;
+  // Connections from servers don't have a channel
+  if (chan) {
+    data->chan = chan->clone(iotask);
+  } else {
+    data->chan = NULL;
+  }
+  data->socket.data = data;
+  data->reader = NULL;
+  return data;
+}
+
+// We allocate the requested space + rust_vec but return a pointer at a
+// +rust_vec offset so that it writes the bytes to the correct location.
+static uv_buf_t alloc_buffer(uv_tcp_t *socket, size_t suggested_size) {
+  LOG_CALLBACK_ENTRY(socket);
+  uv_buf_t buf;
+  size_t actual_size = suggested_size + sizeof (rust_ivec_heap);
+  socket_data *data = (socket_data*)socket->data;
+  char *base =
+    reinterpret_cast<char*>(data->task->kernel->malloc(actual_size,
+                                                       "read buffer"));
+  buf.base = base + sizeof (rust_ivec_heap);
+  buf.len = suggested_size;
+  return buf;
+}
+
+static void read_progress(uv_tcp_t *socket, ssize_t nread, uv_buf_t buf) {
+  LOG_CALLBACK_ENTRY(socket);
+  socket_data *data = (socket_data*)socket->data;
+  I(data->task->sched, data->reader != NULL);
+  I(data->task->sched, nread <= ssize_t(buf.len));
+
+  rust_ivec_heap *base = reinterpret_cast<rust_ivec_heap*>(
+      reinterpret_cast<char*>(buf.base) - sizeof (rust_ivec_heap));
+  rust_ivec v;
+  v.fill = 0;
+  v.alloc = buf.len;
+  v.payload.ptr = base;
+
+  switch (nread) {
+    case -1: // End of stream
+      base->fill = 0;
+      uv_read_stop(socket);
+      break;
+    case 0: // Nothing read
+      data->task->kernel->free(base);
+      return;
+    default: // Got nread bytes
+      base->fill = nread;
+      break;
+  }
+  data->reader->send(&v);
+}
+
+static void new_connection(uv_tcp_t *socket, int status) {
+  LOG_CALLBACK_ENTRY(socket);
+  socket_data *server = (socket_data*)socket->data;
+  I(server->task->sched, socket == &server->socket);
+  // Connections from servers don't have a channel
+  socket_data *client = make_socket(server->task, NULL);
+  if (!client) {
+    server->task->fail();
+    return;
+  }
+  if (uv_accept(socket, &client->socket)) {
+    aio_close_socket(client->task, client);
+    server->task->fail();
+    return;
+  }
+  server->chan->send(&client);
+}
+
+extern "C" CDECL socket_data *aio_serve(rust_task *task, const char *ip,
+                                        int port, rust_chan *chan) {
+  LOG_UPCALL_ENTRY(task);
+  struct sockaddr_in addr = uv_ip4_addr(const_cast<char*>(ip), port);
+  socket_data *server = make_socket(iotask, chan);
+  if (!server)
+    goto oom;
+  if (uv_bind(&server->socket, addr) ||
+      uv_listen(&server->socket, 128, new_connection)) {
+    aio_close_socket(task, server);
+    return NULL;
+  }
+  return server;
+oom:
+  task->fail();
+  return NULL;
+}
+
+static void free_socket(uv_handle_t *handle) {
+  LOG_CALLBACK_ENTRY(socket);
+  uv_tcp_t *socket = (uv_tcp_t*)handle;
+  socket_data *data = (socket_data*)socket->data;
+  I(data->task->sched, socket == &data->socket);
+  // For client sockets, send a 0-size buffer to indicate that we're done
+  // reading and should send the close notification.
+  if (data->reader) {
+    if (data->reader->is_associated()) {
+      uv_buf_t buf = alloc_buffer(socket, 0);
+      read_progress(socket, -1, buf);
+      uv_read_stop(socket);
+    }
+  } else {
+    // This is a server socket
+    bool closed = true;
+    I(data->task->sched, data->chan != NULL);
+    data->task->kill();
+    data->chan->send(&closed);
+  }
+  delete data;
+}
+
+extern "C" CDECL void aio_close_socket(rust_task *task, socket_data *client) {
+  LOG_UPCALL_ENTRY(task);
+  if (uv_close((uv_handle_t*)&client->socket, free_socket)) {
+    task->fail();
+  }
+}
+
+extern "C" CDECL void aio_close_server(rust_task *task, socket_data *server,
+                                       rust_chan *chan) {
+  LOG_UPCALL_ENTRY(task);
+  // XXX: hax until rust_task::kill
+  // send null and the receiver knows to call back into native code to check
+  void* null_client = NULL;
+  server->chan->send(&null_client);
+  server->chan->deref();
+  server->chan = chan->clone(iotask);
+  aio_close_socket(task, server);
+}
+
+extern "C" CDECL bool aio_is_null_client(rust_task *task,
+                                         socket_data *server) {
+  LOG_UPCALL_ENTRY(task);
+  return server == NULL;
+}
+
+static void connection_complete(request *req, int status) {
+  LOG_CALLBACK_ENTRY(socket);
+  socket_data *client = req->socket();
+  req->send_result(client);
+  delete req;
+}
+
+extern "C" CDECL void aio_connect(rust_task *task, const char *host,
+                                  int port, rust_chan *chan) {
+  LOG_UPCALL_ENTRY(task);
+  struct sockaddr_in addr = uv_ip4_addr(const_cast<char*>(host), port);
+  request *req;
+  socket_data *client = make_socket(iotask, NULL);
+  if (!client) {
+    goto oom_client;
+  }
+  req = new (client->task, "connection request")
+    request(client, chan, connection_complete);
+  if (!req) {
+    goto oom_req;
+  }
+  if (0 == uv_connect(req, addr)) {
+    return;
+  }
+oom_req:
+  aio_close_socket(task, client);
+oom_client:
+  task->fail();
+  return;
+}
+
+static void write_complete(request *req, int status) {
+  LOG_CALLBACK_ENTRY(socket);
+  bool success = status == 0;
+  req->send_result(&success);
+  delete req;
+}
+
+extern "C" CDECL void aio_writedata(rust_task *task, socket_data *data,
+                                    char *buf, size_t size, rust_chan *chan) {
+  LOG_UPCALL_ENTRY(task);
+  uv_buf_t buffer = { buf, size };
+  request *req = new (data->task, "write request")
+    request(data, chan, write_complete);
+  if (!req) {
+    goto fail;
+  }
+  if (uv_write(req, &buffer, 1)) {
+    delete req;
+    goto fail;
+  }
+  return;
+fail:
+  task->fail();
+}
+
+extern "C" CDECL void aio_read(rust_task *task, socket_data *data,
+                               rust_chan *reader) {
+  LOG_UPCALL_ENTRY(task);
+  I(task->sched, data->reader == NULL);
+  data->reader = reader->clone(iotask);
+  uv_read_start(&data->socket, alloc_buffer, read_progress);
+}