about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorEric Holk <eholk@mozilla.com>2011-07-07 11:53:08 -0700
committerEric Holk <eholk@mozilla.com>2011-07-07 18:22:27 -0700
commit8acadb17c2d679291aa94e94af8cc96513cab830 (patch)
tree4bf732a0c5c6857382fb63310aad9961190fdf52 /src
parentdcd2563a3a7662d03ab33b67c92652e6e24c5af1 (diff)
downloadrust-8acadb17c2d679291aa94e94af8cc96513cab830.tar.gz
rust-8acadb17c2d679291aa94e94af8cc96513cab830.zip
Work on debugging race conditions.
Ports and channels have been moved to the kernel pool, since they've
been known to outlive their associated task. This probably isn't the
right thing to do, the life cycle needs fixed instead.

Some refactorying in memory_region.cpp. Added a helper function to
increment and decrement the allocation counter. This makes it easier
to switch between atomic and non-atomic increments. Using atomic
increments for now, although this still does not fix the problem.
Diffstat (limited to 'src')
-rw-r--r--src/rt/memory_region.cpp23
-rw-r--r--src/rt/memory_region.h3
-rw-r--r--src/rt/rust_chan.cpp14
-rw-r--r--src/rt/rust_chan.h3
-rw-r--r--src/rt/rust_port.cpp8
-rw-r--r--src/rt/rust_port.h5
-rw-r--r--src/rt/rust_task.cpp3
-rw-r--r--src/rt/rust_task.h2
-rw-r--r--src/rt/rust_upcall.cpp43
-rw-r--r--src/rt/sync/sync.h21
10 files changed, 90 insertions, 35 deletions
diff --git a/src/rt/memory_region.cpp b/src/rt/memory_region.cpp
index 6c50bf42d8c..809ac81f716 100644
--- a/src/rt/memory_region.cpp
+++ b/src/rt/memory_region.cpp
@@ -19,8 +19,18 @@ memory_region::memory_region(memory_region *parent) :
     // Nop.
 }
 
+void memory_region::add_alloc() {
+    //_live_allocations++;
+    sync::increment(_live_allocations);
+}
+
+void memory_region::dec_alloc() {
+    //_live_allocations--;
+    sync::decrement(_live_allocations);
+}
+
 void memory_region::free(void *mem) {
-    // printf("free: ptr 0x%" PRIxPTR"\n", (uintptr_t) mem);
+    // printf("free: ptr 0x%" PRIxPTR" region=%p\n", (uintptr_t) mem, this);
     if (!mem) { return; }
     if (_synchronized) { _lock.lock(); }
 #ifdef TRACK_ALLOCATIONS
@@ -33,7 +43,7 @@ void memory_region::free(void *mem) {
     if (_live_allocations < 1) {
         _srv->fatal("live_allocs < 1", __FILE__, __LINE__, "");
     }
-    _live_allocations--;
+    dec_alloc();
     _srv->free(mem);
     if (_synchronized) { _lock.unlock(); }
 }
@@ -42,7 +52,7 @@ void *
 memory_region::realloc(void *mem, size_t size) {
     if (_synchronized) { _lock.lock(); }
     if (!mem) {
-        _live_allocations++;
+        add_alloc();
     }
     void *newMem = _srv->realloc(mem, size);
 #ifdef TRACK_ALLOCATIONS
@@ -59,12 +69,13 @@ memory_region::realloc(void *mem, size_t size) {
 void *
 memory_region::malloc(size_t size) {
     if (_synchronized) { _lock.lock(); }
-    _live_allocations++;
+    add_alloc();
     void *mem = _srv->malloc(size);
 #ifdef TRACK_ALLOCATIONS
     _allocation_list.append(mem);
 #endif
-    // printf("malloc: ptr 0x%" PRIxPTR "\n", (uintptr_t) mem);
+    // printf("malloc: ptr 0x%" PRIxPTR " region=%p\n", 
+    //        (uintptr_t) mem, this);
     if (_synchronized) { _lock.unlock(); }
     return mem;
 }
@@ -72,7 +83,7 @@ memory_region::malloc(size_t size) {
 void *
 memory_region::calloc(size_t size) {
     if (_synchronized) { _lock.lock(); }
-    _live_allocations++;
+    add_alloc();
     void *mem = _srv->malloc(size);
     memset(mem, 0, size);
 #ifdef TRACK_ALLOCATIONS
diff --git a/src/rt/memory_region.h b/src/rt/memory_region.h
index a72faf76338..36b2e1a4164 100644
--- a/src/rt/memory_region.h
+++ b/src/rt/memory_region.h
@@ -22,6 +22,9 @@ private:
     const bool _detailed_leaks;
     const bool _synchronized;
     lock_and_signal _lock;
+
+    void add_alloc();
+    void dec_alloc();
 public:
     memory_region(rust_srv *srv, bool synchronized);
     memory_region(memory_region *parent);
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp
index 78301e3d879..141d60a3e9c 100644
--- a/src/rt/rust_chan.cpp
+++ b/src/rt/rust_chan.cpp
@@ -6,11 +6,12 @@
  */
 rust_chan::rust_chan(rust_task *task,
                      maybe_proxy<rust_port> *port,
-                     size_t unit_sz) :
-                     ref_count(1),
-                     task(task),
-                     port(port),
-                     buffer(task, unit_sz) {
+                     size_t unit_sz) 
+    : ref_count(1),
+      kernel(task->kernel),
+      task(task),
+      port(port),
+      buffer(task, unit_sz) {
     ++task->ref_count;
     if (port) {
         associate(port);
@@ -87,6 +88,7 @@ void rust_chan::send(void *sptr) {
         buffer.dequeue(NULL);
     } else {
         rust_port *target_port = port->referent();
+        scoped_lock right(target_port->lock);
         if (target_port->task->blocked_on(target_port)) {
             DLOG(sched, comm, "dequeued in rendezvous_ptr");
             buffer.dequeue(target_port->task->rendezvous_ptr);
@@ -114,7 +116,7 @@ rust_chan *rust_chan::clone(maybe_proxy<rust_task> *target) {
         port = proxy;
         target_task = target->as_proxy()->handle()->referent();
     }
-    return new (target_task) rust_chan(target_task, port, unit_sz);
+    return new (target_task->kernel) rust_chan(target_task, port, unit_sz);
 }
 
 /**
diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h
index 4172ca5f09d..752667b8a3c 100644
--- a/src/rt/rust_chan.h
+++ b/src/rt/rust_chan.h
@@ -1,7 +1,7 @@
 #ifndef RUST_CHAN_H
 #define RUST_CHAN_H
 
-class rust_chan : public task_owned<rust_chan>,
+class rust_chan : public kernel_owned<rust_chan>,
                   public rust_cond {
 public:
     RUST_REFCOUNTED_WITH_DTOR(rust_chan, destroy())
@@ -9,6 +9,7 @@ public:
 
     ~rust_chan();
 
+    rust_kernel *kernel;
     rust_task *task;
     maybe_proxy<rust_port> *port;
     size_t idx;
diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp
index a2bd3b34c38..0aebda859f8 100644
--- a/src/rt/rust_port.cpp
+++ b/src/rt/rust_port.cpp
@@ -1,16 +1,16 @@
 #include "rust_internal.h"
 #include "rust_port.h"
 
-rust_port::rust_port(rust_task *task, size_t unit_sz) :
-                     maybe_proxy<rust_port>(this), task(task),
-                     unit_sz(unit_sz), writers(task), chans(task) {
+rust_port::rust_port(rust_task *task, size_t unit_sz) 
+    : maybe_proxy<rust_port>(this), kernel(task->kernel), task(task),
+      unit_sz(unit_sz), writers(task), chans(task) {
 
     LOG(task, comm,
         "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
         PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
 
     // Allocate a remote channel, for remote channel data.
-    remote_channel = new (task) rust_chan(task, this, unit_sz);
+    remote_channel = new (task->kernel) rust_chan(task, this, unit_sz);
 }
 
 rust_port::~rust_port() {
diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h
index 7a58f839c44..301422cc765 100644
--- a/src/rt/rust_port.h
+++ b/src/rt/rust_port.h
@@ -2,9 +2,10 @@
 #define RUST_PORT_H
 
 class rust_port : public maybe_proxy<rust_port>,
-                  public task_owned<rust_port> {
+                  public kernel_owned<rust_port> {
 
 public:
+    rust_kernel *kernel;
     rust_task *task;
     size_t unit_sz;
     ptr_vec<rust_token> writers;
@@ -13,6 +14,8 @@ public:
     // Data sent to this port from remote tasks is buffered in this channel.
     rust_chan *remote_channel;
 
+    lock_and_signal lock;
+
     rust_port(rust_task *task, size_t unit_sz);
     ~rust_port();
     void log_state();
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index 721f5b0ed20..6c7ab554002 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -401,6 +401,7 @@ rust_task::transition(rust_task_list *src, rust_task_list *dst) {
 
 void
 rust_task::block(rust_cond *on, const char* name) {
+    scoped_lock with(lock);
     LOG(this, task, "Blocking on 0x%" PRIxPTR ", cond: 0x%" PRIxPTR,
                          (uintptr_t) on, (uintptr_t) cond);
     A(sched, cond == NULL, "Cannot block an already blocked task.");
@@ -413,6 +414,7 @@ rust_task::block(rust_cond *on, const char* name) {
 
 void
 rust_task::wakeup(rust_cond *from) {
+    scoped_lock with(lock);
     A(sched, cond != NULL, "Cannot wake up unblocked task.");
     LOG(this, task, "Blocked on 0x%" PRIxPTR " woken up on 0x%" PRIxPTR,
                         (uintptr_t) cond, (uintptr_t) from);
@@ -430,6 +432,7 @@ rust_task::wakeup(rust_cond *from) {
 
 void
 rust_task::die() {
+    scoped_lock with(lock);
     transition(&sched->running_tasks, &sched->dead_tasks);
 }
 
diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h
index e15f105222b..c20eae7ece8 100644
--- a/src/rt/rust_task.h
+++ b/src/rt/rust_task.h
@@ -89,6 +89,8 @@ rust_task : public maybe_proxy<rust_task>,
 
     wakeup_callback *_on_wakeup;
 
+    lock_and_signal lock;
+
     // Only a pointer to 'name' is kept, so it must live as long as this task.
     rust_task(rust_scheduler *sched,
               rust_task_list *state,
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index d89b278790b..383e856f69a 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -92,7 +92,9 @@ upcall_new_port(rust_task *task, size_t unit_sz) {
     LOG_UPCALL_ENTRY(task);
     LOG(task, comm, "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)",
         (uintptr_t) task, task->name, unit_sz);
-    return new (task) rust_port(task, unit_sz);
+    // take a reference on behalf of the port
+    task->ref();
+    return new (task->kernel) rust_port(task, unit_sz);
 }
 
 extern "C" CDECL void
@@ -101,6 +103,9 @@ upcall_del_port(rust_task *task, rust_port *port) {
     LOG(task, comm, "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port);
     I(task->sched, !port->ref_count);
     delete port;
+
+    // FIXME: We shouldn't ever directly manipulate the ref count.
+    --task->ref_count;
 }
 
 /**
@@ -114,7 +119,7 @@ upcall_new_chan(rust_task *task, rust_port *port) {
         "task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")",
         (uintptr_t) task, task->name, port);
     I(sched, port);
-    return new (task) rust_chan(task, port, port->unit_sz);
+    return new (task->kernel) rust_chan(task, port, port->unit_sz);
 }
 
 /**
@@ -138,6 +143,8 @@ extern "C" CDECL
 void upcall_del_chan(rust_task *task, rust_chan *chan) {
     LOG_UPCALL_ENTRY(task);
 
+    I(task->sched, chan->task == task);
+
     LOG(task, comm, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
     chan->destroy();
 }
@@ -183,25 +190,27 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
 
 extern "C" CDECL void
 upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
-    LOG_UPCALL_ENTRY(task);
+    {
+        scoped_lock with(port->lock);
+        LOG_UPCALL_ENTRY(task);
     
-    LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
-        ", size: 0x%" PRIxPTR ", chan_no: %d",
-        (uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
-        port->chans.length());
+        LOG(task, comm, "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
+            ", size: 0x%" PRIxPTR ", chan_no: %d",
+            (uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
+            port->chans.length());
     
-    if (port->receive(dptr)) {
-        return;
-    }
+        if (port->receive(dptr)) {
+            return;
+        }
     
-    // No data was buffered on any incoming channel, so block this task on the
-    // port. Remember the rendezvous location so that any sender task can
-    // write to it before waking up this task.
+        // No data was buffered on any incoming channel, so block this task on
+        // the port. Remember the rendezvous location so that any sender task
+        // can write to it before waking up this task.
     
-    LOG(task, comm, "<=== waiting for rendezvous data ===");
-    task->rendezvous_ptr = dptr;
-    task->block(port, "waiting for rendezvous data");
-
+        LOG(task, comm, "<=== waiting for rendezvous data ===");
+        task->rendezvous_ptr = dptr;
+        task->block(port, "waiting for rendezvous data");
+    }
     task->yield(3);
 }
 
diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h
index eb220e462c6..360fff1fabd 100644
--- a/src/rt/sync/sync.h
+++ b/src/rt/sync/sync.h
@@ -1,3 +1,4 @@
+// -*- c++-mode -*-
 #ifndef SYNC_H
 #define SYNC_H
 
@@ -10,6 +11,26 @@ public:
         T oldValue, T newValue) {
         return __sync_bool_compare_and_swap(address, oldValue, newValue);
     }
+    
+    template <class T>
+    static T increment(T *address) {
+        return __sync_add_and_fetch(address, 1);
+    }
+
+    template <class T>
+    static T decrement(T *address) {
+        return __sync_sub_and_fetch(address, 1);
+    }    
+
+    template <class T>
+    static T increment(T &address) {
+        return __sync_add_and_fetch(&address, 1);
+    }
+
+    template <class T>
+    static T decrement(T &address) {
+        return __sync_sub_and_fetch(&address, 1);
+    }    
 };
 
 /**