diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h index e9c6948ae56..c0f63381b20 100644 --- a/iocore/net/P_UnixNet.h +++ b/iocore/net/P_UnixNet.h @@ -162,6 +162,58 @@ struct PollCont : public Continuation { int pollEvent(int event, Event *e); }; +/** + NetHandler is the processor of NetVC for the Net sub-system. The NetHandler + is the core component of the Net sub-system. Once started, it is responsible + for polling socket fds and perform the I/O tasks in NetVC. + + The NetHandler is executed periodically to perform read/write tasks for + NetVConnection. The NetHandler::mainNetEvent() should be viewed as a part of + EThread::execute() loop. This is the reason that Net System is a sub-system. + + By get_NetHandler(this_ethread()), you can get the NetHandler object that + runs inside the current EThread and then @c startIO / @c stopIO which + assign/release a NetVC to/from NetHandler. Before you call these functions, + holding the mutex of this NetHandler is required. + + The NetVConnection provides a set of do_io functions through which you can + specify continuations to be called back by its NetHandler. These function + calls do not block. Instead they return an VIO object and schedule the + callback to the continuation passed in when there are I/O events occurred. + + Multi-thread scheduler: + + The NetHandler should be viewed as multi-threaded schedulers which process + NetVCs from their queues. The NetVC can be made of NetProcessor (allocate_vc) + either by directly adding a NetVC to the queue (NetHandler::startIO), or more + conveniently, calling a method service call (NetProcessor::connect_re) which + synthesizes the NetVC and places it in the queue. + + Callback event codes: + + These event codes for do_io_read and reenable(read VIO) task: + VC_EVENT_READ_READY, VC_EVENT_READ_COMPLETE, + VC_EVENT_EOS, VC_EVENT_ERROR + + These event codes for do_io_write and reenable(write VIO) task: + VC_EVENT_WRITE_READY, VC_EVENT_WRITE_COMPLETE + VC_EVENT_ERROR + + There is no event and callback for do_io_shutdown / do_io_close task. + + NetVConnection allocation policy: + + NetVCs are allocated by the NetProcessor and deallocated by NetHandler. + A state machine may access the returned, non-recurring NetVC / VIO until + it is closed by do_io_close. For recurring NetVC, the NetVC may be + accessed until it is closed. Once the NetVC is closed, it's the + NetHandler's responsibility to deallocate it. + + Before assign to NetHandler or after release from NetHandler, it's the + NetVC's responsibility to deallocate itself. + + */ + // // NetHandler // @@ -234,7 +286,7 @@ class NetHandler : public Continuation @param netvc UnixNetVConnection to be managed by InactivityCop */ void startCop(UnixNetVConnection *netvc); - /* * + /** Stop to handle active timeout and inactivity on a UnixNetVConnection. Remove the netvc from open_list and cop_list. Also remove the netvc from keep_alive_queue and active_queue if its context is IN. @@ -244,6 +296,13 @@ class NetHandler : public Continuation */ void stopCop(UnixNetVConnection *netvc); + /** + Release a netvc and free it. + + @param netvc UnixNetVConnection to be deattached. + */ + void free_netvc(UnixNetVConnection *netvc); + NetHandler(); private: diff --git a/iocore/net/P_UnixNetVConnection.h b/iocore/net/P_UnixNetVConnection.h index 513bfd8b7a9..ee6bc055290 100644 --- a/iocore/net/P_UnixNetVConnection.h +++ b/iocore/net/P_UnixNetVConnection.h @@ -430,7 +430,6 @@ UnixNetVConnection::set_action(Continuation *c) // declarations for local use (within the net module) -void close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t); void write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread); void write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread); diff --git a/iocore/net/SSLNetVConnection.cc b/iocore/net/SSLNetVConnection.cc index 025c0b6bfff..7055c1f4f7a 100644 --- a/iocore/net/SSLNetVConnection.cc +++ b/iocore/net/SSLNetVConnection.cc @@ -876,7 +876,12 @@ SSLNetVConnection::free(EThread *t) { ink_release_assert(t == this_ethread()); + // cancel OOB + cancel_OOB(); // close socket fd + if (con.fd != NO_FD) { + NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1); + } con.close(); clear(); diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc index 4c4634277fe..631b4b297bd 100644 --- a/iocore/net/UnixNet.cc +++ b/iocore/net/UnixNet.cc @@ -60,7 +60,7 @@ class InactivityCop : public Continuation } if (vc->closed) { - close_UnixNetVConnection(vc, e->ethread); + nh.free_netvc(vc); continue; } @@ -305,6 +305,25 @@ update_nethandler_config(const char *name, RecDataT data_type ATS_UNUSED, RecDat return REC_ERR_OKAY; } +// +// Function used to release a UnixNetVConnection and free it. +// +void +NetHandler::free_netvc(UnixNetVConnection *netvc) +{ + EThread *t = trigger_event->ethread; + + ink_assert(t == this_ethread()); + ink_release_assert(netvc->thread == t); + ink_release_assert(netvc->nh == this); + + // Release netvc from InactivityCop + stopCop(netvc); + // Release netvc from NetHandler + stopIO(netvc); + // Clear and deallocate netvc + netvc->free(t); +} // // Initialization here, in the thread in which we will be executing @@ -387,7 +406,7 @@ NetHandler::process_ready_list() // Initialize the thread-local continuation flags set_cont_flags(vc->control_flags); if (vc->closed) - close_UnixNetVConnection(vc, trigger_event->ethread); + free_netvc(vc); else if (vc->read.enabled && vc->read.triggered) vc->net_read_io(this, trigger_event->ethread); else if (!vc->read.enabled) { @@ -404,7 +423,7 @@ NetHandler::process_ready_list() while ((vc = write_ready_list.dequeue())) { set_cont_flags(vc->control_flags); if (vc->closed) - close_UnixNetVConnection(vc, trigger_event->ethread); + free_netvc(vc); else if (vc->write.enabled && vc->write.triggered) write_to_net(this, vc, trigger_event->ethread); else if (!vc->write.enabled) { @@ -422,7 +441,7 @@ NetHandler::process_ready_list() while ((vc = read_ready_list.dequeue())) { diags->set_override(vc->control.debug_override); if (vc->closed) - close_UnixNetVConnection(vc, trigger_event->ethread); + free_netvc(vc); else if (vc->read.enabled && vc->read.triggered) vc->net_read_io(this, trigger_event->ethread); else if (!vc->read.enabled) @@ -431,7 +450,7 @@ NetHandler::process_ready_list() while ((vc = write_ready_list.dequeue())) { diags->set_override(vc->control.debug_override); if (vc->closed) - close_UnixNetVConnection(vc, trigger_event->ethread); + free_netvc(vc); else if (vc->write.enabled && vc->write.triggered) write_to_net(this, vc, trigger_event->ethread); else if (!vc->write.enabled) @@ -624,7 +643,7 @@ NetHandler::_close_vc(UnixNetVConnection *vc, ink_hrtime now, int &handle_event, keep_alive_queue_size, ink_hrtime_to_sec(now), ink_hrtime_to_sec(vc->next_inactivity_timeout_at), ink_hrtime_to_sec(vc->inactivity_timeout_in), diff); if (vc->closed) { - close_UnixNetVConnection(vc, this_ethread()); + free_netvc(vc); ++closed; } else { vc->next_inactivity_timeout_at = now; diff --git a/iocore/net/UnixNetVConnection.cc b/iocore/net/UnixNetVConnection.cc index 808fb5336b3..17e0e399024 100644 --- a/iocore/net/UnixNetVConnection.cc +++ b/iocore/net/UnixNetVConnection.cc @@ -72,37 +72,6 @@ net_activity(UnixNetVConnection *vc, EThread *thread) } } -// -// Function used to close a UnixNetVConnection and free the vc -// -void -close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t) -{ - if (vc->con.fd != NO_FD) { - NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1); - } - NetHandler *nh = vc->nh; - vc->cancel_OOB(); - - ink_release_assert(vc->thread == t); - - // 1. Cancel timeout - vc->next_inactivity_timeout_at = 0; - vc->next_activity_timeout_at = 0; - - vc->inactivity_timeout_in = 0; - vc->active_timeout_in = 0; - - if (nh) { - // 2. Release vc from InactivityCop. - nh->stopCop(vc); - // 3. Release vc from NetHandler. - nh->stopIO(vc); - } - // 4. Clear then deallocate vc. - vc->free(t); -} - // // Signal an event // @@ -130,7 +99,7 @@ read_signal_and_update(int event, UnixNetVConnection *vc) if (!--vc->recursion && vc->closed) { /* BZ 31932 */ ink_assert(vc->thread == this_ethread()); - close_UnixNetVConnection(vc, vc->thread); + vc->nh->free_netvc(vc); return EVENT_DONE; } else { return EVENT_CONT; @@ -161,7 +130,7 @@ write_signal_and_update(int event, UnixNetVConnection *vc) if (!--vc->recursion && vc->closed) { /* BZ 31932 */ ink_assert(vc->thread == this_ethread()); - close_UnixNetVConnection(vc, vc->thread); + vc->nh->free_netvc(vc); return EVENT_DONE; } else { return EVENT_CONT; @@ -228,7 +197,7 @@ read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread) // global session pool case. If so, the closed flag should be stable once we get the // s->vio.mutex (the global session pool mutex). if (vc->closed) { - close_UnixNetVConnection(vc, thread); + vc->nh->free_netvc(vc); return; } // if it is not enabled. @@ -669,6 +638,9 @@ UnixNetVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader void UnixNetVConnection::do_io_close(int alerrno /* = -1 */) { + // FIXME: the nh must not nullptr. + ink_assert(nh); + read.enabled = 0; write.enabled = 0; read.vio.buffer.clear(); @@ -694,7 +666,11 @@ UnixNetVConnection::do_io_close(int alerrno /* = -1 */) } if (close_inline) { - close_UnixNetVConnection(this, t); + if (nh) { + nh->free_netvc(this); + } else { + free(t); + } } } @@ -1207,7 +1183,7 @@ UnixNetVConnection::mainEvent(int event, Event *e) writer_cont = write.vio._cont; if (closed) { - close_UnixNetVConnection(this, thread); + nh->free_netvc(this); return EVENT_DONE; } @@ -1304,11 +1280,6 @@ UnixNetVConnection::connectUp(EThread *t, int fd) } if (check_emergency_throttle(con)) { - // The `con' could be closed if there is hyper emergency - if (fd == NO_FD) { - // We need to decrement the stat because close_UnixNetVConnection only decrements with a valid connection descriptor. - NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1); - } // Set errno force to EMFILE (reached limit for open file descriptors) errno = EMFILE; res = -errno; @@ -1354,6 +1325,12 @@ UnixNetVConnection::connectUp(EThread *t, int fd) void UnixNetVConnection::clear() { + // clear timeout variables + next_inactivity_timeout_at = 0; + next_activity_timeout_at = 0; + inactivity_timeout_in = 0; + active_timeout_in = 0; + // clear variables for reuse this->mutex.clear(); action_.mutex.clear(); @@ -1387,7 +1364,12 @@ UnixNetVConnection::free(EThread *t) { ink_release_assert(t == this_ethread()); + // cancel OOB + cancel_OOB(); // close socket fd + if (con.fd != NO_FD) { + NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1); + } con.close(); clear();