Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions iocore/cache/CacheWrite.cc
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,9 @@ Vol::aggWriteDone(int event, Event *e)
// callback ready sync CacheVCs
CacheVC *c = nullptr;
while ((c = sync.dequeue())) {
if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial))
c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
else {
if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) {
eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
} else {
sync.push(c); // put it back on the front
break;
}
Expand Down Expand Up @@ -987,10 +987,7 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */)
ink_assert(false);
while ((c = agg.dequeue())) {
agg_todo_size -= c->agg_len;
if (c->initial_thread != nullptr)
c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
else
eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
}
return EVENT_CONT;
}
Expand Down Expand Up @@ -1047,12 +1044,11 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */)
Lwait:
int ret = EVENT_CONT;
while ((c = tocall.dequeue())) {
if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding)
if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) {
ret = EVENT_RETURN;
else if (c->initial_thread != nullptr)
c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
else
} else {
eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
}
}
return ret;
}
Expand Down
3 changes: 1 addition & 2 deletions iocore/cache/P_CacheInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,6 @@ struct CacheVC : public CacheVConnection {
// NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the
// size_to_init initialization
VIO vio;
EThread *initial_thread; // initial thread open_XX was called on
CacheFragType frag_type;
CacheHTTPInfo *info;
CacheHTTPInfoVector *write_vector;
Expand Down Expand Up @@ -566,9 +565,9 @@ new_CacheVC(Continuation *cont)
c->vector.data.data = &c->vector.data.fast_data[0];
#endif
c->_action = cont;
c->initial_thread = t->tt == DEDICATED ? nullptr : t;
c->mutex = cont->mutex;
c->start_time = Thread::get_hrtime();
c->setThreadAffinity(t);
ink_assert(c->trigger == nullptr);
Debug("cache_new", "new %p", c);
#ifdef CACHE_STAT_PAGES
Expand Down
27 changes: 27 additions & 0 deletions iocore/eventsystem/I_Continuation.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class Processor;
class ProxyMutex;
class EThread;

extern EThread *this_ethread();
extern EThread *this_event_thread();

//////////////////////////////////////////////////////////////////////////////
//
// Constants and Type Definitions
Expand Down Expand Up @@ -133,6 +136,30 @@ class Continuation : private force_VFPT_to_top
*/
ContFlags control_flags;

EThread *thread_affinity = this_event_thread();

bool
setThreadAffinity(EThread *ethread)
{
if (ethread != nullptr) {
thread_affinity = ethread;
return true;
}
return false;
}

EThread *
getThreadAffinity()
{
return thread_affinity;
}

void
clearThreadAffinity()
{
thread_affinity = nullptr;
}

/**
Receives the event code and data for an Event.

Expand Down
2 changes: 2 additions & 0 deletions iocore/eventsystem/I_EThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ class EThread : public Thread
bool is_event_type(EventType et);
void set_event_type(EventType et);

bool has_event_loop = false;

// Private Interface

void execute();
Expand Down
11 changes: 11 additions & 0 deletions iocore/eventsystem/P_UnixEThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,17 @@ this_ethread()
return (EThread *)this_thread();
}

TS_INLINE EThread *
this_event_thread()
{
EThread *ethread = this_ethread();
if (ethread != nullptr && ethread->has_event_loop) {
return ethread;
} else {
return nullptr;
}
}

TS_INLINE void
EThread::free_event(Event *e)
{
Expand Down
9 changes: 8 additions & 1 deletion iocore/eventsystem/P_UnixEventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,14 @@ TS_INLINE Event *
EventProcessor::schedule(Event *e, EventType etype, bool fast_signal)
{
ink_assert(etype < MAX_EVENT_TYPES);
e->ethread = assign_thread(etype);

EThread *ethread = e->continuation->getThreadAffinity();
if (ethread != nullptr && ethread->is_event_type(etype)) {
e->ethread = ethread;
} else {
e->ethread = assign_thread(etype);
}

if (e->continuation->mutex)
e->mutex = e->continuation->mutex;
else
Expand Down
1 change: 1 addition & 0 deletions iocore/eventsystem/UnixEThread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ EThread::execute_regular()
ink_hrtime next_time = 0;

// give priority to immediate events
has_event_loop = true;
for (;;) {
if (unlikely(shutdown_event_system == true)) {
return;
Expand Down
15 changes: 12 additions & 3 deletions iocore/hostdb/HostDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1728,7 +1728,8 @@ int
HostDBContinuation::set_check_pending_dns()
{
Queue<HostDBContinuation> &q = hostDB.pending_dns_for_hash(md5.hash);
HostDBContinuation *c = q.head;
this->setThreadAffinity(this_ethread());
HostDBContinuation *c = q.head;
for (; c; c = (HostDBContinuation *)c->link.next) {
if (md5.hash == c->md5.hash) {
Debug("hostdb", "enqueuing additional request");
Expand Down Expand Up @@ -1756,8 +1757,16 @@ HostDBContinuation::remove_trigger_pending_dns()
}
c = n;
}
while ((c = qq.dequeue()))
c->handleEvent(EVENT_IMMEDIATE, nullptr);
EThread *thread = this_ethread();
while ((c = qq.dequeue())) {
// resume all queued HostDBCont in the thread associated with the netvc to avoid nethandler locking issues.
EThread *affinity_thread = c->getThreadAffinity();
if (!affinity_thread || affinity_thread == thread) {
c->handleEvent(EVENT_IMMEDIATE, nullptr);
} else {
eventProcessor.schedule_imm(c);
}
}
}

//
Expand Down
16 changes: 16 additions & 0 deletions iocore/net/UnixNetVConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1543,24 +1543,40 @@ UnixNetVConnection::migrateToCurrentThread(Continuation *cont, EThread *t)
void
UnixNetVConnection::add_to_keep_alive_queue()
{
MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
if (!lock.is_locked()) {
Error("BUG: It must have acquired the NetHandler's lock before doing anything on keep_alive_queue.");
}
nh->add_to_keep_alive_queue(this);
}

void
UnixNetVConnection::remove_from_keep_alive_queue()
{
MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
if (!lock.is_locked()) {
Error("BUG: It must have acquired the NetHandler's lock before doing anything on keep_alive_queue.");
}
nh->remove_from_keep_alive_queue(this);
}

bool
UnixNetVConnection::add_to_active_queue()
{
MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
if (!lock.is_locked()) {
Error("BUG: It must have acquired the NetHandler's lock before doing anything on active_queue.");
}
return nh->add_to_active_queue(this);
}

void
UnixNetVConnection::remove_from_active_queue()
{
MUTEX_TRY_LOCK(lock, nh->mutex, this_ethread());
if (!lock.is_locked()) {
Error("BUG: It must have acquired the NetHandler's lock before doing anything on active_queue.");
}
nh->remove_from_active_queue(this);
}

Expand Down