From 8fa3624285126c2a20a27d04a7ccce99e9d65c9c Mon Sep 17 00:00:00 2001 From: Susan Hinrichs Date: Tue, 5 Mar 2019 17:51:31 +0000 Subject: [PATCH 1/3] back-porting https://github.com/apache/trafficserver/pull/5120 --- iocore/cache/CacheWrite.cc | 16 ++++++---------- iocore/cache/P_CacheInternal.h | 3 +-- iocore/hostdb/HostDB.cc | 15 ++++++++++++--- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc index 0045f107711..cef04d72c04 100644 --- a/iocore/cache/CacheWrite.cc +++ b/iocore/cache/CacheWrite.cc @@ -345,9 +345,9 @@ Vol::aggWriteDone(int event, Event *e) // callback ready sync CacheVCs CacheVC *c = nullptr; while ((c = sync.dequeue())) { - if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) - c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE); - else { + if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) { + eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE); + } else { sync.push(c); // put it back on the front break; } @@ -987,10 +987,7 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */) ink_assert(false); while ((c = agg.dequeue())) { agg_todo_size -= c->agg_len; - if (c->initial_thread != nullptr) - c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE); - else - eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE); + eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE); } return EVENT_CONT; } @@ -1049,10 +1046,9 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */) while ((c = tocall.dequeue())) { if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) ret = EVENT_RETURN; - else if (c->initial_thread != nullptr) - c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE); - else + } else { eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE); + } } return ret; } diff --git a/iocore/cache/P_CacheInternal.h b/iocore/cache/P_CacheInternal.h index e70460b107b..ce2565c9f81 100644 --- a/iocore/cache/P_CacheInternal.h +++ b/iocore/cache/P_CacheInternal.h @@ -449,7 +449,6 @@ struct CacheVC : public CacheVConnection { // NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the // size_to_init initialization VIO vio; - EThread *initial_thread; // initial thread open_XX was called on CacheFragType frag_type; CacheHTTPInfo *info; CacheHTTPInfoVector *write_vector; @@ -566,9 +565,9 @@ new_CacheVC(Continuation *cont) c->vector.data.data = &c->vector.data.fast_data[0]; #endif c->_action = cont; - c->initial_thread = t->tt == DEDICATED ? nullptr : t; c->mutex = cont->mutex; c->start_time = Thread::get_hrtime(); + c->setThreadAffinity(t); ink_assert(c->trigger == nullptr); Debug("cache_new", "new %p", c); #ifdef CACHE_STAT_PAGES diff --git a/iocore/hostdb/HostDB.cc b/iocore/hostdb/HostDB.cc index 6dd2f870574..4e33456ef53 100644 --- a/iocore/hostdb/HostDB.cc +++ b/iocore/hostdb/HostDB.cc @@ -1728,7 +1728,8 @@ int HostDBContinuation::set_check_pending_dns() { Queue &q = hostDB.pending_dns_for_hash(md5.hash); - HostDBContinuation *c = q.head; + this->setThreadAffinity(this_ethread()); + HostDBContinuation *c = q.head; for (; c; c = (HostDBContinuation *)c->link.next) { if (md5.hash == c->md5.hash) { Debug("hostdb", "enqueuing additional request"); @@ -1756,8 +1757,16 @@ HostDBContinuation::remove_trigger_pending_dns() } c = n; } - while ((c = qq.dequeue())) - c->handleEvent(EVENT_IMMEDIATE, nullptr); + EThread *thread = this_ethread(); + while ((c = qq.dequeue())) { + // resume all queued HostDBCont in the thread associated with the netvc to avoid nethandler locking issues. + EThread *affinity_thread = c->getThreadAffinity(); + if (!affinity_thread || affinity_thread == thread) { + c->handleEvent(EVENT_IMMEDIATE, nullptr); + } else { + eventProcessor.schedule_imm(c); + } + } } // From bfb0ca72bcecf45088314f3b4b3d0871c8451d0f Mon Sep 17 00:00:00 2001 From: "Daniel Morilha (netlify)" Date: Mon, 11 Mar 2019 12:24:30 -0700 Subject: [PATCH 2/3] back-porting Continuation::setThreadAffinity --- iocore/cache/CacheWrite.cc | 2 +- iocore/eventsystem/I_Continuation.h | 27 +++++++++++++++++++++++ iocore/eventsystem/I_EThread.h | 2 ++ iocore/eventsystem/P_UnixEThread.h | 11 +++++++++ iocore/eventsystem/P_UnixEventProcessor.h | 9 +++++++- iocore/eventsystem/UnixEThread.cc | 1 + 6 files changed, 50 insertions(+), 2 deletions(-) diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc index cef04d72c04..ae0003ec309 100644 --- a/iocore/cache/CacheWrite.cc +++ b/iocore/cache/CacheWrite.cc @@ -1044,7 +1044,7 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */) Lwait: int ret = EVENT_CONT; while ((c = tocall.dequeue())) { - if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) + if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) { ret = EVENT_RETURN; } else { eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE); diff --git a/iocore/eventsystem/I_Continuation.h b/iocore/eventsystem/I_Continuation.h index 496fbe46ebe..b9f0cc81517 100644 --- a/iocore/eventsystem/I_Continuation.h +++ b/iocore/eventsystem/I_Continuation.h @@ -46,6 +46,9 @@ class Processor; class ProxyMutex; class EThread; +extern EThread *this_ethread(); +extern EThread *this_event_thread(); + ////////////////////////////////////////////////////////////////////////////// // // Constants and Type Definitions @@ -133,6 +136,30 @@ class Continuation : private force_VFPT_to_top */ ContFlags control_flags; + EThread *thread_affinity = this_event_thread(); + + bool + setThreadAffinity(EThread *ethread) + { + if (ethread != nullptr) { + thread_affinity = ethread; + return true; + } + return false; + } + + EThread * + getThreadAffinity() + { + return thread_affinity; + } + + void + clearThreadAffinity() + { + thread_affinity = nullptr; + } + /** Receives the event code and data for an Event. diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h index 1c46fa2f920..4facf36acba 100644 --- a/iocore/eventsystem/I_EThread.h +++ b/iocore/eventsystem/I_EThread.h @@ -329,6 +329,8 @@ class EThread : public Thread bool is_event_type(EventType et); void set_event_type(EventType et); + bool has_event_loop = false; + // Private Interface void execute(); diff --git a/iocore/eventsystem/P_UnixEThread.h b/iocore/eventsystem/P_UnixEThread.h index 45551a1a87b..de32f6b69f3 100644 --- a/iocore/eventsystem/P_UnixEThread.h +++ b/iocore/eventsystem/P_UnixEThread.h @@ -169,6 +169,17 @@ this_ethread() return (EThread *)this_thread(); } +TS_INLINE EThread * +this_event_thread() +{ + EThread *ethread = this_ethread(); + if (ethread != nullptr && ethread->has_event_loop) { + return ethread; + } else { + return nullptr; + } +} + TS_INLINE void EThread::free_event(Event *e) { diff --git a/iocore/eventsystem/P_UnixEventProcessor.h b/iocore/eventsystem/P_UnixEventProcessor.h index 8b9a04aa81f..73c0f0736ed 100644 --- a/iocore/eventsystem/P_UnixEventProcessor.h +++ b/iocore/eventsystem/P_UnixEventProcessor.h @@ -71,7 +71,14 @@ TS_INLINE Event * EventProcessor::schedule(Event *e, EventType etype, bool fast_signal) { ink_assert(etype < MAX_EVENT_TYPES); - e->ethread = assign_thread(etype); + + EThread *ethread = e->continuation->getThreadAffinity(); + if (ethread != nullptr && ethread->is_event_type(etype)) { + e->ethread = ethread; + } else { + e->ethread = assign_thread(etype); + } + if (e->continuation->mutex) e->mutex = e->continuation->mutex; else diff --git a/iocore/eventsystem/UnixEThread.cc b/iocore/eventsystem/UnixEThread.cc index 1f2496d9beb..b0b79dc5487 100644 --- a/iocore/eventsystem/UnixEThread.cc +++ b/iocore/eventsystem/UnixEThread.cc @@ -199,6 +199,7 @@ EThread::execute_regular() ink_hrtime next_time = 0; // give priority to immediate events + has_event_loop = true; for (;;) { if (unlikely(shutdown_event_system == true)) { return; From 205b5b618fe03ea652503aa2d27c422be51e4eb4 Mon Sep 17 00:00:00 2001 From: "Daniel Morilha (netlify)" Date: Wed, 13 Mar 2019 10:52:18 -0700 Subject: [PATCH 3/3] moving from asserts to log errors --- iocore/net/UnixNetVConnection.cc | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/iocore/net/UnixNetVConnection.cc b/iocore/net/UnixNetVConnection.cc index 2981ab1f691..06cf4ab6f13 100644 --- a/iocore/net/UnixNetVConnection.cc +++ b/iocore/net/UnixNetVConnection.cc @@ -1543,24 +1543,40 @@ UnixNetVConnection::migrateToCurrentThread(Continuation *cont, EThread *t) void UnixNetVConnection::add_to_keep_alive_queue() { + MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread()); + if (!lock.is_locked()) { + Error("BUG: It must have acquired the NetHandler's lock before doing anything on keep_alive_queue."); + } nh->add_to_keep_alive_queue(this); } void UnixNetVConnection::remove_from_keep_alive_queue() { + MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread()); + if (!lock.is_locked()) { + Error("BUG: It must have acquired the NetHandler's lock before doing anything on keep_alive_queue."); + } nh->remove_from_keep_alive_queue(this); } bool UnixNetVConnection::add_to_active_queue() { + MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread()); + if (!lock.is_locked()) { + Error("BUG: It must have acquired the NetHandler's lock before doing anything on active_queue."); + } return nh->add_to_active_queue(this); } void UnixNetVConnection::remove_from_active_queue() { + MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread()); + if (!lock.is_locked()) { + Error("BUG: It must have acquired the NetHandler's lock before doing anything on active_queue."); + } nh->remove_from_active_queue(this); }