From 5574175b87cefe61323d9a863d23475f98390d59 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Thu, 17 Jun 2021 15:05:07 +0200 Subject: [PATCH] [QC-134] Moving window in Mergers --- .../include/Mergers/FullHistoryMerger.h | 2 ++ .../include/Mergers/IntegratingMerger.h | 2 ++ .../Mergers/include/Mergers/MergerConfig.h | 9 ++++-- Utilities/Mergers/src/FullHistoryMerger.cxx | 30 +++++++++++++++++-- Utilities/Mergers/src/IntegratingMerger.cxx | 20 ++++++++++--- .../src/MergerInfrastructureBuilder.cxx | 6 ++-- 6 files changed, 57 insertions(+), 12 deletions(-) diff --git a/Utilities/Mergers/include/Mergers/FullHistoryMerger.h b/Utilities/Mergers/include/Mergers/FullHistoryMerger.h index fd471f22b981a..91f015fae80db 100644 --- a/Utilities/Mergers/include/Mergers/FullHistoryMerger.h +++ b/Utilities/Mergers/include/Mergers/FullHistoryMerger.h @@ -54,6 +54,7 @@ class FullHistoryMerger : public framework::Task MergerConfig mConfig; std::unique_ptr mCollector; + int mCyclesSinceReset = 0; // stats int mTotalObjectsMerged = 0; @@ -65,6 +66,7 @@ class FullHistoryMerger : public framework::Task void updateCache(const framework::DataRef& ref); void mergeCache(); void publish(framework::DataAllocator& allocator); + void clear(); }; } // namespace o2::mergers diff --git a/Utilities/Mergers/include/Mergers/IntegratingMerger.h b/Utilities/Mergers/include/Mergers/IntegratingMerger.h index a7133fa49b56b..2b27a3292aaeb 100644 --- a/Utilities/Mergers/include/Mergers/IntegratingMerger.h +++ b/Utilities/Mergers/include/Mergers/IntegratingMerger.h @@ -52,12 +52,14 @@ class IntegratingMerger : public framework::Task private: void publish(framework::DataAllocator& allocator); + void clear(); private: header::DataHeader::SubSpecificationType mSubSpec; ObjectStore mMergedObject = std::monostate{}; MergerConfig mConfig; std::unique_ptr mCollector; + int mCyclesSinceReset = 0; // stats int mTotalObjectsMerged = 0; diff --git a/Utilities/Mergers/include/Mergers/MergerConfig.h 
b/Utilities/Mergers/include/Mergers/MergerConfig.h index 2ddd24b1c2993..bdbda00c61001 100644 --- a/Utilities/Mergers/include/Mergers/MergerConfig.h +++ b/Utilities/Mergers/include/Mergers/MergerConfig.h @@ -32,8 +32,11 @@ enum class MergedObjectTimespan { FullHistory, // Merged object should be an sum of differences received after last publication. // Merged object is reset after published. It won't produce meaningful results - // when InputObjectsTimespan::FullHstory is set. - LastDifference + // when InputObjectsTimespan::FullHistory is set. + LastDifference, + // Generalisation of the two above. Resets all objects in Mergers after n cycles (0 - infinite). + // The two above will be removed once we switch to NCycles in QC. + NCycles }; enum class PublicationDecision { @@ -54,7 +57,7 @@ struct ConfigEntry { // \brief MergerAlgorithm configuration structure. Default configuration should work in most cases, out of the box. struct MergerConfig { ConfigEntry inputObjectTimespan = {InputObjectsTimespan::FullHistory}; - ConfigEntry mergedObjectTimespan = {MergedObjectTimespan::FullHistory}; + ConfigEntry mergedObjectTimespan = {MergedObjectTimespan::FullHistory}; ConfigEntry publicationDecision = {PublicationDecision::EachNSeconds, 10}; ConfigEntry topologySize = {TopologySize::NumberOfLayers, 1}; }; diff --git a/Utilities/Mergers/src/FullHistoryMerger.cxx b/Utilities/Mergers/src/FullHistoryMerger.cxx index e4ca213abfc5f..a27541ca7ce13 100644 --- a/Utilities/Mergers/src/FullHistoryMerger.cxx +++ b/Utilities/Mergers/src/FullHistoryMerger.cxx @@ -46,6 +46,7 @@ FullHistoryMerger::~FullHistoryMerger() void FullHistoryMerger::init(framework::InitContext& ictx) { + mCyclesSinceReset = 0; } void FullHistoryMerger::run(framework::ProcessingContext& ctx) @@ -61,11 +62,36 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx) } if (ctx.inputs().isValid("timer-publish") && !mFirstObjectSerialized.first.empty()) { + mCyclesSinceReset++; mergeCache(); 
publish(ctx.outputs()); + + if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference || + mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) { + clear(); + } } } +// I am not calling it reset(), because it does not have to be performed during the FairMQs reset. +void FullHistoryMerger::clear() +{ + mFirstObjectSerialized.first.clear(); + delete mFirstObjectSerialized.second.header; + delete mFirstObjectSerialized.second.payload; + delete mFirstObjectSerialized.second.spec; + mFirstObjectSerialized.second.header = nullptr; + mFirstObjectSerialized.second.payload = nullptr; + mFirstObjectSerialized.second.spec = nullptr; + mMergedObject = std::monostate{}; + mCache.clear(); + mCyclesSinceReset = 0; + mTotalObjectsMerged = 0; + mObjectsMerged = 0; + mTotalUpdatesReceived = 0; + mUpdatesReceived = 0; +} + void FullHistoryMerger::updateCache(const DataRef& ref) { auto* dh = get(ref.header); @@ -77,7 +103,6 @@ void FullHistoryMerger::updateCache(const DataRef& ref) // We store one object in the serialized form, so we can take it as the first object to be merged (multiple times). // If we kept it deserialized, we would need to require implementing a clone() method in MergeInterface. 
- // Clang-Tidy: 'if' statement is unnecessary; deleting null pointer has no effect delete mFirstObjectSerialized.second.spec; delete mFirstObjectSerialized.second.header; delete mFirstObjectSerialized.second.payload; @@ -128,7 +153,7 @@ void FullHistoryMerger::publish(framework::DataAllocator& allocator) { // todo see if std::visit is faster here if (std::holds_alternative(mMergedObject)) { - LOG(INFO) << "Nothing to publish yet"; + LOG(INFO) << "No objects received since start or reset, nothing to publish"; } else if (std::holds_alternative(mMergedObject)) { allocator.snapshot(framework::OutputRef{MergerBuilder::mergerOutputBinding(), mSubSpec}, *std::get(mMergedObject)); @@ -145,6 +170,7 @@ void FullHistoryMerger::publish(framework::DataAllocator& allocator) mCollector->send({mObjectsMerged, "objects_merged_since_last_publication"}); mCollector->send({mTotalUpdatesReceived, "total_updates_received"}, monitoring::DerivedMetricMode::RATE); mCollector->send({mUpdatesReceived, "updates_received_since_last_publication"}); + mCollector->send({mCyclesSinceReset, "cycles_since_reset"}); mObjectsMerged = 0; mUpdatesReceived = 0; } diff --git a/Utilities/Mergers/src/IntegratingMerger.cxx b/Utilities/Mergers/src/IntegratingMerger.cxx index 0607e0e78888b..4c35d4074e204 100644 --- a/Utilities/Mergers/src/IntegratingMerger.cxx +++ b/Utilities/Mergers/src/IntegratingMerger.cxx @@ -41,6 +41,7 @@ IntegratingMerger::IntegratingMerger(const MergerConfig& config, const header::D void IntegratingMerger::init(framework::InitContext& ictx) { + mCyclesSinceReset = 0; } void IntegratingMerger::run(framework::ProcessingContext& ctx) @@ -71,19 +72,29 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx) } if (ctx.inputs().isValid("timer-publish")) { - + mCyclesSinceReset++; publish(ctx.outputs()); - if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference) { - mMergedObject = std::monostate{}; + if (mConfig.mergedObjectTimespan.value == 
MergedObjectTimespan::LastDifference || + mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) { + clear(); } } } +// I am not calling it reset(), because it does not have to be performed during the FairMQs reset. +void IntegratingMerger::clear() +{ + mMergedObject = std::monostate{}; + mCyclesSinceReset = 0; + mTotalObjectsMerged = 0; + mObjectsMerged = 0; +} + void IntegratingMerger::publish(framework::DataAllocator& allocator) { if (std::holds_alternative(mMergedObject)) { - LOG(INFO) << "Nothing to publish yet"; + LOG(INFO) << "No objects received since start or reset, nothing to publish"; } else if (std::holds_alternative(mMergedObject)) { allocator.snapshot(framework::OutputRef{MergerBuilder::mergerOutputBinding(), mSubSpec}, *std::get(mMergedObject)); @@ -97,6 +108,7 @@ void IntegratingMerger::publish(framework::DataAllocator& allocator) mTotalObjectsMerged += mObjectsMerged; mCollector->send({mTotalObjectsMerged, "total_objects_merged"}, monitoring::DerivedMetricMode::RATE); mCollector->send({mObjectsMerged, "objects_merged_since_last_publication"}); + mCollector->send({mCyclesSinceReset, "cycles_since_reset"}); mObjectsMerged = 0; } diff --git a/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx b/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx index 17301b5b68658..df2eac4475458 100644 --- a/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx +++ b/Utilities/Mergers/src/MergerInfrastructureBuilder.cxx @@ -104,9 +104,9 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure() size_t inputsPerMergerRemainder = layerInputs.size() % numberOfMergers; MergerConfig layerConfig = mConfig; - if (layer > 1 && mConfig.inputObjectTimespan.value == InputObjectsTimespan::LastDifference) { - layerConfig.inputObjectTimespan = {InputObjectsTimespan::FullHistory}; // in LastDifference mode only the first layer should integrate - layerConfig.mergedObjectTimespan = 
{MergedObjectTimespan::LastDifference}; // and objects that are merged should not be used again + if (layer < mergersPerLayer.size() - 1) { + // in intermediate layers we should reset the results, so the same data is not added many times. + layerConfig.mergedObjectTimespan = {MergedObjectTimespan::NCycles, 1}; } mergerBuilder.setConfig(layerConfig);