Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 75 additions & 165 deletions proxy/PluginVC.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,11 @@ PluginVC::main_handler(int event, void *data)
}

if (need_read_process) {
process_read_side(false);
process_read_side();
}

if (need_write_process && !closed) {
process_write_side(false);
process_write_side();
}
}

Expand Down Expand Up @@ -343,11 +343,11 @@ PluginVC::reenable_re(VIO *vio)
if (vio->op == VIO::WRITE) {
ink_assert(vio == &write_state.vio);
need_write_process = true;
process_write_side(false);
process_write_side();
} else if (vio->op == VIO::READ) {
ink_assert(vio == &read_state.vio);
need_read_process = true;
process_read_side(false);
process_read_side();
} else {
ink_release_assert(0);
}
Expand Down Expand Up @@ -458,22 +458,20 @@ PluginVC::transfer_bytes(MIOBuffer *transfer_to, IOBufferReader *transfer_from,
return total_added;
}

// void PluginVC::process_write_side(bool cb_ok)
// void PluginVC::process_write_side()
//
// This function may only be called while holding
// this->mutex & while it is ok to callback the
// write side continuation
//
// Does write side processing
// Does write side processing directly to other read side writer
//
void
PluginVC::process_write_side(bool other_side_call)
PluginVC::process_write_side()
{
ink_assert(!deletable);
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);

MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? core_obj->a_to_p_buffer : core_obj->p_to_a_buffer;

Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
need_write_process = false;

Expand All @@ -494,7 +492,8 @@ PluginVC::process_write_side(bool other_side_call)

Debug("pvc", "[%u] %s: process_write_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on);

if (other_side->closed || other_side->read_state.shutdown) {
// Check read_state of other side
if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) {
write_state.vio.cont->handleEvent(VC_EVENT_ERROR, &write_state.vio);
return;
}
Expand All @@ -507,68 +506,72 @@ PluginVC::process_write_side(bool other_side_call)
}
return;
}
// Bytes available, try to transfer to the PluginVCCore
// intermediate buffer
//
int64_t buf_space = core_obj->buffer_size - core_buffer->max_read_avail();

// Check the state of the other side read buffer as well as ntodo
int64_t other_ntodo = other_side->read_state.vio.ntodo();
if (other_ntodo == 0) {
return;
}
act_on = std::min(act_on, other_ntodo);

// Other side read_state is open
// obtain the proper mutexes on other side
EThread *my_ethread = mutex->thread_holding;
ink_assert(my_ethread != nullptr);
MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread);
if (!lock.is_locked()) {
Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id,
((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));

// set need_read_process to enforce the read processing
other_side->need_read_process = true;
other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
return;
}

// Bytes available, setting up other side read state writer
MIOBuffer *output_buffer = other_side->read_state.vio.get_writer();
int64_t water_mark = output_buffer->water_mark;
water_mark = std::max<int64_t>(water_mark, core_obj->buffer_size);
int64_t buf_space = water_mark - output_buffer->max_read_avail();
if (buf_space <= 0) {
Debug("pvc", "[%u] %s: process_write_side no buffer space", core_obj->id, PVC_TYPE);
Debug("pvc", "[%u] %s: process_read_side from other side no buffer space", core_obj->id, PVC_TYPE);
return;
}
act_on = std::min(act_on, buf_space);

int64_t added = transfer_bytes(core_buffer, reader, act_on);
int64_t added = transfer_bytes(output_buffer, reader, act_on);
if (added < 0) {
// Couldn't actually get the buffer space. This only
// happens on small transfers with the above
// buffer_size factor doesn't apply
Debug("pvc", "[%u] %s: process_write_side out of buffer space", core_obj->id, PVC_TYPE);
Debug("pvc", "[%u] %s: process_read_side from other side out of buffer space", core_obj->id, PVC_TYPE);
return;
}

write_state.vio.ndone += added;
other_side->read_state.vio.ndone += added;

Debug("pvc", "[%u] %s: process_write_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added);
Debug("pvc", "[%u] %s: process_write_side and process_read_side from other side; added %" PRId64 "", core_obj->id, PVC_TYPE,
added);

if (write_state.vio.ntodo() == 0) {
write_state.vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_state.vio);
} else {
write_state.vio.cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
}

update_inactive_time();
if (other_side->read_state.vio.ntodo() == 0) {
other_side->read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &other_side->read_state.vio);
} else {
other_side->read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &other_side->read_state.vio);
}

// Wake up the read side on the other side to process these bytes
if (!other_side->closed) {
if (!other_side_call) {
/* To clear the `need_read_process`, the mutexes must be obtained:
*
* - PluginVC::mutex
* - PluginVC::read_state.vio.mutex
*
*/
if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) {
// Just return, no touch on `other_side->need_read_process`.
return;
}
// Acquire the lock of the read side continuation
EThread *my_ethread = mutex->thread_holding;
ink_assert(my_ethread != nullptr);
MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread);
if (!lock.is_locked()) {
Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id,
((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));

// set need_read_process to enforce the read processing
other_side->need_read_process = true;
other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
return;
}
update_inactive_time();
other_side->update_inactive_time();

other_side->process_read_side(true);
} else {
other_side->read_state.vio.reenable();
}
if (!closed) {
write_state.vio.reenable();
}
}

Expand All @@ -578,120 +581,45 @@ PluginVC::process_write_side(bool other_side_call)
// this->mutex & while it is ok to callback the
// read side continuation
//
// Does read side processing
// Closes read state if other side
// write state is no longer available
//
void
PluginVC::process_read_side(bool other_side_call)
PluginVC::process_read_side()
{
ink_assert(!deletable);
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);

// TODO: Never used??
// MIOBuffer *core_buffer;

IOBufferReader *core_reader;

if (vc_type == PLUGIN_VC_ACTIVE) {
// core_buffer = core_obj->p_to_a_buffer;
core_reader = core_obj->p_to_a_reader;
} else {
ink_assert(vc_type == PLUGIN_VC_PASSIVE);
// core_buffer = core_obj->a_to_p_buffer;
core_reader = core_obj->a_to_p_reader;
}

Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE);
need_read_process = false;

// Check read_state
if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) {
return;
}

// Check the state of our read buffer as well as ntodo
int64_t ntodo = read_state.vio.ntodo();
if (ntodo == 0) {
if (read_state.vio.op != VIO::READ || closed || read_state.shutdown || !read_state.vio.ntodo()) {
return;
}

int64_t bytes_avail = core_reader->read_avail();
int64_t act_on = std::min(bytes_avail, ntodo);

Debug("pvc", "[%u] %s: process_read_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on);

if (act_on <= 0) {
if (other_side->closed || other_side->write_state.shutdown) {
read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
if (!other_side->closed && !other_side->write_state.shutdown) {
if (other_side->write_state.vio.op != VIO::WRITE || other_side->write_state.shutdown) {
// Just return, no touch on `other_side->need_write_process`.
return;
}
return;
}
// Bytes available, try to transfer from the PluginVCCore
// intermediate buffer
//
MIOBuffer *output_buffer = read_state.vio.get_writer();

int64_t water_mark = output_buffer->water_mark;
water_mark = std::max(water_mark, static_cast<int64_t>(core_obj->buffer_size));
int64_t buf_space = water_mark - output_buffer->max_read_avail();
if (buf_space <= 0) {
Debug("pvc", "[%u] %s: process_read_side no buffer space", core_obj->id, PVC_TYPE);
return;
}
act_on = std::min(act_on, buf_space);

int64_t added = transfer_bytes(output_buffer, core_reader, act_on);
if (added <= 0) {
// Couldn't actually get the buffer space. This only
// happens on small transfers with the above
// buffer_size factor doesn't apply
Debug("pvc", "[%u] %s: process_read_side out of buffer space", core_obj->id, PVC_TYPE);
return;
}

read_state.vio.ndone += added;

Debug("pvc", "[%u] %s: process_read_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added);

if (read_state.vio.ntodo() == 0) {
read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &read_state.vio);
} else {
read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &read_state.vio);
}

update_inactive_time();

// Wake up the other side so it knows there is space available in
// intermediate buffer
if (!other_side->closed) {
if (!other_side_call) {
/* To clear the `need_write_process`, the mutexes must be obtained:
*
* - PluginVC::mutex
* - PluginVC::write_state.vio.mutex
*
*/
if (other_side->write_state.vio.op != VIO::WRITE || other_side->closed || other_side->write_state.shutdown) {
// Just return, no touch on `other_side->need_write_process`.
return;
}
// Acquire the lock of the write side continuation
EThread *my_ethread = mutex->thread_holding;
ink_assert(my_ethread != nullptr);
MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread);
if (!lock.is_locked()) {
Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id,
((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));

// set need_write_process to enforce the write processing
other_side->need_write_process = true;
other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
return;
}
// Acquire the lock of the write side continuation
EThread *my_ethread = mutex->thread_holding;
ink_assert(my_ethread != nullptr);
MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread);
if (!lock.is_locked()) {
Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id,
((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive"));

other_side->process_write_side(true);
} else {
other_side->write_state.vio.reenable();
// set need_write_process to enforce the write processing
other_side->need_write_process = true;
other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event);
return;
}
other_side->process_write_side();
} else {
Debug("pvc", "[%u] %s: write_state of other side is not available", core_obj->id, PVC_TYPE);
read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
}
}

Expand Down Expand Up @@ -1078,14 +1006,6 @@ PluginVCCore::init(int64_t buffer_index, int64_t buffer_water_mark)
passive_vc.mutex = mutex;
passive_vc.thread = active_vc.thread;

p_to_a_buffer = new_MIOBuffer(buffer_index);
p_to_a_buffer->water_mark = buffer_water_mark;
p_to_a_reader = p_to_a_buffer->alloc_reader();

a_to_p_buffer = new_MIOBuffer(buffer_index);
a_to_p_buffer->water_mark = buffer_water_mark;
a_to_p_reader = a_to_p_buffer->alloc_reader();

buffer_size = BUFFER_SIZE_FOR_INDEX(buffer_index);

Debug("pvc",
Expand All @@ -1111,16 +1031,6 @@ PluginVCCore::destroy()
passive_vc.write_state.vio.buffer.clear();
passive_vc.magic = PLUGIN_VC_MAGIC_DEAD;

if (p_to_a_buffer) {
free_MIOBuffer(p_to_a_buffer);
p_to_a_buffer = nullptr;
}

if (a_to_p_buffer) {
free_MIOBuffer(a_to_p_buffer);
a_to_p_buffer = nullptr;
}

this->mutex = nullptr;
delete this;
}
Expand Down
10 changes: 2 additions & 8 deletions proxy/PluginVC.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ class PluginVC : public NetVConnection, public PluginIdentity
int main_handler(int event, void *data);

private:
void process_read_side(bool);
void process_write_side(bool);
void process_read_side();
void process_write_side();
void process_close();
void process_timeout(Event **e, int event_to_send);

Expand Down Expand Up @@ -254,12 +254,6 @@ class PluginVCCore : public Continuation
Continuation *connect_to = nullptr;
bool connected = false;

MIOBuffer *p_to_a_buffer = nullptr;
IOBufferReader *p_to_a_reader = nullptr;

MIOBuffer *a_to_p_buffer = nullptr;
IOBufferReader *a_to_p_reader = nullptr;

IpEndpoint passive_addr_struct;
IpEndpoint active_addr_struct;

Expand Down