Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -342,17 +342,36 @@ void on_signal_callback(uv_signal_t* handle, int signum)
context->stats->totalSigusr1 += 1;
}

/// Invoke the callbacks for the mPendingRegionInfos
void handleRegionCallbacks(ServiceRegistry& registry, std::vector<FairMQRegionInfo>& infos)
{
if (infos.empty() == false) {
std::vector<FairMQRegionInfo> toBeNotified;
toBeNotified.swap(infos); // avoid any MT issue.
for (auto const& info : toBeNotified) {
registry.get<CallbackService>()(CallbackService::Id::RegionInfoCallback, info);
}
}
}

void DataProcessingDevice::InitTask()
{
for (auto& channel : fChannels) {
channel.second.at(0).Transport()->SubscribeToRegionEvents([&pendingRegionInfos = mPendingRegionInfos, &regionInfoMutex = mRegionInfoMutex](FairMQRegionInfo info) {
channel.second.at(0).Transport()->SubscribeToRegionEvents([this,
&registry = mServiceRegistry,
&pendingRegionInfos = mPendingRegionInfos,
&regionInfoMutex = mRegionInfoMutex](FairMQRegionInfo info) {
std::lock_guard<std::mutex> lock(regionInfoMutex);
LOG(debug) << ">>> Region info event" << info.event;
LOG(debug) << "id: " << info.id;
LOG(debug) << "ptr: " << info.ptr;
LOG(debug) << "size: " << info.size;
LOG(debug) << "flags: " << info.flags;
pendingRegionInfos.push_back(info);
// When not running we can handle the callbacks synchronously.
if (this->GetCurrentState() != fair::mq::State::Running) {
handleRegionCallbacks(registry, pendingRegionInfos);
}
});
}

Expand Down Expand Up @@ -497,6 +516,12 @@ void DataProcessingDevice::Reset() { mServiceRegistry.get<CallbackService>()(Cal

bool DataProcessingDevice::ConditionalRun()
{
// Notify on the main thread the new region callbacks, making sure
// no callback is issued if there is something still processing.
{
std::lock_guard<std::mutex> lock(mRegionInfoMutex);
handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
}
// This will block for the correct delay (or until we get data
// on a socket). We also do not block on the first iteration
// so that devices which do not have a timer can still start an
Expand Down Expand Up @@ -528,15 +553,13 @@ bool DataProcessingDevice::ConditionalRun()

// Notify on the main thread the new region callbacks, making sure
// no callback is issued if there is something still processing.
// Notice that we still need to perform callbacks also after
// the socket epolled, because otherwise we would end up serving
// the callback after the first data arrives is the system is too
// fast to transition from Init to Run.
{
std::lock_guard<std::mutex> lock(mRegionInfoMutex);
if (mPendingRegionInfos.empty() == false) {
std::vector<FairMQRegionInfo> toBeNotified;
toBeNotified.swap(mPendingRegionInfos); // avoid any MT issue.
for (auto const& info : toBeNotified) {
mServiceRegistry.get<CallbackService>()(CallbackService::Id::RegionInfoCallback, info);
}
}
handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
}

assert(mStreams.size() == mHandles.size());
Expand Down