From 1312191e5110b2a72d9482a68d437f7ea8b3189a Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Fri, 9 Jul 2021 15:06:34 +0200 Subject: [PATCH] DPL: force processing of region callback without waiting for data Without this, it possible that a late arriving event will not be processed until some data actually arrives on one of the channels, unblocking the event loop. This should obviate to the problem by triggering an aync libuv event which will unblock the event loop without the need for data. --- Framework/Core/include/Framework/DeviceState.h | 4 ++++ Framework/Core/src/DataProcessingDevice.cxx | 16 ++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/Framework/Core/include/Framework/DeviceState.h b/Framework/Core/include/Framework/DeviceState.h index 7e9ac3772f426..b87eae6e620c3 100644 --- a/Framework/Core/include/Framework/DeviceState.h +++ b/Framework/Core/include/Framework/DeviceState.h @@ -23,6 +23,7 @@ typedef struct uv_loop_s uv_loop_t; typedef struct uv_timer_s uv_timer_t; typedef struct uv_poll_s uv_poll_t; typedef struct uv_signal_s uv_signal_t; +typedef struct uv_async_s uv_async_t; namespace o2::framework { @@ -78,6 +79,9 @@ struct DeviceState { std::vector activeOutputPollers; /// The list of active signal handlers std::vector activeSignals; + + uv_async_t* awakeMainThread = nullptr; + int loopReason = 0; }; diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index f3dea6416d523..b8c3d1ac8c60d 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -355,8 +355,22 @@ void handleRegionCallbacks(ServiceRegistry& registry, std::vectordata; + state->loopReason |= DeviceState::ASYNC_NOTIFICATION; +} +} // namespace void DataProcessingDevice::InitTask() { + if (mState.awakeMainThread == nullptr) { + mState.awakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t)); + mState.awakeMainThread->data = &mState; + uv_async_init(mState.loop, mState.awakeMainThread, on_awake_main_thread); + } + for (auto& channel : fChannels) { channel.second.at(0).Transport()->SubscribeToRegionEvents([this, ®istry = mServiceRegistry, @@ -372,6 +386,8 @@ void DataProcessingDevice::InitTask() // When not running we can handle the callbacks synchronously. if (this->GetCurrentState() != fair::mq::State::Running) { handleRegionCallbacks(registry, pendingRegionInfos); + } else { + uv_async_send(registry.get().awakeMainThread); } }); }