Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common.mk
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ COMMONOBJS = array.$(OBJEXT) \
load.$(OBJEXT) \
proc.$(OBJEXT) \
file.$(OBJEXT) \
gc_threading.$(OBJEXT) \
gc.$(OBJEXT) \
hash.$(OBJEXT) \
inits.$(OBJEXT) \
Expand Down Expand Up @@ -627,6 +628,7 @@ load.$(OBJEXT): {$(VPATH)}load.c {$(VPATH)}eval_intern.h \
file.$(OBJEXT): {$(VPATH)}file.c $(RUBY_H_INCLUDES) {$(VPATH)}io.h \
$(ENCODING_H_INCLUDES) {$(VPATH)}util.h {$(VPATH)}dln.h \
{$(VPATH)}internal.h
gc_threading.$(OBJEXT): {$(VPATH)}gc_threading.c $(RUBY_H_INCLUDES)
gc.$(OBJEXT): {$(VPATH)}gc.c $(RUBY_H_INCLUDES) {$(VPATH)}re.h \
{$(VPATH)}regex.h $(ENCODING_H_INCLUDES) $(VM_CORE_H_INCLUDES) \
{$(VPATH)}gc.h {$(VPATH)}io.h {$(VPATH)}eval_intern.h {$(VPATH)}util.h \
Expand Down
288 changes: 14 additions & 274 deletions gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
#include <setjmp.h>
#include <sys/types.h>

#include <pthread.h>

#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
Expand Down Expand Up @@ -422,258 +420,6 @@ rb_objspace_alloc(void)

static void initial_expand_heap(rb_objspace_t *objspace);


/* CS194: Marking stack struct and methods */

/* Sentinel returned by deque_pop() and deque_pop_back() when the deque holds
 * no elements. */
#define GC_STACK_EMPTY -1
/* Fixed-capacity ring buffer of VALUEs, indexed modulo max_length. */
typedef struct deque_struct {
VALUE* buffer; /* storage allocated by deque_init(), freed by deque_destroy() */
int max_length; /* capacity: number of VALUE slots in buffer */
int length; /* number of elements currently stored */
int head; /* index read by deque_pop_back(); -1 while the deque is empty */
int tail; /* index of the most recently pushed element; -1 while the deque is empty */
} deque_t;


/* Allocate storage for max_length VALUEs and reset the deque to empty. */
static void deque_init(deque_t* deque, int max_length);
/* Free the storage allocated by deque_init(). */
static void deque_destroy(deque_t* deque);
/* Push val onto the tail end of deque. Returns 1 if successful, 0 if the
 * deque is already full.
 */
static int deque_push(deque_t* deque, VALUE val);
/* Remove and return the element at the tail end (the one pushed last), or
 * GC_STACK_EMPTY if the deque is empty. */
static VALUE deque_pop(deque_t* deque);
/* Remove and return the element at the head end, or GC_STACK_EMPTY if the
 * deque is empty. */
static VALUE deque_pop_back(deque_t* deque);
/* Nonzero when the deque holds no elements. */
static int deque_empty_p(deque_t* deque);
/* Nonzero when the deque holds max_length elements. */
static int deque_full_p(deque_t* deque);

/*
 * Initialise `deque` as an empty ring buffer with room for `max_length`
 * VALUEs.
 *
 * If `max_length` is not positive, or the allocation fails, the deque is
 * left with a NULL buffer and a capacity of zero. Such a deque reports
 * itself as both empty and full, so deque_push() returns 0 and the pop
 * functions return GC_STACK_EMPTY without ever touching the buffer.
 */
static void deque_init(deque_t* deque, int max_length) {
    VALUE *buffer = NULL;

    if (max_length > 0)
        buffer = malloc(sizeof(*buffer) * (size_t)max_length);
    if (buffer == NULL)
        max_length = 0; /* no storage: advertise zero capacity */

    deque->buffer = buffer;
    deque->max_length = max_length;
    deque->length = 0;
    /* -1 marks "no element yet"; deque_push() moves both indices to 0. */
    deque->head = deque->tail = -1;
}

/*
 * Release the storage owned by `deque`.
 *
 * The fields are reset afterwards so the deque no longer points at freed
 * memory: a second deque_destroy() call becomes a harmless free(NULL), and
 * the zero capacity makes deque_push() refuse any further element.
 */
static void deque_destroy(deque_t* deque) {
    free(deque->buffer);
    deque->buffer = NULL;
    deque->max_length = 0;
    deque->length = 0;
    deque->head = deque->tail = -1;
}

static void deque_destroy_callback(void* deque) {
deque_destroy((deque_t*) deque);
}

/*
 * Append `val` at the tail end of `deque`.
 *
 * Returns 1 when the value was stored and 0 when the deque was already full
 * (in which case nothing is modified).
 */
static int deque_push(deque_t* deque, VALUE val) {
    int slot;

    if (deque_full_p(deque)) {
        return 0;
    }
    if (deque_empty_p(deque)) {
        /* First element: the head index becomes valid. */
        deque->head = 0;
    }

    /* The tail advances by one slot, wrapping at the end of the buffer. */
    slot = (deque->tail + 1) % deque->max_length;
    deque->buffer[slot] = val;
    deque->tail = slot;
    deque->length += 1;
    return 1;
}

/* Returns 1 when `deque` holds no elements, 0 otherwise. */
static int deque_empty_p(deque_t* deque) {
    if (deque->length == 0) {
        return 1;
    }
    return 0;
}

/* Returns 1 when `deque` holds max_length elements, 0 otherwise. */
static int deque_full_p(deque_t* deque) {
    if (deque->length == deque->max_length) {
        return 1;
    }
    return 0;
}


/*
 * Remove and return the element at the tail end of `deque` (the one pushed
 * most recently). Returns GC_STACK_EMPTY when the deque is empty.
 */
static VALUE deque_pop(deque_t* deque) {
    VALUE rtn;

    if (deque_empty_p(deque))
        return GC_STACK_EMPTY;

    rtn = deque->buffer[deque->tail];
    deque->length--;
    if (deque->length == 0) {
        /* Reset head and tail to the "no element" state. */
        deque->head = deque->tail = -1;
    }
    else {
        /*
         * Step the tail back one slot with wraparound. max_length is added
         * before taking the remainder because in C `(0 - 1) % n` is -1, not
         * n - 1, which would leave tail pointing outside the buffer.
         */
        deque->tail = (deque->tail + deque->max_length - 1) % deque->max_length;
    }
    return rtn;
}

/*
 * Remove and return the element at the head end of `deque` (the oldest one
 * still stored). Returns GC_STACK_EMPTY when the deque is empty.
 */
static VALUE deque_pop_back(deque_t* deque) {
    VALUE rtn;

    if (deque_empty_p(deque))
        return GC_STACK_EMPTY;

    rtn = deque->buffer[deque->head];
    deque->length--;
    if (deque->length == 0) {
        /* Reset head and tail to the "no element" state. */
        deque->head = deque->tail = -1;
    }
    else {
        /*
         * deque_push() stores at increasing indices, so the next-oldest
         * element sits one slot *after* the current head. Moving the head
         * backwards would return stale slots and eventually go negative.
         */
        deque->head = (deque->head + 1) % deque->max_length;
    }
    return rtn;
}


/* MARK QUEUE */

/* Node type of a singly linked list of mark requests. */
typedef struct mark_queue_node_struct mark_queue_node_t;

/* One list node. The first three fields have the same names and types as the
 * parameters of gc_mark() -- presumably a recorded gc_mark() call; confirm. */
struct mark_queue_node_struct {
rb_objspace_t* objspace;
VALUE ptr;
int lev;
mark_queue_node_t* next; /* following node in the list */
};

/* List header: first node, last node and a size counter. */
typedef struct mark_queue_struct {
mark_queue_node_t* head;
mark_queue_node_t* tail;
unsigned int size;
} mark_queue_t;


/* Global list instance, initially empty.
 * NOTE(review): nothing in this section reads or writes mark_queue -- check
 * whether it is still needed. */
mark_queue_t mark_queue = {NULL, NULL, 0};

/* Number of marker threads started by gc_mark_parallel().
 * global_queue_pop_work() also treats this many simultaneous waiters as the
 * signal that marking has finished. */
#define NTHREADS 4


/* Capacity of the shared deque created by global_queue_init(). */
#define GLOBAL_QUEUE_SIZE 100 /*TODO*/
/* Fill level below which global_queue_offer_work() considers the shared
 * queue to be running low. */
#define GLOBAL_QUEUE_SIZE_MIN (GLOBAL_QUEUE_SIZE / 4)

/* Capacity of each marker thread's private deque (see mark_run_loop()). */
#define LOCAL_QUEUE_SIZE 100 /*TODO*/
/* Upper bound on the items moved to a local deque by one
 * global_queue_pop_work() call. */
#define MAX_WORK_TO_GRAB 4
/* NOTE(review): not referenced by any code in this section. */
#define MAX_WORK_TO_OFFER 4
/* Work queue shared by the marker threads, together with the mutex and
 * condition variable that global_queue_pop_work() blocks on. */
typedef struct global_queue_struct {
unsigned int waiters; /* threads currently inside the wait loop of global_queue_pop_work() */
unsigned int count; /* work-item counter; set to 0 by global_queue_init() */
deque_t deque; /* the queued VALUEs */
pthread_mutex_t lock; /* held while global_queue_pop_work() inspects or drains the queue */
pthread_cond_t wait_condition; /* broadcast to wake threads blocked in global_queue_pop_work() */
unsigned int complete; /* set to 1 once NTHREADS threads are waiting at the same time */
} global_queue_t;

/*
 * Prepare `global_queue` for use: zero the counters, clear the completion
 * flag, allocate the shared deque and initialise the mutex and condition
 * variable with default attributes.
 *
 * Must be paired with global_queue_destroy().
 */
void global_queue_init(global_queue_t* global_queue) {
    global_queue->waiters = 0;
    global_queue->count = 0;
    /*
     * The queue may live in automatic storage (gc_mark_parallel() keeps it
     * on its stack), so `complete` has to be cleared explicitly; otherwise
     * the marker threads could see a stale non-zero value and stop at once.
     */
    global_queue->complete = 0;
    deque_init(&(global_queue->deque), GLOBAL_QUEUE_SIZE);
    pthread_mutex_init(&global_queue->lock, NULL);
    pthread_cond_init(&global_queue->wait_condition, NULL);
}

/*
 * Tear down a queue set up by global_queue_init(): free the shared deque's
 * storage, then destroy the mutex and the condition variable.
 */
void global_queue_destroy(global_queue_t* global_queue) {
    deque_t *shared = &global_queue->deque;
    pthread_mutex_t *mutex = &global_queue->lock;
    pthread_cond_t *cond = &global_queue->wait_condition;

    deque_destroy(shared);
    pthread_mutex_destroy(mutex);
    pthread_cond_destroy(cond);
}

/*
 * Move up to MAX_WORK_TO_GRAB items from the shared queue into
 * `local_queue`, blocking while the shared queue is empty.
 *
 * Termination detection: every thread that finds the shared queue empty
 * registers as a waiter. The thread that brings the number of waiters to
 * NTHREADS sets `complete` and wakes everybody, after which all callers
 * return (possibly without having moved anything).
 */
void global_queue_pop_work(global_queue_t* global_queue, deque_t* local_queue) {
    int i;

    pthread_mutex_lock(&global_queue->lock);
    /*
     * Test the deque itself rather than the `count` field: no visible code
     * keeps `count` up to date, so waiting on it would put threads to sleep
     * even though work is queued.
     */
    while (deque_empty_p(&global_queue->deque) && !global_queue->complete) {
        global_queue->waiters++;
        if (global_queue->waiters == NTHREADS) {
            global_queue->complete = 1;
            pthread_cond_broadcast(&global_queue->wait_condition);
        } else {
            /* Release the lock and go to sleep until someone signals. */
            pthread_cond_wait(&global_queue->wait_condition, &global_queue->lock);
        }
        global_queue->waiters--;
    }

    for (i = 0; i < MAX_WORK_TO_GRAB; i++) {
        /*
         * Stop before popping when the local deque cannot take the item;
         * otherwise deque_push() would fail and the popped value would be
         * lost.
         */
        if (deque_empty_p(&global_queue->deque) || deque_full_p(local_queue))
            break;
        deque_push(local_queue, deque_pop(&(global_queue->deque)));
    }

    pthread_mutex_unlock(&global_queue->lock);
}

/*
 * Opportunistically hand part of `local_queue` over to the shared queue.
 *
 * Work is offered when other threads are waiting and the local deque holds
 * more than two items, or when the shared queue is below
 * GLOBAL_QUEUE_SIZE_MIN and the local deque is more than half full. Up to
 * half of the local items (oldest first) are moved. The function never
 * blocks: if the lock is busy it returns without doing anything.
 */
void global_queue_offer_work(global_queue_t* global_queue, deque_t* local_queue) {
    int i;
    int localqueuesize = local_queue->length;
    int to_offer = localqueuesize / 2;
    int shared_low;

    /*
     * NOTE(review): `waiters` and the shared deque's length are read here
     * without holding the lock. They only serve as a hint for whether taking
     * the lock is worthwhile; the queue itself is modified under the lock.
     * The deque length is used instead of the `count` field because no
     * visible code keeps `count` up to date.
     */
    shared_low = global_queue->deque.length < GLOBAL_QUEUE_SIZE_MIN;
    if (!((global_queue->waiters && localqueuesize > 2) ||
          (shared_low && localqueuesize > LOCAL_QUEUE_SIZE / 2)))
        return;

    /* pthread_mutex_trylock() returns 0 on success and an error number when
     * the lock could not be taken. */
    if (pthread_mutex_trylock(&global_queue->lock) != 0)
        return;

    for (i = 0; i < to_offer; i++) {
        /* Do not pop from the local deque unless the shared one has room,
         * or the item would be dropped. */
        if (deque_full_p(&global_queue->deque))
            break;
        deque_push(&(global_queue->deque), deque_pop_back(local_queue));
    }
    if (global_queue->waiters) {
        pthread_cond_broadcast(&global_queue->wait_condition);
    }
    pthread_mutex_unlock(&global_queue->lock);
}

/* Forward declarations of the marking routines called by mark_run_loop(). */
static void gc_mark(rb_objspace_t *objspace, VALUE ptr, int lev);
static void gc_marks(rb_objspace_t *objspace);

/* Object space being marked; assigned by gc_mark_parallel(). */
rb_objspace_t* active_objspace;
/* Shared work queue of the current mark run; assigned by gc_mark_parallel(). */
global_queue_t* global_queue;
/* Thread-specific-data key under which each marker thread stores a pointer
 * to its private deque (see mark_run_loop() and gc_mark_defer()). */
pthread_key_t thread_local_deque_k;
/*
 * Thread entry point for one marker thread.
 *
 * `arg` carries the thread index as an integer cast to a pointer. Each
 * thread owns a private deque, published through thread_local_deque_k so
 * that gc_mark_defer() can find it. Thread 0 additionally runs gc_marks()
 * once before joining the common loop, in which every thread alternates
 * between offering surplus work to the shared queue, refilling its own
 * deque from it and marking one object.
 *
 * Always returns NULL.
 */
void* mark_run_loop(void* arg) {
    long thread_id = (long) arg;
    deque_t deque;

    deque_init(&deque, LOCAL_QUEUE_SIZE);
    pthread_setspecific(thread_local_deque_k, &deque);
    if (thread_id == 0) {
        gc_marks(active_objspace);
    }
    /* NOTE(review): `complete` is polled without holding the queue lock. */
    while (!global_queue->complete) {
        global_queue_offer_work(global_queue, &deque);
        if (deque_empty_p(&deque)) {
            global_queue_pop_work(global_queue, &deque);
        }
        /*
         * global_queue_pop_work() can return without delivering anything
         * (for instance once marking is complete). Skip the mark step then,
         * so the GC_STACK_EMPTY sentinel is never passed to gc_mark() as if
         * it were an object.
         */
        if (deque_empty_p(&deque)) {
            continue;
        }
        gc_mark(active_objspace, deque_pop(&deque), 1);
    }

    /*
     * `deque` lives on this thread's stack. Clear the thread-specific slot
     * and release the buffer here, so the key's destructor is not invoked at
     * thread exit with a pointer to an object whose lifetime has ended.
     */
    pthread_setspecific(thread_local_deque_k, NULL);
    deque_destroy(&deque);
    return NULL;
}

void gc_mark_parallel(rb_objspace_t* objspace) {
global_queue_t queuedata;
pthread_attr_t attr;
pthread_t threads[NTHREADS];
long t;
void* status;

active_objspace = objspace;
global_queue = &queuedata;
global_queue_init(global_queue);

pthread_key_create(&thread_local_deque_k, deque_destroy_callback);

pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

for (t = 0; t < NTHREADS; t++) {
pthread_create(&threads[t], &attr, mark_run_loop, (void*)t);
//TODO: handle error codes
}
pthread_attr_destroy(&attr);

for (t = 0; t < NTHREADS; t++) {
pthread_join(threads[t], &status);
//TODO: handle error codes
}
global_queue_destroy(global_queue);
}

/* Queues ptr on the calling thread's private deque for later marking. */
void gc_mark_defer(rb_objspace_t *objspace, VALUE ptr, int lev);
/* NOTE(review): no definition or caller of gc_mark_pop is visible in this
 * section -- check whether this declaration is still needed. */
int gc_mark_pop();


void
rb_gc_set_params(void)
{
Expand Down Expand Up @@ -1599,6 +1345,7 @@ init_mark_stack(rb_objspace_t *objspace)

#define MARK_STACK_EMPTY (mark_stack_ptr == mark_stack)

extern void gc_mark_defer(rb_objspace_t *objspace, VALUE ptr, int lev);
static void gc_mark_children(rb_objspace_t *objspace, VALUE ptr, int lev);

static void
Expand Down Expand Up @@ -1875,7 +1622,6 @@ rb_gc_mark_maybe(VALUE obj)
}
}

/* CS194 TODO: */
static void
gc_mark(rb_objspace_t *objspace, VALUE ptr, int lev)
{
Expand Down Expand Up @@ -1903,29 +1649,12 @@ gc_mark(rb_objspace_t *objspace, VALUE ptr, int lev)
gc_mark_children(objspace, ptr, lev+1);
}

/*
 * Defer marking of `ptr` by queueing it on the calling thread's private
 * deque.
 *
 * If the deque is full, surplus work is first offered to the shared queue
 * and the push is retried. If there is still no room -- or the calling
 * thread has no deque registered at all -- `ptr` is marked immediately
 * through gc_mark() at level `lev`.
 */
void
gc_mark_defer(rb_objspace_t *objspace, VALUE ptr, int lev) {
    deque_t *deque = pthread_getspecific(thread_local_deque_k);

    /* pthread_getspecific() yields NULL for a thread that never stored a
     * deque under this key; fall back to marking directly. */
    if (deque == NULL) {
        gc_mark(objspace, ptr, lev);
        return;
    }

    if (deque_push(deque, ptr) != 0) {
        return;
    }

    /* Local deque is full: try to make room, then retry once. */
    global_queue_offer_work(global_queue, deque);
    if (deque_push(deque, ptr) == 0) {
        gc_mark(objspace, ptr, lev);
    }
}

/* Marks ptr in rb_objspace, starting at level 0. */
void
rb_gc_mark(VALUE ptr)
{
    const int top_level = 0;

    gc_mark(&rb_objspace, ptr, top_level);
}


/* Marks all children of ptr. The children are found by the giant switch statement.
* CS194 TODO: switch this to something that enqueues children, instead of calling
* mark directly on them.
*
*/
static void
gc_mark_children(rb_objspace_t *objspace, VALUE ptr, int lev)
{
Expand Down Expand Up @@ -2427,6 +2156,18 @@ rest_sweep(rb_objspace_t *objspace)
}
}

/* Parallel marking entry point, defined outside this file; it receives the
 * object space as an opaque pointer. */
extern void gc_mark_parallel(void* objspace);

/* Forward declaration; gc_start_mark() below calls it. */
static void gc_marks(rb_objspace_t *objspace);

void gc_start_mark(void* objspace) {
gc_marks((rb_objspace_t*) objspace);
}

/* Externally visible wrapper around the file-local gc_mark(): marks ptr in
 * the given object space (passed as an opaque pointer) at level 1. */
void gc_do_mark(void* objspace, VALUE ptr) {
    rb_objspace_t *space = objspace;
    const int level = 1;

    gc_mark(space, ptr, level);
}

static int
gc_lazy_sweep(rb_objspace_t *objspace)
{
Expand Down Expand Up @@ -2459,7 +2200,7 @@ gc_lazy_sweep(rb_objspace_t *objspace)
return TRUE;
}
}
// gc_marks(objspace);

gc_mark_parallel(objspace);

before_gc_sweep(objspace);
Expand Down Expand Up @@ -2768,7 +2509,6 @@ garbage_collect(rb_objspace_t *objspace)

during_gc++;
gc_mark_parallel(objspace);
//gc_marks(objspace);

GC_PROF_SWEEP_TIMER_START;
gc_sweep(objspace);
Expand Down
Loading