diff --git a/src/engine/threaded_engine.cc b/src/engine/threaded_engine.cc index dd7662095097..8e59c59ab30e 100644 --- a/src/engine/threaded_engine.cc +++ b/src/engine/threaded_engine.cc @@ -83,7 +83,8 @@ void ThreadedVar::CompleteReadDependency(Dispatcher dispatcher) { template bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) { - VersionedVarBlock *old_pending_write, *end_of_dispatch_chain; + VersionedVarBlock *old_pending_write, *end_of_read_chain; + bool trigger_write = false; { // this is lock scope std::lock_guard lock{m_}; @@ -91,32 +92,31 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) { // detach pending write old_pending_write = pending_write_; // search for chains to trigger - VersionedVarBlock *p = old_pending_write->next; + end_of_read_chain = old_pending_write->next; assert(num_pending_reads_ == 0); - while (p->next != nullptr && p->write == false) { + while (end_of_read_chain->next != nullptr && + end_of_read_chain->write == false) { ++num_pending_reads_; - p = p->next; + end_of_read_chain = end_of_read_chain->next; } - // mark end of dispatch chain - end_of_dispatch_chain = p; - - if (p->next == nullptr) { + // check the states + if (end_of_read_chain->next == nullptr) { ready_to_read_ = true; pending_write_ = nullptr; - assert(p->trigger == nullptr); - assert(p->write ==false); } else { - assert(p->write == true); - pending_write_ = p; + assert(end_of_read_chain->write == true); + pending_write_ = end_of_read_chain; if (num_pending_reads_ == 0) { - if (--pending_write_->trigger->wait == 0) { - dispatcher(pending_write_->trigger); - } + trigger_write = true; } } } - // this is outside of lock scope - // the linked list is detached from variable + // This is outside of lock scope + // Be very carful, pending_write_ and num_pending_reads_ + // can change now, do not reply ont the two variables. + // The linked list \in [old_pending_write, end_of_read_chain) + // is already detached from this Var. + // So it is safe to modify these VersionedVarBlock *cur_head = old_pending_write->next; VersionedVarBlock::Delete(old_pending_write); if (to_delete_) { @@ -125,7 +125,7 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) { return true; } // dispatch all the events - while (cur_head != end_of_dispatch_chain) { + while (cur_head != end_of_read_chain) { if (--cur_head->trigger->wait == 0) { dispatcher(cur_head->trigger); } @@ -134,6 +134,13 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) { assert(cur_head != nullptr); VersionedVarBlock::Delete(prev); } + // Be careful, do not use pending_write_ or num_pending_reads_ here. + // As they can change, use end_of_read_chain + if (trigger_write) { + if (--end_of_read_chain->trigger->wait == 0) { + dispatcher(end_of_read_chain->trigger); + } + } return false; }