Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 25 additions & 44 deletions hiredis-client/ext/redis_client/hiredis/hiredis_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ static const rb_data_type_t hiredis_ssl_context_data_type = {
.dmark = NULL,
.dfree = hiredis_ssl_context_free,
.dsize = hiredis_ssl_context_memsize,
#ifdef HAS_GC_COMPACT
.dcompact = NULL
#endif
},
.flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
};
Expand Down Expand Up @@ -135,10 +132,18 @@ static VALUE hiredis_ssl_context_init(VALUE self, VALUE ca_file, VALUE ca_path,
return Qnil;
}

typedef struct {
VALUE stack;
int *task_index;
} hiredis_reader_state_t;

static void *reply_append(const redisReadTask *task, VALUE value) {
if (task && task->parent) {
VALUE parent = (VALUE)task->parent->obj;
hiredis_reader_state_t *state = (hiredis_reader_state_t *)task->privdata;
int task_index = *state->task_index;

if (task->parent) {
RUBY_ASSERT(task_index > 0);
VALUE parent = rb_ary_entry(state->stack, task_index - 1);
switch (task->parent->type) {
case REDIS_REPLY_ARRAY:
case REDIS_REPLY_SET:
Expand All @@ -147,18 +152,18 @@ static void *reply_append(const redisReadTask *task, VALUE value) {
break;
case REDIS_REPLY_MAP:
if (task->idx % 2) {
VALUE key = (VALUE)task->parent->privdata;
task->parent->privdata = NULL;
VALUE key = rb_ary_pop(state->stack);
rb_hash_aset(parent, key, value);
} else {
task->parent->privdata = (void*)value;
rb_ary_push(state->stack, value);
}
break;
default:
rb_bug("[hiredis] Unexpected task parent type %d", task->parent->type);
break;
}
}
rb_ary_store(state->stack, task_index, value);
return (void*)value;
}

Expand Down Expand Up @@ -314,37 +319,6 @@ typedef struct {
struct timeval write_timeout;
} hiredis_connection_t;


#ifdef HAS_GC_COMPACT
static void hiredis_connection_compact(void *ptr) {
hiredis_connection_t *connection = ptr;
if (connection->context) {
redisReader *reader = connection->context->reader;
// reader->ridx == -1 when there is no active task
for (int index = 0; index <= reader->ridx; index++) {
redisReadTask *task = reader->task[index];
if (task->obj) { task->obj = (void *)rb_gc_location((VALUE)task->obj); }
if (task->privdata) { task->privdata = (void *)rb_gc_location((VALUE)task->privdata); }
}
}
}
#else
#define rb_gc_mark_movable rb_gc_mark
#endif

static void hiredis_connection_mark(void *ptr) {
hiredis_connection_t *connection = ptr;
if (connection->context) {
redisReader *reader = connection->context->reader;
// reader->ridx == -1 when there is no active task
for (int index = 0; index <= reader->ridx; index++) {
redisReadTask *task = reader->task[index];
if (task->obj) { rb_gc_mark_movable((VALUE)task->obj); }
if (task->privdata) { rb_gc_mark_movable((VALUE)task->privdata); }
}
}
}

static void hiredis_connection_free(void *ptr) {
hiredis_connection_t *connection = ptr;
if (connection) {
Expand Down Expand Up @@ -379,14 +353,11 @@ static size_t hiredis_connection_memsize(const void *ptr) {
static const rb_data_type_t hiredis_connection_data_type = {
.wrap_struct_name = "redis-client:hiredis_connection",
.function = {
.dmark = hiredis_connection_mark,
.dmark = NULL,
.dfree = hiredis_connection_free,
.dsize = hiredis_connection_memsize,
#ifdef HAS_GC_COMPACT
.dcompact = hiredis_connection_compact
#endif
},
.flags = RUBY_TYPED_FREE_IMMEDIATELY
.flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
};

static VALUE hiredis_alloc(VALUE klass) {
Expand Down Expand Up @@ -717,6 +688,16 @@ static int hiredis_read_internal(hiredis_connection_t *connection, VALUE *reply)
void *redis_reply = NULL;
int wdone = 0;

// This struct being on the stack, the GC won't move nor collect that `stack` RArray.
// We use that to avoid having to have a `mark` function with write barriers.
// Not that it would be too hard, but if we mark the response objects, we'll likely end up
// promoting them to the old generation which isn't desirable.
hiredis_reader_state_t reader_state = {
.stack = rb_ary_new(),
.task_index = &connection->context->reader->ridx,
};
connection->context->reader->privdata = &reader_state;

/* Try to read pending replies */
if (redisGetReplyFromReader(connection->context, &redis_reply) == REDIS_ERR) {
return HIREDIS_FATAL_CONNECTION_ERROR; // Protocol error
Expand Down