diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc index e64d59f686b..d491d4d5a0d 100644 --- a/proxy/PluginVC.cc +++ b/proxy/PluginVC.cc @@ -217,11 +217,11 @@ PluginVC::main_handler(int event, void *data) } if (need_read_process) { - process_read_side(false); + process_read_side(); } if (need_write_process && !closed) { - process_write_side(false); + process_write_side(); } } @@ -343,11 +343,11 @@ PluginVC::reenable_re(VIO *vio) if (vio->op == VIO::WRITE) { ink_assert(vio == &write_state.vio); need_write_process = true; - process_write_side(false); + process_write_side(); } else if (vio->op == VIO::READ) { ink_assert(vio == &read_state.vio); need_read_process = true; - process_read_side(false); + process_read_side(); } else { ink_release_assert(0); } @@ -458,22 +458,20 @@ PluginVC::transfer_bytes(MIOBuffer *transfer_to, IOBufferReader *transfer_from, return total_added; } -// void PluginVC::process_write_side(bool cb_ok) +// void PluginVC::process_write_side() // // This function may only be called while holding // this->mutex & while it is ok to callback the // write side continuation // -// Does write side processing +// Does write side processing directly to other read side writer // void -PluginVC::process_write_side(bool other_side_call) +PluginVC::process_write_side() { ink_assert(!deletable); ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE); - MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? core_obj->a_to_p_buffer : core_obj->p_to_a_buffer; - Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE); need_write_process = false; @@ -494,7 +492,8 @@ PluginVC::process_write_side(bool other_side_call) Debug("pvc", "[%u] %s: process_write_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on); - if (other_side->closed || other_side->read_state.shutdown) { + // Check read_state of other side + if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) { write_state.vio.cont->handleEvent(VC_EVENT_ERROR, &write_state.vio); return; } @@ -507,28 +506,54 @@ PluginVC::process_write_side(bool other_side_call) } return; } - // Bytes available, try to transfer to the PluginVCCore - // intermediate buffer - // - int64_t buf_space = core_obj->buffer_size - core_buffer->max_read_avail(); + + // Check the state of the other side read buffer as well as ntodo + int64_t other_ntodo = other_side->read_state.vio.ntodo(); + if (other_ntodo == 0) { + return; + } + act_on = std::min(act_on, other_ntodo); + + // Other side read_state is open + // obtain the proper mutexes on other side + EThread *my_ethread = mutex->thread_holding; + ink_assert(my_ethread != nullptr); + MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread); + if (!lock.is_locked()) { + Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id, + ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")); + + // set need_read_process to enforce the read processing + other_side->need_read_process = true; + other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event); + return; + } + + // Bytes available, setting up other side read state writer + MIOBuffer *output_buffer = other_side->read_state.vio.get_writer(); + int64_t water_mark = output_buffer->water_mark; + water_mark = std::max(water_mark, core_obj->buffer_size); + int64_t buf_space = water_mark - output_buffer->max_read_avail(); if (buf_space <= 0) { - Debug("pvc", "[%u] %s: process_write_side no buffer space", core_obj->id, PVC_TYPE); + Debug("pvc", "[%u] %s: process_read_side from other side no buffer space", core_obj->id, PVC_TYPE); return; } act_on = std::min(act_on, buf_space); - int64_t added = transfer_bytes(core_buffer, reader, act_on); + int64_t added = transfer_bytes(output_buffer, reader, act_on); if (added < 0) { // Couldn't actually get the buffer space. This only // happens on small transfers with the above // buffer_size factor doesn't apply - Debug("pvc", "[%u] %s: process_write_side out of buffer space", core_obj->id, PVC_TYPE); + Debug("pvc", "[%u] %s: process_read_side from other side out of buffer space", core_obj->id, PVC_TYPE); return; } write_state.vio.ndone += added; + other_side->read_state.vio.ndone += added; - Debug("pvc", "[%u] %s: process_write_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added); + Debug("pvc", "[%u] %s: process_write_side and process_read_side from other side; added %" PRId64 "", core_obj->id, PVC_TYPE, + added); if (write_state.vio.ntodo() == 0) { write_state.vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_state.vio); @@ -536,39 +561,17 @@ PluginVC::process_write_side(bool other_side_call) write_state.vio.cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio); } - update_inactive_time(); + if (other_side->read_state.vio.ntodo() == 0) { + other_side->read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &other_side->read_state.vio); + } else { + other_side->read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &other_side->read_state.vio); + } - // Wake up the read side on the other side to process these bytes - if (!other_side->closed) { - if (!other_side_call) { - /* To clear the `need_read_process`, the mutexes must be obtained: - * - * - PluginVC::mutex - * - PluginVC::read_state.vio.mutex - * - */ - if (other_side->read_state.vio.op != VIO::READ || other_side->closed || other_side->read_state.shutdown) { - // Just return, no touch on `other_side->need_read_process`. - return; - } - // Acquire the lock of the read side continuation - EThread *my_ethread = mutex->thread_holding; - ink_assert(my_ethread != nullptr); - MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread); - if (!lock.is_locked()) { - Debug("pvc_event", "[%u] %s: process_read_side from other side lock miss, retrying", other_side->core_obj->id, - ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")); - - // set need_read_process to enforce the read processing - other_side->need_read_process = true; - other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event); - return; - } + update_inactive_time(); + other_side->update_inactive_time(); - other_side->process_read_side(true); - } else { - other_side->read_state.vio.reenable(); - } + if (!closed) { + write_state.vio.reenable(); } } @@ -578,120 +581,45 @@ PluginVC::process_write_side(bool other_side_call) // this->mutex & while it is ok to callback the // read side continuation // -// Does read side processing +// Closes read state if other side +// write state is no longer available // void -PluginVC::process_read_side(bool other_side_call) +PluginVC::process_read_side() { ink_assert(!deletable); ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE); - // TODO: Never used?? - // MIOBuffer *core_buffer; - - IOBufferReader *core_reader; - - if (vc_type == PLUGIN_VC_ACTIVE) { - // core_buffer = core_obj->p_to_a_buffer; - core_reader = core_obj->p_to_a_reader; - } else { - ink_assert(vc_type == PLUGIN_VC_PASSIVE); - // core_buffer = core_obj->a_to_p_buffer; - core_reader = core_obj->a_to_p_reader; - } - Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE); need_read_process = false; // Check read_state - if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) { - return; - } - - // Check the state of our read buffer as well as ntodo - int64_t ntodo = read_state.vio.ntodo(); - if (ntodo == 0) { + if (read_state.vio.op != VIO::READ || closed || read_state.shutdown || !read_state.vio.ntodo()) { return; } - int64_t bytes_avail = core_reader->read_avail(); - int64_t act_on = std::min(bytes_avail, ntodo); - - Debug("pvc", "[%u] %s: process_read_side; act_on %" PRId64 "", core_obj->id, PVC_TYPE, act_on); - - if (act_on <= 0) { - if (other_side->closed || other_side->write_state.shutdown) { - read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio); + if (!other_side->closed && !other_side->write_state.shutdown) { + if (other_side->write_state.vio.op != VIO::WRITE || other_side->write_state.shutdown) { + // Just return, no touch on `other_side->need_write_process`. + return; } - return; - } - // Bytes available, try to transfer from the PluginVCCore - // intermediate buffer - // - MIOBuffer *output_buffer = read_state.vio.get_writer(); - - int64_t water_mark = output_buffer->water_mark; - water_mark = std::max(water_mark, static_cast(core_obj->buffer_size)); - int64_t buf_space = water_mark - output_buffer->max_read_avail(); - if (buf_space <= 0) { - Debug("pvc", "[%u] %s: process_read_side no buffer space", core_obj->id, PVC_TYPE); - return; - } - act_on = std::min(act_on, buf_space); - - int64_t added = transfer_bytes(output_buffer, core_reader, act_on); - if (added <= 0) { - // Couldn't actually get the buffer space. This only - // happens on small transfers with the above - // buffer_size factor doesn't apply - Debug("pvc", "[%u] %s: process_read_side out of buffer space", core_obj->id, PVC_TYPE); - return; - } - - read_state.vio.ndone += added; - - Debug("pvc", "[%u] %s: process_read_side; added %" PRId64 "", core_obj->id, PVC_TYPE, added); - - if (read_state.vio.ntodo() == 0) { - read_state.vio.cont->handleEvent(VC_EVENT_READ_COMPLETE, &read_state.vio); - } else { - read_state.vio.cont->handleEvent(VC_EVENT_READ_READY, &read_state.vio); - } - - update_inactive_time(); - - // Wake up the other side so it knows there is space available in - // intermediate buffer - if (!other_side->closed) { - if (!other_side_call) { - /* To clear the `need_write_process`, the mutexes must be obtained: - * - * - PluginVC::mutex - * - PluginVC::write_state.vio.mutex - * - */ - if (other_side->write_state.vio.op != VIO::WRITE || other_side->closed || other_side->write_state.shutdown) { - // Just return, no touch on `other_side->need_write_process`. - return; - } - // Acquire the lock of the write side continuation - EThread *my_ethread = mutex->thread_holding; - ink_assert(my_ethread != nullptr); - MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread); - if (!lock.is_locked()) { - Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id, - ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")); - - // set need_write_process to enforce the write processing - other_side->need_write_process = true; - other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event); - return; - } + // Acquire the lock of the write side continuation + EThread *my_ethread = mutex->thread_holding; + ink_assert(my_ethread != nullptr); + MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread); + if (!lock.is_locked()) { + Debug("pvc_event", "[%u] %s: process_write_side from other side lock miss, retrying", other_side->core_obj->id, + ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")); - other_side->process_write_side(true); - } else { - other_side->write_state.vio.reenable(); + // set need_write_process to enforce the write processing + other_side->need_write_process = true; + other_side->setup_event_cb(PVC_LOCK_RETRY_TIME, &other_side->core_lock_retry_event); + return; } + other_side->process_write_side(); + } else { + Debug("pvc", "[%u] %s: write_state of other side is not available", core_obj->id, PVC_TYPE); + read_state.vio.cont->handleEvent(VC_EVENT_EOS, &read_state.vio); } } @@ -1078,14 +1006,6 @@ PluginVCCore::init(int64_t buffer_index, int64_t buffer_water_mark) passive_vc.mutex = mutex; passive_vc.thread = active_vc.thread; - p_to_a_buffer = new_MIOBuffer(buffer_index); - p_to_a_buffer->water_mark = buffer_water_mark; - p_to_a_reader = p_to_a_buffer->alloc_reader(); - - a_to_p_buffer = new_MIOBuffer(buffer_index); - a_to_p_buffer->water_mark = buffer_water_mark; - a_to_p_reader = a_to_p_buffer->alloc_reader(); - buffer_size = BUFFER_SIZE_FOR_INDEX(buffer_index); Debug("pvc", @@ -1111,16 +1031,6 @@ PluginVCCore::destroy() passive_vc.write_state.vio.buffer.clear(); passive_vc.magic = PLUGIN_VC_MAGIC_DEAD; - if (p_to_a_buffer) { - free_MIOBuffer(p_to_a_buffer); - p_to_a_buffer = nullptr; - } - - if (a_to_p_buffer) { - free_MIOBuffer(a_to_p_buffer); - a_to_p_buffer = nullptr; - } - this->mutex = nullptr; delete this; } diff --git a/proxy/PluginVC.h b/proxy/PluginVC.h index 81b484d9238..3f90678a659 100644 --- a/proxy/PluginVC.h +++ b/proxy/PluginVC.h @@ -150,8 +150,8 @@ class PluginVC : public NetVConnection, public PluginIdentity int main_handler(int event, void *data); private: - void process_read_side(bool); - void process_write_side(bool); + void process_read_side(); + void process_write_side(); void process_close(); void process_timeout(Event **e, int event_to_send); @@ -254,12 +254,6 @@ class PluginVCCore : public Continuation Continuation *connect_to = nullptr; bool connected = false; - MIOBuffer *p_to_a_buffer = nullptr; - IOBufferReader *p_to_a_reader = nullptr; - - MIOBuffer *a_to_p_buffer = nullptr; - IOBufferReader *a_to_p_reader = nullptr; - IpEndpoint passive_addr_struct; IpEndpoint active_addr_struct;