From 0de7a9a47388fba8e36ae4816d4ca695f5a11f29 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 30 Jun 2021 12:14:56 +0200 Subject: [PATCH 1/2] DPL: process region callbacks as early as possible In case we are not running, we can afford processing the callbacks as soon as possible. --- Framework/Core/src/DataProcessingDevice.cxx | 40 +++++++++++++-------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index d9939ab246c9b..fbd75489b4a43 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -342,10 +342,25 @@ void on_signal_callback(uv_signal_t* handle, int signum) context->stats->totalSigusr1 += 1; } +/// Invoke the callbacks for the mPendingRegionInfos +void handleRegionCallbacks(ServiceRegistry& registry, std::vector& infos) +{ + if (infos.empty() == false) { + std::vector toBeNotified; + toBeNotified.swap(infos); // avoid any MT issue. + for (auto const& info : toBeNotified) { + registry.get()(CallbackService::Id::RegionInfoCallback, info); + } + } +} + void DataProcessingDevice::InitTask() { for (auto& channel : fChannels) { - channel.second.at(0).Transport()->SubscribeToRegionEvents([&pendingRegionInfos = mPendingRegionInfos, ®ionInfoMutex = mRegionInfoMutex](FairMQRegionInfo info) { + channel.second.at(0).Transport()->SubscribeToRegionEvents([this, + ®istry = mServiceRegistry, + &pendingRegionInfos = mPendingRegionInfos, + ®ionInfoMutex = mRegionInfoMutex](FairMQRegionInfo info) { std::lock_guard lock(regionInfoMutex); LOG(debug) << ">>> Region info event" << info.event; LOG(debug) << "id: " << info.id; @@ -353,6 +368,10 @@ void DataProcessingDevice::InitTask() LOG(debug) << "size: " << info.size; LOG(debug) << "flags: " << info.flags; pendingRegionInfos.push_back(info); + // When not running we can handle the callbacks synchronously. + if (this->GetCurrentState() != fair::mq::State::Running) { + handleRegionCallbacks(registry, pendingRegionInfos); + } }); } @@ -497,6 +516,12 @@ void DataProcessingDevice::Reset() { mServiceRegistry.get()(Cal bool DataProcessingDevice::ConditionalRun() { + // Notify on the main thread the new region callbacks, making sure + // no callback is issued if there is something still processing. + { + std::lock_guard lock(mRegionInfoMutex); + handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos); + } // This will block for the correct delay (or until we get data // on a socket). We also do not block on the first iteration // so that devices which do not have a timer can still start an @@ -526,19 +551,6 @@ bool DataProcessingDevice::ConditionalRun() } } - // Notify on the main thread the new region callbacks, making sure - // no callback is issued if there is something still processing. - { - std::lock_guard lock(mRegionInfoMutex); - if (mPendingRegionInfos.empty() == false) { - std::vector toBeNotified; - toBeNotified.swap(mPendingRegionInfos); // avoid any MT issue. - for (auto const& info : toBeNotified) { - mServiceRegistry.get()(CallbackService::Id::RegionInfoCallback, info); - } - } - } - assert(mStreams.size() == mHandles.size()); /// Decide which task to use TaskStreamRef streamRef{-1}; From 737489e647c61ba91461b5052b7375afba9de1e9 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 8 Jul 2021 22:43:42 +0200 Subject: [PATCH 2/2] Update DataProcessingDevice.cxx --- Framework/Core/src/DataProcessingDevice.cxx | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index fbd75489b4a43..69bd8b83385ad 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -551,6 +551,17 @@ bool DataProcessingDevice::ConditionalRun() } } + // Notify on the main thread the new region callbacks, making sure + // no callback is issued if there is something still processing. + // Notice that we still need to perform callbacks also after + // the socket epolled, because otherwise we would end up serving + // the callback after the first data arrives is the system is too + // fast to transition from Init to Run. + { + std::lock_guard lock(mRegionInfoMutex); + handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos); + } + assert(mStreams.size() == mHandles.size()); /// Decide which task to use TaskStreamRef streamRef{-1};