Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Utilities/Mergers/include/Mergers/FullHistoryMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class FullHistoryMerger : public framework::Task

MergerConfig mConfig;
std::unique_ptr<monitoring::Monitoring> mCollector;
int mCyclesSinceReset = 0;

// stats
int mTotalObjectsMerged = 0;
Expand All @@ -65,6 +66,7 @@ class FullHistoryMerger : public framework::Task
void updateCache(const framework::DataRef& ref);
void mergeCache();
void publish(framework::DataAllocator& allocator);
void clear();
};

} // namespace o2::mergers
Expand Down
2 changes: 2 additions & 0 deletions Utilities/Mergers/include/Mergers/IntegratingMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ class IntegratingMerger : public framework::Task

private:
void publish(framework::DataAllocator& allocator);
void clear();

private:
header::DataHeader::SubSpecificationType mSubSpec;
ObjectStore mMergedObject = std::monostate{};
MergerConfig mConfig;
std::unique_ptr<monitoring::Monitoring> mCollector;
int mCyclesSinceReset = 0;

// stats
int mTotalDeltasMerged = 0;
Expand Down
9 changes: 6 additions & 3 deletions Utilities/Mergers/include/Mergers/MergerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ enum class MergedObjectTimespan {
FullHistory,
// Merged object should be an sum of differences received after last publication.
// Merged object is reset after published. It won't produce meaningful results
// when InputObjectsTimespan::FullHstory is set.
LastDifference
// when InputObjectsTimespan::FullHistory is set.
LastDifference,
// Generalisation of the two above. Resets all objects in Mergers after n cycles (0 - infinite).
// The the above will be removed once we switch to NCycles in QC.
NCycles
};

enum class PublicationDecision {
Expand All @@ -56,7 +59,7 @@ struct ConfigEntry {
// \brief MergerAlgorithm configuration structure. Default configuration should work in most cases, out of the box.
struct MergerConfig {
ConfigEntry<InputObjectsTimespan> inputObjectTimespan = {InputObjectsTimespan::FullHistory};
ConfigEntry<MergedObjectTimespan> mergedObjectTimespan = {MergedObjectTimespan::FullHistory};
ConfigEntry<MergedObjectTimespan, int> mergedObjectTimespan = {MergedObjectTimespan::FullHistory};
ConfigEntry<PublicationDecision> publicationDecision = {PublicationDecision::EachNSeconds, 10};
ConfigEntry<TopologySize, int> topologySize = {TopologySize::NumberOfLayers, 1};
std::string monitoringUrl = "infologger:///debug?qc";
Expand Down
30 changes: 28 additions & 2 deletions Utilities/Mergers/src/FullHistoryMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ FullHistoryMerger::~FullHistoryMerger()

void FullHistoryMerger::init(framework::InitContext& ictx)
{
mCyclesSinceReset = 0;
mCollector = monitoring::MonitoringFactory::Get(mConfig.monitoringUrl);
mCollector->addGlobalTag(monitoring::tags::Key::Subsystem, monitoring::tags::Value::Mergers);
}
Expand All @@ -62,11 +63,36 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx)
}

if (ctx.inputs().isValid("timer-publish") && !mFirstObjectSerialized.first.empty()) {
mCyclesSinceReset++;
mergeCache();
publish(ctx.outputs());

if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference ||
mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) {
clear();
}
}
}

// I am not calling it reset(), because it does not have to be performed during the FairMQs reset.
void FullHistoryMerger::clear()
{
mFirstObjectSerialized.first.clear();
delete mFirstObjectSerialized.second.header;
delete mFirstObjectSerialized.second.payload;
delete mFirstObjectSerialized.second.spec;
mFirstObjectSerialized.second.header = nullptr;
mFirstObjectSerialized.second.payload = nullptr;
mFirstObjectSerialized.second.spec = nullptr;
mMergedObject = std::monostate{};
mCache.clear();
mCyclesSinceReset = 0;
mTotalObjectsMerged = 0;
mObjectsMerged = 0;
mTotalUpdatesReceived = 0;
mUpdatesReceived = 0;
}

void FullHistoryMerger::updateCache(const DataRef& ref)
{
auto* dh = get<DataHeader*>(ref.header);
Expand All @@ -78,7 +104,6 @@ void FullHistoryMerger::updateCache(const DataRef& ref)
// We store one object in the serialized form, so we can take it as the first object to be merged (multiple times).
// If we kept it deserialized, we would need to require implementing a clone() method in MergeInterface.

// Clang-Tidy: 'if' statement is unnecessary; deleting null pointer has no effect
delete mFirstObjectSerialized.second.spec;
delete mFirstObjectSerialized.second.header;
delete mFirstObjectSerialized.second.payload;
Expand Down Expand Up @@ -129,7 +154,7 @@ void FullHistoryMerger::publish(framework::DataAllocator& allocator)
{
// todo see if std::visit is faster here
if (std::holds_alternative<std::monostate>(mMergedObject)) {
LOG(INFO) << "Nothing to publish yet";
LOG(INFO) << "No objects received since start or reset, nothing to publish";
} else if (std::holds_alternative<MergeInterfacePtr>(mMergedObject)) {
allocator.snapshot(framework::OutputRef{MergerBuilder::mergerOutputBinding(), mSubSpec},
*std::get<MergeInterfacePtr>(mMergedObject));
Expand All @@ -150,6 +175,7 @@ void FullHistoryMerger::publish(framework::DataAllocator& allocator)
mCollector->send({mObjectsMerged, "objects_merged_since_last_publication"});
mCollector->send({mTotalUpdatesReceived, "total_updates_received"}, monitoring::DerivedMetricMode::RATE);
mCollector->send({mUpdatesReceived, "updates_received_since_last_publication"});
mCollector->send({mCyclesSinceReset, "cycles_since_reset"});
mObjectsMerged = 0;
mUpdatesReceived = 0;
}
Expand Down
20 changes: 16 additions & 4 deletions Utilities/Mergers/src/IntegratingMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ IntegratingMerger::IntegratingMerger(const MergerConfig& config, const header::D

void IntegratingMerger::init(framework::InitContext& ictx)
{
mCyclesSinceReset = 0;
mCollector = monitoring::MonitoringFactory::Get(mConfig.monitoringUrl);
mCollector->addGlobalTag(monitoring::tags::Key::Subsystem, monitoring::tags::Value::Mergers);
}
Expand Down Expand Up @@ -68,21 +69,31 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx)
}

if (ctx.inputs().isValid("timer-publish")) {

mCyclesSinceReset++;
publish(ctx.outputs());

if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference) {
mMergedObject = std::monostate{};
if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference ||
mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) {
clear();
}
}
}

// I am not calling it reset(), because it does not have to be performed during the FairMQs reset.
void IntegratingMerger::clear()
{
mMergedObject = std::monostate{};
mCyclesSinceReset = 0;
mTotalDeltasMerged = 0;
mDeltasMerged = 0;
}

void IntegratingMerger::publish(framework::DataAllocator& allocator)
{
mTotalDeltasMerged += mDeltasMerged;

if (std::holds_alternative<std::monostate>(mMergedObject)) {
LOG(INFO) << "Nothing to publish yet";
LOG(INFO) << "No objects received since start or reset, nothing to publish";
} else if (std::holds_alternative<MergeInterfacePtr>(mMergedObject)) {
allocator.snapshot(framework::OutputRef{MergerBuilder::mergerOutputBinding(), mSubSpec},
*std::get<MergeInterfacePtr>(mMergedObject));
Expand All @@ -99,6 +110,7 @@ void IntegratingMerger::publish(framework::DataAllocator& allocator)

mCollector->send({mTotalDeltasMerged, "total_deltas_merged"}, monitoring::DerivedMetricMode::RATE);
mCollector->send({mDeltasMerged, "deltas_merged_since_last_publication"});
mCollector->send({mCyclesSinceReset, "cycles_since_reset"});
mDeltasMerged = 0;
}

Expand Down
6 changes: 3 additions & 3 deletions Utilities/Mergers/src/MergerInfrastructureBuilder.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ framework::WorkflowSpec MergerInfrastructureBuilder::generateInfrastructure()
size_t inputsPerMergerRemainder = layerInputs.size() % numberOfMergers;

MergerConfig layerConfig = mConfig;
if (layer > 1 && mConfig.inputObjectTimespan.value == InputObjectsTimespan::LastDifference) {
layerConfig.inputObjectTimespan = {InputObjectsTimespan::FullHistory}; // in LastDifference mode only the first layer should integrate
layerConfig.mergedObjectTimespan = {MergedObjectTimespan::LastDifference}; // and objects that are merged should not be used again
if (layer < mergersPerLayer.size() - 1) {
// in intermediate layers we should reset the results, so the same data is not added many times.
layerConfig.mergedObjectTimespan = {MergedObjectTimespan::NCycles, 1};
}
mergerBuilder.setConfig(layerConfig);

Expand Down