diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc index 178ec08ddb7..48d16cb1f2b 100644 --- a/iocore/cache/CacheWrite.cc +++ b/iocore/cache/CacheWrite.cc @@ -358,7 +358,7 @@ Vol::aggWriteDone(int event, Event *e) CacheVC *c = nullptr; while ((c = sync.dequeue())) { if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) { - c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE); + eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE); } else { sync.push(c); // put it back on the front break; @@ -1019,11 +1019,7 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */) ink_assert(false); while ((c = agg.dequeue())) { agg_todo_size -= c->agg_len; - if (c->initial_thread != nullptr) { - c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE); - } else { - eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE); - } + eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE); } return EVENT_CONT; } @@ -1086,8 +1082,6 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */) while ((c = tocall.dequeue())) { if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) { ret = EVENT_RETURN; - } else if (c->initial_thread != nullptr) { - c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE); } else { eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE); } diff --git a/iocore/cache/P_CacheInternal.h b/iocore/cache/P_CacheInternal.h index d82aa347324..8ed84757789 100644 --- a/iocore/cache/P_CacheInternal.h +++ b/iocore/cache/P_CacheInternal.h @@ -440,7 +440,6 @@ struct CacheVC : public CacheVConnection { // NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the // size_to_init initialization VIO vio; - EThread *initial_thread; // initial thread open_XX was called on CacheFragType frag_type; CacheHTTPInfo *info; CacheHTTPInfoVector *write_vector; @@ -549,9 +548,9 @@ new_CacheVC(Continuation *cont) CacheVC *c = THREAD_ALLOC(cacheVConnectionAllocator, t); c->vector.data.data = &c->vector.data.fast_data[0]; c->_action = cont; - c->initial_thread = t->tt == DEDICATED ? nullptr : t; c->mutex = cont->mutex; c->start_time = Thread::get_hrtime(); + c->setThreadAffinity(t); ink_assert(c->trigger == nullptr); Debug("cache_new", "new %p", c); #ifdef CACHE_STAT_PAGES diff --git a/iocore/hostdb/HostDB.cc b/iocore/hostdb/HostDB.cc index 73c80141c8d..ed7ba5faeea 100644 --- a/iocore/hostdb/HostDB.cc +++ b/iocore/hostdb/HostDB.cc @@ -1578,7 +1578,8 @@ int HostDBContinuation::set_check_pending_dns() { Queue &q = hostDB.pending_dns_for_hash(hash.hash); - HostDBContinuation *c = q.head; + this->setThreadAffinity(this_ethread()); + HostDBContinuation *c = q.head; for (; c; c = (HostDBContinuation *)c->link.next) { if (hash.hash == c->hash.hash) { Debug("hostdb", "enqueuing additional request"); @@ -1606,8 +1607,15 @@ HostDBContinuation::remove_trigger_pending_dns() } c = n; } + EThread *thread = this_ethread(); while ((c = qq.dequeue())) { - c->handleEvent(EVENT_IMMEDIATE, nullptr); + // resume all queued HostDBCont in the thread associated with the netvc to avoid nethandler locking issues. + EThread *affinity_thread = c->getThreadAffinity(); + if (!affinity_thread || affinity_thread == thread) { + c->handleEvent(EVENT_IMMEDIATE, nullptr); + } else { + eventProcessor.schedule_imm(c); + } } }