From 4b607d14641bd98e2194c6addb6b4617ddb0bfee Mon Sep 17 00:00:00 2001 From: Serris Lew Date: Wed, 23 Feb 2022 15:22:12 -0800 Subject: [PATCH 1/4] Remove intermediate buffer in PluginVC --- proxy/PluginVC.cc | 233 +++++++++++++--------------------------------- proxy/PluginVC.h | 10 +- 2 files changed, 69 insertions(+), 174 deletions(-) diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc index e64d59f686b..081d8be763c 100644 --- a/proxy/PluginVC.cc +++ b/proxy/PluginVC.cc @@ -217,11 +217,11 @@ PluginVC::main_handler(int event, void *data) } if (need_read_process) { - process_read_side(false); + process_read_side(); } if (need_write_process && !closed) { - process_write_side(false); + process_write_side(); } } @@ -343,11 +343,11 @@ PluginVC::reenable_re(VIO *vio) if (vio->op == VIO::WRITE) { ink_assert(vio == &write_state.vio); need_write_process = true; - process_write_side(false); + process_write_side(); } else if (vio->op == VIO::READ) { ink_assert(vio == &read_state.vio); need_read_process = true; - process_read_side(false); + process_read_side(); } else { ink_release_assert(0); } @@ -458,22 +458,20 @@ PluginVC::transfer_bytes(MIOBuffer *transfer_to, IOBufferReader *transfer_from, return total_added; } -// void PluginVC::process_write_side(bool cb_ok) +// void PluginVC::process_write_side() // // This function may only be called while holding // this->mutex & while it is ok to callback the // write side continuation // -// Does write side processing +// Does write side processing directly to other read side writer // void -PluginVC::process_write_side(bool other_side_call) +PluginVC::process_write_side() { ink_assert(!deletable); ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE); - MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? core_obj->a_to_p_buffer : core_obj->p_to_a_buffer; - Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE); need_write_process = false; @@ -494,7 +492,8 @@ PluginVC::process_write_side(bool other_side_call) Debug("pvc", "[%u] %s: process_write_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on); - if (other_side->closed || other_side->read_state.shutdown) { + // Check read_state of other side + if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) { write_state.vio.cont->handleEvent(VC_EVENT_ERROR, &write_state.vio); return; } @@ -507,28 +506,47 @@ PluginVC::process_write_side(bool other_side_call) } return; } - // Bytes available, try to transfer to the PluginVCCore - // intermediate buffer - // - int64_t buf_space = core_obj->buffer_size - core_buffer->max_read_avail(); + + // Other side read_state is open + // obtain the proper mutexes on other side + EThread *my_ethread = mutex->thread_holding; + ink_assert(my_ethread != nullptr); + MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread); + if (!lock.is_locked()) { + Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id, + ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")); + + // set need_read_process to enforce the read processing + other_side->need_read_process = true; + other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event); + return; + } + + // Bytes available, setting up other side read state writer + MIOBuffer *output_buffer = other_side->read_state.vio.get_writer(); + int64_t water_mark = output_buffer->water_mark; + water_mark = std::max(water_mark, static_cast(core_obj->buffer_size)); + int64_t buf_space = water_mark - output_buffer->max_read_avail(); if (buf_space <= 0) { - Debug("pvc", "[%u] %s: process_write_side no buffer space", core_obj->id, PVC_TYPE); + Debug("pvc", "[%u] %s: process_read_side from other side no buffer space", core_obj->id, PVC_TYPE); return; } act_on = std::min(act_on, buf_space); - int64_t added = transfer_bytes(core_buffer, reader, act_on); + int64_t added = transfer_bytes(output_buffer, reader, act_on); if (added < 0) { // Couldn't actually get the buffer space. This only // happens on small transfers with the above // buffer_size factor doesn't apply - Debug("pvc", "[%u] %s: process_write_side out of buffer space", core_obj->id, PVC_TYPE); + Debug("pvc", "[%u] %s: process_read_side from other side out of buffer space", core_obj->id, PVC_TYPE); return; } write_state.vio.ndone += added; + other_side->read_state.vio.ndone += added; - Debug("pvc", "[%u] %s: process_write_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added); + Debug("pvc", "[%u] %s: process_write_side and process_read_side from other side; added %" PRId64 "", core_obj->id, PVC_TYPE, + added); if (write_state.vio.ntodo() == 0) { write_state.vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_state.vio); @@ -536,40 +554,15 @@ PluginVC::process_write_side(bool other_side_call) write_state.vio.cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio); } - update_inactive_time(); + if (other_side->read_state.vio.ntodo() == 0) { + other_side->read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &other_side->read_state.vio); + } else { + other_side->read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &other_side->read_state.vio); + } - // Wake up the read side on the other side to process these bytes - if (!other_side->closed) { - if (!other_side_call) { - /* To clear the `need_read_process`, the mutexes must be obtained: - * - * - PluginVC::mutex - * - PluginVC::read_state.vio.mutex - * - */ - if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) { - // Just return, no touch on `other_side->need_read_process`. - return; - } - // Acquire the lock of the read side continuation - EThread *my_ethread = mutex->thread_holding; - ink_assert(my_ethread != nullptr); - MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread); - if (!lock.is_locked()) { - Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id, - ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")); - - // set need_read_process to enforce the read processing - other_side->need_read_process = true; - other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event); - return; - } + update_inactive_time(); - other_side->process_read_side(true); - } else { - other_side->read_state.vio.reenable(); - } - } + write_state.vio.reenable(); } // void PluginVC::process_read_side() @@ -578,120 +571,46 @@ PluginVC::process_write_side(bool other_side_call) // this->mutex & while it is ok to callback the // read side continuation // -// Does read side processing +// Closes read state if other side +// write state is no longer available // void -PluginVC::process_read_side(bool other_side_call) +PluginVC::process_read_side() { ink_assert(!deletable); ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE); - // TODO: Never used?? - // MIOBuffer *core_buffer; - - IOBufferReader *core_reader; - - if (vc_type == PLUGIN_VC_ACTIVE) { - // core_buffer = core_obj->p_to_a_buffer; - core_reader = core_obj->p_to_a_reader; - } else { - ink_assert(vc_type == PLUGIN_VC_PASSIVE); - // core_buffer = core_obj->a_to_p_buffer; - core_reader = core_obj->a_to_p_reader; - } - Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE); need_read_process = false; // Check read_state - if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) { - return; - } - - // Check the state of our read buffer as well as ntodo - int64_t ntodo = read_state.vio.ntodo(); - if (ntodo == 0) { - return; - } - - int64_t bytes_avail = core_reader->read_avail(); - int64_t act_on = std::min(bytes_avail, ntodo); - - Debug("pvc", "[%u] %s: process_read_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on); - - if (act_on <= 0) { - if (other_side->closed || other_side->write_state.shutdown) { - read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio); - } - return; - } - // Bytes available, try to transfer from the PluginVCCore - // intermediate buffer - // - MIOBuffer *output_buffer = read_state.vio.get_writer(); - - int64_t water_mark = output_buffer->water_mark; - water_mark = std::max(water_mark, static_cast(core_obj->buffer_size)); - int64_t buf_space = water_mark - output_buffer->max_read_avail(); - if (buf_space <= 0) { - Debug("pvc", "[%u] %s: process_read_side no buffer space", core_obj->id, PVC_TYPE); - return; - } - act_on = std::min(act_on, buf_space); - - int64_t added = transfer_bytes(output_buffer, core_reader, act_on); - if (added <= 0) { - // Couldn't actually get the buffer space. This only - // happens on small transfers with the above - // buffer_size factor doesn't apply - Debug("pvc", "[%u] %s: process_read_side out of buffer space", core_obj->id, PVC_TYPE); + if (read_state.vio.op != VIO::READ || closed || read_state.shutdown || !read_state.vio.ntodo()) { return; } - read_state.vio.ndone += added; - - Debug("pvc", "[%u] %s: process_read_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added); - - if (read_state.vio.ntodo() == 0) { - read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &read_state.vio); - } else { - read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &read_state.vio); - } - - update_inactive_time(); - - // Wake up the other side so it knows there is space available in - // intermediate buffer if (!other_side->closed) { - if (!other_side_call) { - /* To clear the `need_write_process`, the mutexes must be obtained: - * - * - PluginVC::mutex - * - PluginVC::write_state.vio.mutex - * - */ - if (other_side->write_state.vio.op != VIO::WRITE || other_side->closed || other_side->write_state.shutdown) { - // Just return, no touch on `other_side->need_write_process`. - return; - } - // Acquire the lock of the write side continuation - EThread *my_ethread = mutex->thread_holding; - ink_assert(my_ethread != nullptr); - MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread); - if (!lock.is_locked()) { - Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id, - ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")); - - // set need_write_process to enforce the write processing - other_side->need_write_process = true; - other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event); - return; - } + if (other_side->write_state.vio.op != VIO::WRITE || other_side->write_state.shutdown) { + // Just return, no touch on `other_side->need_write_process`. + return; + } + // Acquire the lock of the write side continuation + EThread *my_ethread = mutex->thread_holding; + ink_assert(my_ethread != nullptr); + MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread); + if (!lock.is_locked()) { + Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id, + ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")); - other_side->process_write_side(true); - } else { - other_side->write_state.vio.reenable(); + // set need_write_process to enforce the write processing + other_side->need_write_process = true; + other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event); + return; } + other_side->write_state.vio.reenable(); + other_side->process_write_side(); + } else { + Debug("pvc", "[%u] %s: write_state of other side is not available", core_obj->id, PVC_TYPE); + read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio); } } @@ -1078,14 +997,6 @@ PluginVCCore::init(int64_t buffer_index, int64_t buffer_water_mark) passive_vc.mutex = mutex; passive_vc.thread = active_vc.thread; - p_to_a_buffer = new_MIOBuffer(buffer_index); - p_to_a_buffer->water_mark = buffer_water_mark; - p_to_a_reader = p_to_a_buffer->alloc_reader(); - - a_to_p_buffer = new_MIOBuffer(buffer_index); - a_to_p_buffer->water_mark = buffer_water_mark; - a_to_p_reader = a_to_p_buffer->alloc_reader(); - buffer_size = BUFFER_SIZE_FOR_INDEX(buffer_index); Debug("pvc", @@ -1111,16 +1022,6 @@ PluginVCCore::destroy() passive_vc.write_state.vio.buffer.clear(); passive_vc.magic = PLUGIN_VC_MAGIC_DEAD; - if (p_to_a_buffer) { - free_MIOBuffer(p_to_a_buffer); - p_to_a_buffer = nullptr; - } - - if (a_to_p_buffer) { - free_MIOBuffer(a_to_p_buffer); - a_to_p_buffer = nullptr; - } - this->mutex = nullptr; delete this; } diff --git a/proxy/PluginVC.h b/proxy/PluginVC.h index 81b484d9238..3f90678a659 100644 --- a/proxy/PluginVC.h +++ b/proxy/PluginVC.h @@ -150,8 +150,8 @@ class PluginVC : public NetVConnection, public PluginIdentity int main_handler(int event, void *data); private: - void process_read_side(bool); - void process_write_side(bool); + void process_read_side(); + void process_write_side(); void process_close(); void process_timeout(Event **e, int event_to_send); @@ -254,12 +254,6 @@ class PluginVCCore : public Continuation Continuation *connect_to = nullptr; bool connected = false; - MIOBuffer *p_to_a_buffer = nullptr; - IOBufferReader *p_to_a_reader = nullptr; - - MIOBuffer *a_to_p_buffer = nullptr; - IOBufferReader *a_to_p_reader = nullptr; - IpEndpoint passive_addr_struct; IpEndpoint active_addr_struct; From 3ee8f609c6236782b5dfdf4ecb832974879ef765 Mon Sep 17 00:00:00 2001 From: Serris Lew Date: Mon, 28 Feb 2022 14:51:52 -0800 Subject: [PATCH 2/4] Update inactivity status for both sides --- proxy/PluginVC.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc index 081d8be763c..a7176141f48 100644 --- a/proxy/PluginVC.cc +++ b/proxy/PluginVC.cc @@ -561,6 +561,7 @@ PluginVC::process_write_side() } update_inactive_time(); + other_side->update_inactive_time(); write_state.vio.reenable(); } @@ -588,7 +589,7 @@ PluginVC::process_read_side() return; } - if (!other_side->closed) { + if (!other_side->closed && !other_side->write_state.shutdown) { if (other_side->write_state.vio.op != VIO::WRITE || other_side->write_state.shutdown) { // Just return, no touch on `other_side->need_write_process`. return; @@ -606,7 +607,6 @@ PluginVC::process_read_side() other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event); return; } - other_side->write_state.vio.reenable(); other_side->process_write_side(); } else { Debug("pvc", "[%u] %s: write_state of other side is not available", core_obj->id, PVC_TYPE); From 9087dcca4ddd21295a9b957c4883f232a0b35607 Mon Sep 17 00:00:00 2001 From: Serris Lew Date: Thu, 17 Mar 2022 13:18:31 -0700 Subject: [PATCH 3/4] Accounts for read ntodo when writing, resolves issues when rw ntodo are not equal --- proxy/PluginVC.cc | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc index a7176141f48..7f0a5abf06c 100644 --- a/proxy/PluginVC.cc +++ b/proxy/PluginVC.cc @@ -507,6 +507,13 @@ PluginVC::process_write_side() return; } + // Check the state of the other side read buffer as well as ntodo + int64_t other_ntodo = other_side->read_state.vio.ntodo(); + if (other_ntodo == 0) { + return; + } + act_on = std::min(act_on, other_ntodo); + // Other side read_state is open // obtain the proper mutexes on other side EThread *my_ethread = mutex->thread_holding; @@ -525,7 +532,7 @@ PluginVC::process_write_side() // Bytes available, setting up other side read state writer MIOBuffer *output_buffer = other_side->read_state.vio.get_writer(); int64_t water_mark = output_buffer->water_mark; - water_mark = std::max(water_mark, static_cast(core_obj->buffer_size)); + water_mark = std::max(water_mark, core_obj->buffer_size); int64_t buf_space = water_mark - output_buffer->max_read_avail(); if (buf_space <= 0) { Debug("pvc", "[%u] %s: process_read_side from other side no buffer space", core_obj->id, PVC_TYPE); From 82e210bb17adb14dd1afbcbd485baea7236f668b Mon Sep 17 00:00:00 2001 From: Serris Lew Date: Mon, 21 Mar 2022 16:56:28 -0700 Subject: [PATCH 4/4] Checks vc is closed before reenabled, prevent race cond --- proxy/PluginVC.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc index 7f0a5abf06c..d491d4d5a0d 100644 --- a/proxy/PluginVC.cc +++ b/proxy/PluginVC.cc @@ -570,7 +570,9 @@ PluginVC::process_write_side() update_inactive_time(); other_side->update_inactive_time(); - write_state.vio.reenable(); + if (!closed) { + write_state.vio.reenable(); + } } // void PluginVC::process_read_side()