diff --git a/iocore/net/P_SSLNetVConnection.h b/iocore/net/P_SSLNetVConnection.h index 59733d32613..76b82840e5a 100644 --- a/iocore/net/P_SSLNetVConnection.h +++ b/iocore/net/P_SSLNetVConnection.h @@ -90,6 +90,7 @@ class SSLNetVConnection : public UnixNetVConnection public: int sslStartHandShake(int event, int &err) override; + void clear() override; void free(EThread *t) override; virtual void diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h index 9b1f6234f38..a2200eabdf4 100644 --- a/iocore/net/P_UnixNet.h +++ b/iocore/net/P_UnixNet.h @@ -206,6 +206,24 @@ class NetHandler : public Continuation void remove_from_active_queue(UnixNetVConnection *vc); void configure_per_thread(); + /** + Start handling read & write events on a UnixNetVConnection. + Initialize the socket fd of netvc for the polling system. + Must only be called while holding the mutex of this NetHandler. + + @param netvc UnixNetVConnection to be managed by this NetHandler. + @return 0 on success, -ERRNO on failure. + */ + int startIO(UnixNetVConnection *netvc); + /** + Stop handling read & write events on a UnixNetVConnection. + Remove the socket fd of netvc from the polling system. + Must only be called while holding the mutex of this NetHandler. + + @param netvc UnixNetVConnection to be released. 
+ */ + void stopIO(UnixNetVConnection *netvc); + NetHandler(); private: @@ -639,4 +657,48 @@ EventIO::stop() return 0; } +TS_INLINE int +NetHandler::startIO(UnixNetVConnection *netvc) +{ + ink_assert(this->mutex->thread_holding == this_ethread()); + ink_assert(netvc->thread == this_ethread()); + int res = 0; + + PollDescriptor *pd = get_PollDescriptor(trigger_event->ethread); + if (netvc->ep.start(pd, netvc, EVENTIO_READ | EVENTIO_WRITE) < 0) { + res = errno; + // EEXIST should be ok, though it should have been cleared before we got back here + if (errno != EEXIST) { + Debug("iocore_net", "NetHandler::startIO : failed on EventIO::start, errno = [%d](%s)", errno, strerror(errno)); + return -res; + } + } + + if (netvc->read.triggered == 1) { + read_ready_list.enqueue(netvc); + } + netvc->nh = this; + return res; +} + +TS_INLINE void +NetHandler::stopIO(UnixNetVConnection *netvc) +{ + ink_release_assert(netvc->nh == this); + + netvc->ep.stop(); + + read_ready_list.remove(netvc); + write_ready_list.remove(netvc); + if (netvc->read.in_enabled_list) { + read_enable_list.remove(netvc); + netvc->read.in_enabled_list = 0; + } + if (netvc->write.in_enabled_list) { + write_enable_list.remove(netvc); + netvc->write.in_enabled_list = 0; + } + + netvc->nh = nullptr; +} #endif diff --git a/iocore/net/P_UnixNetVConnection.h b/iocore/net/P_UnixNetVConnection.h index c86a136ef16..72e86a8afbd 100644 --- a/iocore/net/P_UnixNetVConnection.h +++ b/iocore/net/P_UnixNetVConnection.h @@ -276,6 +276,7 @@ class UnixNetVConnection : public NetVConnection * This is logic is invoked when the NetVC object is created in a new thread context */ virtual int populate(Connection &con, Continuation *c, void *arg); + virtual void clear(); virtual void free(EThread *t); ink_hrtime get_inactivity_timeout() override; diff --git a/iocore/net/SSLNetVConnection.cc b/iocore/net/SSLNetVConnection.cc index 252b0c7c073..dda8d471025 100644 --- a/iocore/net/SSLNetVConnection.cc +++ 
b/iocore/net/SSLNetVConnection.cc @@ -847,40 +847,8 @@ SSLNetVConnection::do_io_close(int lerrno) } void -SSLNetVConnection::free(EThread *t) +SSLNetVConnection::clear() { - got_remote_addr = false; - got_local_addr = false; - read.vio.mutex.clear(); - write.vio.mutex.clear(); - this->mutex.clear(); - action_.mutex.clear(); - this->ep.stop(); - this->con.close(); - flags = 0; - - SET_CONTINUATION_HANDLER(this, (SSLNetVConnHandler)&SSLNetVConnection::startEvent); - - if (nh) { - nh->read_ready_list.remove(this); - nh->write_ready_list.remove(this); - nh = nullptr; - } - - read.triggered = 0; - write.triggered = 0; - read.enabled = 0; - write.enabled = 0; - read.vio._cont = nullptr; - write.vio._cont = nullptr; - read.vio.vc_server = nullptr; - write.vio.vc_server = nullptr; - - closed = 0; - options.reset(); - - ink_assert(con.fd == NO_FD); - if (ssl != nullptr) { SSL_free(ssl); ssl = nullptr; @@ -901,6 +869,21 @@ SSLNetVConnection::free(EThread *t) free_handshake_buffers(); sslTrace = false; + super::clear(); +} +void +SSLNetVConnection::free(EThread *t) +{ + ink_release_assert(t == this_ethread()); + + // close socket fd + con.close(); + + clear(); + SET_CONTINUATION_HANDLER(this, (SSLNetVConnHandler)&SSLNetVConnection::startEvent); + ink_assert(con.fd == NO_FD); + ink_assert(t == this_ethread()); + if (from_accept_thread) { sslNetVCAllocator.free(this); } else { diff --git a/iocore/net/UnixNetAccept.cc b/iocore/net/UnixNetAccept.cc index d14da0d130d..9c7da81c7a6 100644 --- a/iocore/net/UnixNetAccept.cc +++ b/iocore/net/UnixNetAccept.cc @@ -92,6 +92,12 @@ net_accept(NetAccept *na, void *ep, bool blockable) vc->action_ = *na->action_; vc->set_is_transparent(na->opt.f_inbound_transparent); vc->set_context(NET_VCONNECTION_IN); +#ifdef USE_EDGE_TRIGGER + // Set the vc as triggered and place it in the read ready queue later in case there is already data on the socket. 
+ if (na->server.http_accept_filter) { + vc->read.triggered = 1; + } +#endif SET_CONTINUATION_HANDLER(vc, (NetVConnHandler)&UnixNetVConnection::acceptEvent); if (e->ethread->is_event_type(na->opt.etype)) { @@ -292,6 +298,12 @@ NetAccept::do_blocking_accept(EThread *t) vc->apply_options(); vc->set_context(NET_VCONNECTION_IN); vc->accept_object = this; +#ifdef USE_EDGE_TRIGGER + // Set the vc as triggered and place it in the read ready queue later in case there is already data on the socket. + if (server.http_accept_filter) { + vc->read.triggered = 1; + } +#endif SET_CONTINUATION_HANDLER(vc, (NetVConnHandler)&UnixNetVConnection::acceptEvent); // eventProcessor.schedule_imm(vc, getEtype()); eventProcessor.schedule_imm_signal(vc, opt.etype); @@ -440,6 +452,12 @@ NetAccept::acceptFastEvent(int event, void *ep) vc->apply_options(); vc->set_context(NET_VCONNECTION_IN); vc->action_ = *action_; +#ifdef USE_EDGE_TRIGGER + // Set the vc as triggered and place it in the read ready queue later in case there is already data on the socket. + if (server.http_accept_filter) { + vc->read.triggered = 1; + } +#endif SET_CONTINUATION_HANDLER(vc, (NetVConnHandler)&UnixNetVConnection::acceptEvent); // We must be holding the lock already to do later do_io_read's SCOPED_MUTEX_LOCK(lock, vc->mutex, e->ethread); diff --git a/iocore/net/UnixNetVConnection.cc b/iocore/net/UnixNetVConnection.cc index 8ed2eb0695e..a47ee316367 100644 --- a/iocore/net/UnixNetVConnection.cc +++ b/iocore/net/UnixNetVConnection.cc @@ -83,10 +83,10 @@ close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t) } NetHandler *nh = vc->nh; vc->cancel_OOB(); - vc->ep.stop(); ink_release_assert(vc->thread == t); + // 1. Cancel timeout vc->next_inactivity_timeout_at = 0; vc->next_activity_timeout_at = 0; @@ -94,21 +94,15 @@ close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t) vc->active_timeout_in = 0; if (nh) { + // 2. Release vc from InactivityCop. 
nh->open_list.remove(vc); nh->cop_list.remove(vc); - nh->read_ready_list.remove(vc); - nh->write_ready_list.remove(vc); - if (vc->read.in_enabled_list) { - nh->read_enable_list.remove(vc); - vc->read.in_enabled_list = 0; - } - if (vc->write.in_enabled_list) { - nh->write_enable_list.remove(vc); - vc->write.in_enabled_list = 0; - } vc->remove_from_keep_alive_queue(); vc->remove_from_active_queue(); + // 3. Release vc from NetHandler. + nh->stopIO(vc); } + // 4. Clear then deallocate vc. vc->free(t); } @@ -1118,9 +1112,10 @@ UnixNetVConnection::startEvent(int /* event ATS_UNUSED */, Event *e) int UnixNetVConnection::acceptEvent(int event, Event *e) { - EThread *t = (e == nullptr) ? this_ethread() : e->ethread; + EThread *t = (e == nullptr) ? this_ethread() : e->ethread; + NetHandler *h = get_NetHandler(t); - MUTEX_TRY_LOCK(lock, get_NetHandler(t)->mutex, t); + MUTEX_TRY_LOCK(lock, h->mutex, t); if (!lock.is_locked()) { if (event == EVENT_NONE) { t->schedule_in(this, HRTIME_MSECONDS(net_retry_delay)); @@ -1138,28 +1133,22 @@ UnixNetVConnection::acceptEvent(int event, Event *e) return EVENT_DONE; } - SET_HANDLER((NetVConnHandler)&UnixNetVConnection::mainEvent); - - PollDescriptor *pd = get_PollDescriptor(thread); - if (ep.start(pd, this, EVENTIO_READ | EVENTIO_WRITE) < 0) { - Debug("iocore_net", "acceptEvent : failed EventIO::start"); + // Send this NetVC to NetHandler and start to polling read & write event. + if (h->startIO(this) < 0) { free(t); return EVENT_DONE; } - nh = get_NetHandler(thread); - set_inactivity_timeout(0); - nh->open_list.enqueue(this); + // Setup a timeout callback handler. + SET_HANDLER((NetVConnHandler)&UnixNetVConnection::mainEvent); -#ifdef USE_EDGE_TRIGGER - // Set the vc as triggered and place it in the read ready queue in case there is already data on the socket. 
- Debug("iocore_net", "acceptEvent : Setting triggered and adding to the read ready queue"); - read.triggered = 1; - nh->read_ready_list.enqueue(this); -#endif + // All NetVCs in the open_list is checked for timeout by InactivityCop. + nh->open_list.enqueue(this); if (inactivity_timeout_in) { UnixNetVConnection::set_inactivity_timeout(inactivity_timeout_in); + } else { + set_inactivity_timeout(0); } if (active_timeout_in) { @@ -1248,25 +1237,22 @@ UnixNetVConnection::populate(Connection &con_in, Continuation *c, void *arg) this->mutex = c->mutex; this->thread = this_ethread(); - EThread *t = this_ethread(); - if (ep.start(get_PollDescriptor(t), this, EVENTIO_READ | EVENTIO_WRITE) < 0) { - // EEXIST should be ok, though it should have been cleared before we got back here - if (errno != EEXIST) { - Debug("iocore_net", "populate : Failed to add to epoll list"); - return EVENT_ERROR; - } - } - - SET_HANDLER(&UnixNetVConnection::mainEvent); + EThread *t = this_ethread(); + NetHandler *h = get_NetHandler(t); - this->nh = get_NetHandler(t); - ink_assert(this->nh != nullptr); - MUTEX_TRY_LOCK(lock, this->nh->mutex, t); + MUTEX_TRY_LOCK(lock, h->mutex, t); if (!lock.is_locked()) { // Clean up and go home return EVENT_ERROR; } - ink_assert(nh->mutex->thread_holding == this_ethread()); + + if (h->startIO(this) < 0) { + Debug("iocore_net", "populate : Failed to add to epoll list"); + return EVENT_ERROR; + } + + ink_assert(this->nh != nullptr); + SET_HANDLER(&UnixNetVConnection::mainEvent); ink_assert(!nh->open_list.in(this)); this->nh->open_list.enqueue(this); ink_assert(this->con.fd != NO_FD); @@ -1276,14 +1262,14 @@ UnixNetVConnection::populate(Connection &con_in, Continuation *c, void *arg) int UnixNetVConnection::connectUp(EThread *t, int fd) { + ink_assert(get_NetHandler(t)->mutex->thread_holding == this_ethread()); int res; thread = t; if (check_net_throttle(CONNECT, submit_time)) { check_throttle_warning(); - action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, 
(void *)-ENET_THROTTLING); - free(t); - return CONNECT_FAILURE; + res = -ENET_THROTTLING; + goto fail; } // Force family to agree with remote (server) address. @@ -1334,46 +1320,40 @@ UnixNetVConnection::connectUp(EThread *t, int fd) // Must connect after EventIO::Start() to avoid a race condition // when edge triggering is used. - if (ep.start(get_PollDescriptor(t), this, EVENTIO_READ | EVENTIO_WRITE) < 0) { - res = -errno; - Debug("iocore_net", "connectUp : Failed to add to epoll list : %s", strerror(errno)); + if ((res = get_NetHandler(t)->startIO(this)) < 0) { goto fail; } if (fd == NO_FD) { res = con.connect(nullptr, options); if (res != 0) { + // fast stopIO + nh = nullptr; goto fail; } } - // start up next round immediately - + // Setup a timeout callback handler. SET_HANDLER(&UnixNetVConnection::mainEvent); - - nh = get_NetHandler(t); - set_inactivity_timeout(0); + // All NetVCs in the open_list is checked for timeout by InactivityCop. nh->open_list.enqueue(this); + set_inactivity_timeout(0); ink_assert(!active_timeout_in); this->set_local_addr(); action_.continuation->handleEvent(NET_EVENT_OPEN, this); return CONNECT_SUCCESS; fail: - lerrno = errno; + lerrno = -res; action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)(intptr_t)res); free(t); return CONNECT_FAILURE; } void -UnixNetVConnection::free(EThread *t) +UnixNetVConnection::clear() { - ink_release_assert(t == this_ethread()); - - // close socket fd - con.close(); // clear variables for reuse this->mutex.clear(); action_.mutex.clear(); @@ -1382,8 +1362,7 @@ UnixNetVConnection::free(EThread *t) attributes = 0; read.vio.mutex.clear(); write.vio.mutex.clear(); - flags = 0; - SET_CONTINUATION_HANDLER(this, (NetVConnHandler)&UnixNetVConnection::startEvent); + flags = 0; nh = nullptr; read.triggered = 0; write.triggered = 0; @@ -1401,6 +1380,18 @@ UnixNetVConnection::free(EThread *t) ink_assert(!write.ready_link.prev && !write.ready_link.next); ink_assert(!write.enable_link.next); 
ink_assert(!link.next && !link.prev); +} + +void +UnixNetVConnection::free(EThread *t) +{ + ink_release_assert(t == this_ethread()); + + // close socket fd + con.close(); + + clear(); + SET_CONTINUATION_HANDLER(this, (NetVConnHandler)&UnixNetVConnection::startEvent); ink_assert(con.fd == NO_FD); ink_assert(t == this_ethread());