Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions iocore/cache/CacheWrite.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ Vol::aggWriteDone(int event, Event *e)
CacheVC *c = nullptr;
while ((c = sync.dequeue())) {
if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) {
c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
} else {
sync.push(c); // put it back on the front
break;
Expand Down Expand Up @@ -1019,11 +1019,7 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */)
ink_assert(false);
while ((c = agg.dequeue())) {
agg_todo_size -= c->agg_len;
if (c->initial_thread != nullptr) {
c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
} else {
eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
}
eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
}
return EVENT_CONT;
}
Expand Down Expand Up @@ -1086,8 +1082,6 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */)
while ((c = tocall.dequeue())) {
if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) {
ret = EVENT_RETURN;
} else if (c->initial_thread != nullptr) {
c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
} else {
eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
}
Expand Down
3 changes: 1 addition & 2 deletions iocore/cache/P_CacheInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ struct CacheVC : public CacheVConnection {
// NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the
// size_to_init initialization
VIO vio;
EThread *initial_thread; // initial thread open_XX was called on
CacheFragType frag_type;
CacheHTTPInfo *info;
CacheHTTPInfoVector *write_vector;
Expand Down Expand Up @@ -549,9 +548,9 @@ new_CacheVC(Continuation *cont)
CacheVC *c = THREAD_ALLOC(cacheVConnectionAllocator, t);
c->vector.data.data = &c->vector.data.fast_data[0];
c->_action = cont;
c->initial_thread = t->tt == DEDICATED ? nullptr : t;
c->mutex = cont->mutex;
c->start_time = Thread::get_hrtime();
c->setThreadAffinity(t);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that means CacheVC can be bond on a dedicated thread?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure. Is there any potential risk here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A CacheVC instance should be bound to a specific thread, the same thread to which the transaction is bound. The point here is when a transaction does a cache I/O operation, the actual work is done in an AIO thread. When that completes, control should pass back to the exact thread that originally asked for the I/O. This isn't new, it's only a different mechanism. Previously this was a special case in the cache (see CacheVC::initial_thread in "P_CacheInternal.h"), but now it can be done using the general thread affinity. This is in fact one of the use cases that motivated creating the generalized thread affinity.

ink_assert(c->trigger == nullptr);
Debug("cache_new", "new %p", c);
#ifdef CACHE_STAT_PAGES
Expand Down
12 changes: 10 additions & 2 deletions iocore/hostdb/HostDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1578,7 +1578,8 @@ int
HostDBContinuation::set_check_pending_dns()
{
Queue<HostDBContinuation> &q = hostDB.pending_dns_for_hash(hash.hash);
HostDBContinuation *c = q.head;
this->setThreadAffinity(this_ethread());
HostDBContinuation *c = q.head;
for (; c; c = (HostDBContinuation *)c->link.next) {
if (hash.hash == c->hash.hash) {
Debug("hostdb", "enqueuing additional request");
Expand Down Expand Up @@ -1606,8 +1607,15 @@ HostDBContinuation::remove_trigger_pending_dns()
}
c = n;
}
EThread *thread = this_ethread();
while ((c = qq.dequeue())) {
c->handleEvent(EVENT_IMMEDIATE, nullptr);
// resume all queued HostDBCont in the thread associated with the netvc to avoid nethandler locking issues.
EThread *affinity_thread = c->getThreadAffinity();
if (!affinity_thread || affinity_thread == thread) {
c->handleEvent(EVENT_IMMEDIATE, nullptr);
} else {
eventProcessor.schedule_imm(c);
}
}
}

Expand Down