diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc
index 0045f107711..ae0003ec309 100644
--- a/iocore/cache/CacheWrite.cc
+++ b/iocore/cache/CacheWrite.cc
@@ -345,9 +345,9 @@ Vol::aggWriteDone(int event, Event *e)
   // callback ready sync CacheVCs
   CacheVC *c = nullptr;
   while ((c = sync.dequeue())) {
-    if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial))
-      c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
-    else {
+    if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) {
+      eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
+    } else {
       sync.push(c); // put it back on the front
       break;
     }
@@ -987,10 +987,7 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */)
     ink_assert(false);
     while ((c = agg.dequeue())) {
       agg_todo_size -= c->agg_len;
-      if (c->initial_thread != nullptr)
-        c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
-      else
-        eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
+      eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
     }
     return EVENT_CONT;
   }
@@ -1047,12 +1044,11 @@
 Lwait:
   int ret = EVENT_CONT;
   while ((c = tocall.dequeue())) {
-    if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding)
+    if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) {
       ret = EVENT_RETURN;
-    else if (c->initial_thread != nullptr)
-      c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
-    else
+    } else {
       eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
+    }
   }
   return ret;
 }
diff --git a/iocore/cache/P_CacheInternal.h b/iocore/cache/P_CacheInternal.h
index e70460b107b..ce2565c9f81 100644
--- a/iocore/cache/P_CacheInternal.h
+++ b/iocore/cache/P_CacheInternal.h
@@ -449,7 +449,6 @@ struct CacheVC : public CacheVConnection {
   // NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the
   // size_to_init initialization
   VIO vio;
-  EThread *initial_thread; // initial thread open_XX was called on
   CacheFragType frag_type;
   CacheHTTPInfo *info;
   CacheHTTPInfoVector *write_vector;
@@ -566,9 +565,9 @@ new_CacheVC(Continuation *cont)
   c->vector.data.data = &c->vector.data.fast_data[0];
 #endif
   c->_action = cont;
-  c->initial_thread = t->tt == DEDICATED ? nullptr : t;
   c->mutex = cont->mutex;
   c->start_time = Thread::get_hrtime();
+  c->setThreadAffinity(t);
   ink_assert(c->trigger == nullptr);
   Debug("cache_new", "new %p", c);
 #ifdef CACHE_STAT_PAGES
diff --git a/iocore/eventsystem/I_Continuation.h b/iocore/eventsystem/I_Continuation.h
index 496fbe46ebe..b9f0cc81517 100644
--- a/iocore/eventsystem/I_Continuation.h
+++ b/iocore/eventsystem/I_Continuation.h
@@ -46,6 +46,9 @@ class Processor;
 class ProxyMutex;
 class EThread;
 
+extern EThread *this_ethread();
+extern EThread *this_event_thread();
+
 //////////////////////////////////////////////////////////////////////////////
 //
 // Constants and Type Definitions
@@ -133,6 +136,30 @@ class Continuation : private force_VFPT_to_top
   */
   ContFlags control_flags;
 
+  EThread *thread_affinity = this_event_thread();
+
+  bool
+  setThreadAffinity(EThread *ethread)
+  {
+    if (ethread != nullptr) {
+      thread_affinity = ethread;
+      return true;
+    }
+    return false;
+  }
+
+  EThread *
+  getThreadAffinity()
+  {
+    return thread_affinity;
+  }
+
+  void
+  clearThreadAffinity()
+  {
+    thread_affinity = nullptr;
+  }
+
   /**
     Receives the event code and data for an Event.
 
diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h
index 1c46fa2f920..4facf36acba 100644
--- a/iocore/eventsystem/I_EThread.h
+++ b/iocore/eventsystem/I_EThread.h
@@ -329,6 +329,8 @@ class EThread : public Thread
   bool is_event_type(EventType et);
   void set_event_type(EventType et);
 
+  bool has_event_loop = false;
+
   // Private Interface
 
   void execute();
diff --git a/iocore/eventsystem/P_UnixEThread.h b/iocore/eventsystem/P_UnixEThread.h
index 45551a1a87b..de32f6b69f3 100644
--- a/iocore/eventsystem/P_UnixEThread.h
+++ b/iocore/eventsystem/P_UnixEThread.h
@@ -169,6 +169,17 @@ this_ethread()
   return (EThread *)this_thread();
 }
 
+TS_INLINE EThread *
+this_event_thread()
+{
+  EThread *ethread = this_ethread();
+  if (ethread != nullptr && ethread->has_event_loop) {
+    return ethread;
+  } else {
+    return nullptr;
+  }
+}
+
 TS_INLINE void
 EThread::free_event(Event *e)
 {
diff --git a/iocore/eventsystem/P_UnixEventProcessor.h b/iocore/eventsystem/P_UnixEventProcessor.h
index 8b9a04aa81f..73c0f0736ed 100644
--- a/iocore/eventsystem/P_UnixEventProcessor.h
+++ b/iocore/eventsystem/P_UnixEventProcessor.h
@@ -71,7 +71,14 @@ TS_INLINE Event *
 EventProcessor::schedule(Event *e, EventType etype, bool fast_signal)
 {
   ink_assert(etype < MAX_EVENT_TYPES);
-  e->ethread = assign_thread(etype);
+
+  EThread *ethread = e->continuation->getThreadAffinity();
+  if (ethread != nullptr && ethread->is_event_type(etype)) {
+    e->ethread = ethread;
+  } else {
+    e->ethread = assign_thread(etype);
+  }
+
   if (e->continuation->mutex)
     e->mutex = e->continuation->mutex;
   else
diff --git a/iocore/eventsystem/UnixEThread.cc b/iocore/eventsystem/UnixEThread.cc
index 1f2496d9beb..b0b79dc5487 100644
--- a/iocore/eventsystem/UnixEThread.cc
+++ b/iocore/eventsystem/UnixEThread.cc
@@ -199,6 +199,7 @@ EThread::execute_regular()
   ink_hrtime next_time = 0;
 
   // give priority to immediate events
+  has_event_loop = true;
   for (;;) {
     if (unlikely(shutdown_event_system == true)) {
       return;
diff --git a/iocore/hostdb/HostDB.cc b/iocore/hostdb/HostDB.cc
index 6dd2f870574..4e33456ef53 100644
--- a/iocore/hostdb/HostDB.cc
+++ b/iocore/hostdb/HostDB.cc
@@ -1728,7 +1728,8 @@ int
 HostDBContinuation::set_check_pending_dns()
 {
   Queue &q = hostDB.pending_dns_for_hash(md5.hash);
-  HostDBContinuation *c = q.head;
+  this->setThreadAffinity(this_ethread());
+  HostDBContinuation *c = q.head;
   for (; c; c = (HostDBContinuation *)c->link.next) {
     if (md5.hash == c->md5.hash) {
       Debug("hostdb", "enqueuing additional request");
@@ -1756,8 +1757,16 @@ HostDBContinuation::remove_trigger_pending_dns()
     }
     c = n;
   }
-  while ((c = qq.dequeue()))
-    c->handleEvent(EVENT_IMMEDIATE, nullptr);
+  EThread *thread = this_ethread();
+  while ((c = qq.dequeue())) {
+    // resume all queued HostDBCont in the thread associated with the netvc to avoid nethandler locking issues.
+    EThread *affinity_thread = c->getThreadAffinity();
+    if (!affinity_thread || affinity_thread == thread) {
+      c->handleEvent(EVENT_IMMEDIATE, nullptr);
+    } else {
+      eventProcessor.schedule_imm(c);
+    }
+  }
 }
 
 //
diff --git a/iocore/net/UnixNetVConnection.cc b/iocore/net/UnixNetVConnection.cc
index 2981ab1f691..06cf4ab6f13 100644
--- a/iocore/net/UnixNetVConnection.cc
+++ b/iocore/net/UnixNetVConnection.cc
@@ -1543,24 +1543,40 @@ UnixNetVConnection::migrateToCurrentThread(Continuation *cont, EThread *t)
 void
 UnixNetVConnection::add_to_keep_alive_queue()
 {
+  MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
+  if (!lock.is_locked()) {
+    Error("BUG: It must have acquired the NetHandler's lock before doing anything on keep_alive_queue.");
+  }
   nh->add_to_keep_alive_queue(this);
 }
 
 void
 UnixNetVConnection::remove_from_keep_alive_queue()
 {
+  MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
+  if (!lock.is_locked()) {
+    Error("BUG: It must have acquired the NetHandler's lock before doing anything on keep_alive_queue.");
+  }
   nh->remove_from_keep_alive_queue(this);
 }
 
 bool
 UnixNetVConnection::add_to_active_queue()
 {
+  MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
+  if (!lock.is_locked()) {
+    Error("BUG: It must have acquired the NetHandler's lock before doing anything on active_queue.");
+  }
   return nh->add_to_active_queue(this);
 }
 
 void
 UnixNetVConnection::remove_from_active_queue()
 {
+  MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
+  if (!lock.is_locked()) {
+    Error("BUG: It must have acquired the NetHandler's lock before doing anything on active_queue.");
+  }
   nh->remove_from_active_queue(this);
 }