diff --git a/Utilities/Mergers/include/Mergers/FullHistoryMerger.h b/Utilities/Mergers/include/Mergers/FullHistoryMerger.h
index fd471f22b981a..91f015fae80db 100644
--- a/Utilities/Mergers/include/Mergers/FullHistoryMerger.h
+++ b/Utilities/Mergers/include/Mergers/FullHistoryMerger.h
@@ -54,6 +54,7 @@ class FullHistoryMerger : public framework::Task

   MergerConfig mConfig;
   std::unique_ptr<monitoring::Monitoring> mCollector;
+  int mCyclesSinceReset = 0;

   // stats
   int mTotalObjectsMerged = 0;
@@ -65,6 +66,7 @@
   void updateCache(const framework::DataRef& ref);
   void mergeCache();
   void publish(framework::DataAllocator& allocator);
+  void clear();
 };

 } // namespace o2::mergers
diff --git a/Utilities/Mergers/include/Mergers/IntegratingMerger.h b/Utilities/Mergers/include/Mergers/IntegratingMerger.h
index e219b780f3ff6..2ec28394435a3 100644
--- a/Utilities/Mergers/include/Mergers/IntegratingMerger.h
+++ b/Utilities/Mergers/include/Mergers/IntegratingMerger.h
@@ -52,12 +52,14 @@ class IntegratingMerger : public framework::Task

  private:
   void publish(framework::DataAllocator& allocator);
+  void clear();

  private:
   header::DataHeader::SubSpecificationType mSubSpec;
   ObjectStore mMergedObject = std::monostate{};
   MergerConfig mConfig;
   std::unique_ptr<monitoring::Monitoring> mCollector;
+  int mCyclesSinceReset = 0;

   // stats
   int mTotalDeltasMerged = 0;
diff --git a/Utilities/Mergers/include/Mergers/MergerConfig.h b/Utilities/Mergers/include/Mergers/MergerConfig.h
index 48d57d11a4ca4..eb56b510a6c6e 100644
--- a/Utilities/Mergers/include/Mergers/MergerConfig.h
+++ b/Utilities/Mergers/include/Mergers/MergerConfig.h
@@ -34,8 +34,11 @@ enum class MergedObjectTimespan {
   FullHistory,
   // Merged object should be an sum of differences received after last publication.
   // Merged object is reset after published. It won't produce meaningful results
-  // when InputObjectsTimespan::FullHstory is set.
-  LastDifference
+  // when InputObjectsTimespan::FullHistory is set.
+  LastDifference,
+  // Generalisation of the two above. Resets all objects in Mergers after n cycles (0 - infinite).
+  // The two above will be removed once we switch to NCycles in QC.
+  NCycles
 };

 enum class PublicationDecision {
@@ -56,7 +59,7 @@ struct ConfigEntry {
 // \brief MergerAlgorithm configuration structure. Default configuration should work in most cases, out of the box.
 struct MergerConfig {
   ConfigEntry<InputObjectsTimespan> inputObjectTimespan = {InputObjectsTimespan::FullHistory};
-  ConfigEntry<MergedObjectTimespan> mergedObjectTimespan = {MergedObjectTimespan::FullHistory};
+  ConfigEntry<MergedObjectTimespan, int> mergedObjectTimespan = {MergedObjectTimespan::FullHistory};
   ConfigEntry<PublicationDecision> publicationDecision = {PublicationDecision::EachNSeconds, 10};
   ConfigEntry<TopologySize> topologySize = {TopologySize::NumberOfLayers, 1};
   std::string monitoringUrl = "infologger:///debug?qc";
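For illustration, here is a minimal, hypothetical sketch (not part of this patch) of how the new NCycles option could be requested. It assumes that ConfigEntry's param field carries the cycle count when NCycles is selected, which is how the mergers below interpret it.

```cpp
#include "Mergers/MergerConfig.h"

using namespace o2::mergers;

// Hypothetical example configuration: integrate incoming differences and reset the
// merged object every 3 publication cycles (param = 0 would mean "never reset").
MergerConfig makeExampleMergerConfig()
{
  MergerConfig config;
  config.inputObjectTimespan = {InputObjectsTimespan::LastDifference};
  config.mergedObjectTimespan = {MergedObjectTimespan::NCycles, 3};
  config.publicationDecision = {PublicationDecision::EachNSeconds, 10};
  return config;
}
```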
diff --git a/Utilities/Mergers/src/FullHistoryMerger.cxx b/Utilities/Mergers/src/FullHistoryMerger.cxx
index 1379ed2d76f04..b4c3c4c4d3cda 100644
--- a/Utilities/Mergers/src/FullHistoryMerger.cxx
+++ b/Utilities/Mergers/src/FullHistoryMerger.cxx
@@ -45,6 +45,7 @@ FullHistoryMerger::~FullHistoryMerger()

 void FullHistoryMerger::init(framework::InitContext& ictx)
 {
+  mCyclesSinceReset = 0;
   mCollector = monitoring::MonitoringFactory::Get(mConfig.monitoringUrl);
   mCollector->addGlobalTag(monitoring::tags::Key::Subsystem, monitoring::tags::Value::Mergers);
 }
@@ -62,11 +63,36 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx)
   }

   if (ctx.inputs().isValid("timer-publish") && !mFirstObjectSerialized.first.empty()) {
+    mCyclesSinceReset++;
     mergeCache();
     publish(ctx.outputs());
+
+    if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference ||
+        mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) {
+      clear();
+    }
   }
 }

+// I am not calling it reset(), because it does not have to be performed during FairMQ's reset.
+void FullHistoryMerger::clear()
+{
+  mFirstObjectSerialized.first.clear();
+  delete mFirstObjectSerialized.second.header;
+  delete mFirstObjectSerialized.second.payload;
+  delete mFirstObjectSerialized.second.spec;
+  mFirstObjectSerialized.second.header = nullptr;
+  mFirstObjectSerialized.second.payload = nullptr;
+  mFirstObjectSerialized.second.spec = nullptr;
+  mMergedObject = std::monostate{};
+  mCache.clear();
+  mCyclesSinceReset = 0;
+  mTotalObjectsMerged = 0;
+  mObjectsMerged = 0;
+  mTotalUpdatesReceived = 0;
+  mUpdatesReceived = 0;
+}
+
 void FullHistoryMerger::updateCache(const DataRef& ref)
 {
   auto* dh = get(ref.header);
@@ -78,7 +104,6 @@ void FullHistoryMerger::updateCache(const DataRef& ref)

   // We store one object in the serialized form, so we can take it as the first object to be merged (multiple times).
   // If we kept it deserialized, we would need to require implementing a clone() method in MergeInterface.
-  // Clang-Tidy: 'if' statement is unnecessary; deleting null pointer has no effect
   delete mFirstObjectSerialized.second.spec;
   delete mFirstObjectSerialized.second.header;
   delete mFirstObjectSerialized.second.payload;
@@ -129,7 +154,7 @@ void FullHistoryMerger::publish(framework::DataAllocator& allocator)
 {
   // todo see if std::visit is faster here
   if (std::holds_alternative<std::monostate>(mMergedObject)) {
-    LOG(INFO) << "Nothing to publish yet";
+    LOG(INFO) << "No objects received since start or reset, nothing to publish";
   } else if (std::holds_alternative(mMergedObject)) {
     allocator.snapshot(framework::OutputRef{MergerBuilder::mergerOutputBinding(), mSubSpec},
                        *std::get(mMergedObject));
@@ -150,6 +175,7 @@ void FullHistoryMerger::publish(framework::DataAllocator& allocator)
   mCollector->send({mObjectsMerged, "objects_merged_since_last_publication"});
   mCollector->send({mTotalUpdatesReceived, "total_updates_received"}, monitoring::DerivedMetricMode::RATE);
   mCollector->send({mUpdatesReceived, "updates_received_since_last_publication"});
+  mCollector->send({mCyclesSinceReset, "cycles_since_reset"});
   mObjectsMerged = 0;
   mUpdatesReceived = 0;
 }
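The decision to clear is the same in both mergers. The hypothetical helper below (not part of the patch) spells out the condition with explicit parentheses; since && binds tighter than ||, this grouping is exactly what the expression in run() already evaluates.

```cpp
#include "Mergers/MergerConfig.h"

// Hypothetical helper equivalent to the condition used in FullHistoryMerger::run()
// and IntegratingMerger::run(): clear after every cycle in LastDifference mode, or
// once the configured number of cycles has been reached in NCycles mode.
bool shouldClearAfterThisCycle(const o2::mergers::MergerConfig& config, int cyclesSinceReset)
{
  using o2::mergers::MergedObjectTimespan;
  return config.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference ||
         (config.mergedObjectTimespan.value == MergedObjectTimespan::NCycles &&
          config.mergedObjectTimespan.param == cyclesSinceReset);
}
```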
diff --git a/Utilities/Mergers/src/IntegratingMerger.cxx b/Utilities/Mergers/src/IntegratingMerger.cxx
index 433164bbb9554..225018e96f0ea 100644
--- a/Utilities/Mergers/src/IntegratingMerger.cxx
+++ b/Utilities/Mergers/src/IntegratingMerger.cxx
@@ -36,6 +36,7 @@ IntegratingMerger::IntegratingMerger(const MergerConfig& config, const header::D

 void IntegratingMerger::init(framework::InitContext& ictx)
 {
+  mCyclesSinceReset = 0;
   mCollector = monitoring::MonitoringFactory::Get(mConfig.monitoringUrl);
   mCollector->addGlobalTag(monitoring::tags::Key::Subsystem, monitoring::tags::Value::Mergers);
 }
@@ -68,21 +69,31 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx)
   }

   if (ctx.inputs().isValid("timer-publish")) {
-
+    mCyclesSinceReset++;
     publish(ctx.outputs());

-    if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference) {
-      mMergedObject = std::monostate{};
+    if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference ||
+        mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) {
+      clear();
     }
   }
 }

+// I am not calling it reset(), because it does not have to be performed during FairMQ's reset.
+void IntegratingMerger::clear()
+{
+  mMergedObject = std::monostate{};
+  mCyclesSinceReset = 0;
+  mTotalDeltasMerged = 0;
+  mDeltasMerged = 0;
+}
+
 void IntegratingMerger::publish(framework::DataAllocator& allocator)
 {
   mTotalDeltasMerged += mDeltasMerged;

   if (std::holds_alternative<std::monostate>(mMergedObject)) {
-    LOG(INFO) << "Nothing to publish yet";
+    LOG(INFO) << "No objects received since start or reset, nothing to publish";
   } else if (std::holds_alternative(mMergedObject)) {
     allocator.snapshot(framework::OutputRef{MergerBuilder::mergerOutputBinding(), mSubSpec},
                        *std::get(mMergedObject));
@@ -99,6 +110,7 @@ void IntegratingMerger::publish(framework::DataAllocator& allocator)

   mCollector->send({mTotalDeltasMerged, "total_deltas_merged"}, monitoring::DerivedMetricMode::RATE);
   mCollector->send({mDeltasMerged, "deltas_merged_since_last_publication"});
+  mCollector->send({mCyclesSinceReset, "cycles_since_reset"});

   mDeltasMerged = 0;
 }
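To illustrate the cycle accounting that both mergers now share, the standalone sketch below (plain C++, not O2 code) mimics the publish-then-maybe-clear flow with NCycles = 3; the values are hypothetical and only demonstrate when the reset happens.

```cpp
#include <iostream>

// Standalone illustration of the new cycle accounting: one "delta" is merged per cycle,
// every cycle publishes, and after nCycles publications the merged state and the
// cycles-since-reset counter are cleared, as IntegratingMerger::clear() does.
int main()
{
  const int nCycles = 3; // plays the role of mergedObjectTimespan.param with NCycles
  int mergedValue = 0;   // stands in for mMergedObject
  int cyclesSinceReset = 0;

  for (int cycle = 1; cycle <= 6; ++cycle) {
    mergedValue += 1; // one delta arrived in this cycle
    ++cyclesSinceReset;
    std::cout << "cycle " << cycle << ": publishing " << mergedValue
              << " (cycles_since_reset = " << cyclesSinceReset << ")\n";
    if (cyclesSinceReset == nCycles) {
      mergedValue = 0; // clear()
      cyclesSinceReset = 0;
    }
  }
  return 0;
}
```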
diff --git a/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx b/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx
index 17301b5b68658..df2eac4475458 100644
--- a/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx
+++ b/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx
@@ -104,9 +104,9 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
     size_t inputsPerMergerRemainder = layerInputs.size() % numberOfMergers;

     MergerConfig layerConfig = mConfig;
-    if (layer > 1 && mConfig.inputObjectTimespan.value == InputObjectsTimespan::LastDifference) {
-      layerConfig.inputObjectTimespan = {InputObjectsTimespan::FullHistory};    // in LastDifference mode only the first layer should integrate
-      layerConfig.mergedObjectTimespan = {MergedObjectTimespan::LastDifference}; // and objects that are merged should not be used again
+    if (layer < mergersPerLayer.size() - 1) {
+      // in intermediate layers we should reset the results, so the same data is not added many times.
+      layerConfig.mergedObjectTimespan = {MergedObjectTimespan::NCycles, 1};
     }

     mergerBuilder.setConfig(layerConfig);
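For context, the branch above only triggers for multi-layer topologies: layers other than the last one now publish with {MergedObjectTimespan::NCycles, 1}, i.e. they clear their result after each publication so the next layer never receives the same contribution twice. The sketch below shows how such a topology might be requested; the setter names setInfrastructureName, setInputSpecs and setOutputSpec are assumptions not shown in this diff.

```cpp
#include "Framework/InputSpec.h"
#include "Framework/OutputSpec.h"
#include "Framework/WorkflowSpec.h"
#include "Mergers/MergerConfig.h"
#include "Mergers/MergerInfrastructureBuilder.h"

#include <vector>

using namespace o2::mergers;

// Hypothetical sketch of requesting a two-layer merger topology. With this patch, the
// intermediate layer is configured with {MergedObjectTimespan::NCycles, 1}, while the
// final layer keeps the user's mergedObjectTimespan setting.
o2::framework::WorkflowSpec generateExampleMergers(const std::vector<o2::framework::InputSpec>& inputs,
                                                   const o2::framework::OutputSpec& output)
{
  MergerConfig config;
  config.inputObjectTimespan = {InputObjectsTimespan::LastDifference};
  config.mergedObjectTimespan = {MergedObjectTimespan::FullHistory};
  config.publicationDecision = {PublicationDecision::EachNSeconds, 10};
  config.topologySize = {TopologySize::NumberOfLayers, 2}; // one intermediate layer + one final layer

  MergerInfrastructureBuilder builder;
  builder.setInfrastructureName("example-mergers"); // assumed setter, see note above
  builder.setInputSpecs(inputs);                    // assumed setter
  builder.setOutputSpec(output);                    // assumed setter
  builder.setConfig(config);
  return builder.generateInfrastructure();
}
```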