diff --git a/Framework/include/QualityControl/InfrastructureGenerator.h b/Framework/include/QualityControl/InfrastructureGenerator.h index 8cc6505fc1..a4cca52b81 100644 --- a/Framework/include/QualityControl/InfrastructureGenerator.h +++ b/Framework/include/QualityControl/InfrastructureGenerator.h @@ -150,6 +150,7 @@ class InfrastructureGenerator size_t numberOfLocalMachines, double cycleDurationSeconds, std::string mergingMode, + size_t resetAfterCycles, std::string monitoringUrl); static vector generateCheckRunners(framework::WorkflowSpec& workflow, std::string configurationSource); static void generateAggregator(framework::WorkflowSpec& workflow, std::string configurationSource, vector& checkRunnerOutputs); diff --git a/Framework/include/QualityControl/TaskRunner.h b/Framework/include/QualityControl/TaskRunner.h index d50bb1ee5f..7d7cec1724 100644 --- a/Framework/include/QualityControl/TaskRunner.h +++ b/Framework/include/QualityControl/TaskRunner.h @@ -90,7 +90,8 @@ class TaskRunner : public framework::Task const framework::OutputSpec getOutputSpec() { return mMonitorObjectsSpec; }; const framework::Options getOptions() { return mOptions; }; - void setResetAfterPublish(bool); + /// \brief Makes TaskRunner invoke TaskInterface::reset() each n cycles. n = 0 means never. + void setResetAfterCycles(size_t n = 0); /// \brief ID string for all TaskRunner devices static std::string createTaskRunnerIdString(); @@ -127,7 +128,7 @@ class TaskRunner : public framework::Task std::shared_ptr mConfigFile; // used in init only std::shared_ptr mCollector; std::shared_ptr mTask; - bool mResetAfterPublish = false; + size_t mResetAfterCycles = 0; std::shared_ptr mObjectsManager; int mRunNumber; diff --git a/Framework/include/QualityControl/TaskRunnerFactory.h b/Framework/include/QualityControl/TaskRunnerFactory.h index 35da104fba..1eafe1fe6d 100644 --- a/Framework/include/QualityControl/TaskRunnerFactory.h +++ b/Framework/include/QualityControl/TaskRunnerFactory.h @@ -42,8 +42,8 @@ class TaskRunnerFactory /// \param configurationSource - absolute path to configuration file, preceded with backend (f.e. "json://") /// \param id - subSpecification for taskRunner's OutputSpec, useful to avoid outputs collisions one more complex topologies /// \param resetAfterPublish - should taskRunner reset the user's task after each MO publication - o2::framework::DataProcessorSpec - create(std::string taskName, std::string configurationSource, size_t id = 0, bool resetAfterPublish = false); + static o2::framework::DataProcessorSpec + create(std::string taskName, std::string configurationSource, size_t id = 0, size_t resetAfterCycles = 0); /// \brief Provides necessary customization of the TaskRunners. /// diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx index a73852adfa..d677c624f1 100644 --- a/Framework/src/InfrastructureGenerator.cxx +++ b/Framework/src/InfrastructureGenerator.cxx @@ -68,11 +68,14 @@ framework::WorkflowSpec InfrastructureGenerator::generateStandaloneInfrastructur auto config = ConfigurationFactory::getConfiguration(configurationSource); printVersion(); - TaskRunnerFactory taskRunnerFactory; if (config->getRecursive("qc").count("tasks")) { for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) { if (taskConfig.get("active", true)) { - workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, 0)); + // The "resetAfterCycles" parameters should be handled differently for standalone/remote and local tasks, + // thus we should not let TaskRunnerFactory read it and decide by itself, since it might not be aware of + // the context we run QC. + size_t resetAfterCycles = taskConfig.get("resetAfterCycles", 0); + workflow.emplace_back(TaskRunnerFactory::create(taskName, configurationSource, resetAfterCycles)); } } } @@ -117,9 +120,11 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co for (const auto& machine : taskConfig.get_child("localMachines")) { // We spawn a task and proxy only if we are on the right machine. if (machine.second.get("") == host) { + // If we use delta mergers, then the moving window is implemented by the last Merger layer. + // The QC Tasks should always send a delta covering one cycle. + int resetAfterCycles = taskConfig.get("mergingMode", "delta") == "delta" ? 1 : taskConfig.get("resetAfterCycles", 0); // Generate QC Task Runner - bool needsResetAfterCycle = taskConfig.get("mergingMode", "delta") == "delta"; - workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, id, needsResetAfterCycle)); + workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, id, resetAfterCycles)); // Generate an output proxy // These should be removed when we are able to declare dangling output in normal DPL devices auto remoteMachine = taskConfig.get_optional("remoteMachine"); @@ -189,7 +194,6 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur printVersion(); if (config->getRecursive("qc").count("tasks")) { - TaskRunnerFactory taskRunnerFactory; for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) { if (!taskConfig.get("active", true)) { ILOG(Info, Devel) << "Task " << taskName << " is disabled, ignoring." << ENDM; @@ -211,10 +215,13 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur generateLocalTaskRemoteProxy(workflow, taskName, numberOfLocalMachines, remotePort.value_or(defaultRemotePort), taskConfig.get("localControl", "aliecs")); - generateMergers(workflow, taskName, numberOfLocalMachines, - taskConfig.get("cycleDurationSeconds"), - taskConfig.get("mergingMode", "delta"), - config->get("qc.config.monitoring.url")); + auto mergingMode = taskConfig.get("mergingMode", "delta"); + // In "delta" mode Mergers should implement moving window, in "entire" - QC Tasks. + size_t resetAfterCycles = mergingMode == "delta" ? taskConfig.get("resetAfterCycles", 0) : 0; + auto cycleDurationSeconds = taskConfig.get("cycleDurationSeconds") * taskConfig.get("mergerCycleMultiplier", 1); + auto monitoringUrl = config->get("qc.config.monitoring.url"); + + generateMergers(workflow, taskName, numberOfLocalMachines, cycleDurationSeconds, mergingMode, resetAfterCycles, monitoringUrl); } else if (taskConfig.get("location") == "remote") { @@ -236,8 +243,9 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur throw std::runtime_error("Configuration error: dataSource type unknown : " + type); } + auto resetAfterCycles = taskConfig.get("resetAfterCycles", 0); // Creating the remote task - workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, 0)); + workflow.emplace_back(TaskRunnerFactory::create(taskName, configurationSource, resetAfterCycles)); } } } @@ -370,7 +378,7 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow, std::string taskName, size_t numberOfLocalMachines, double cycleDurationSeconds, - std::string mergingMode, std::string monitoringUrl) + std::string mergingMode, size_t resetAfterCycles, std::string monitoringUrl) { Inputs mergerInputs; for (size_t id = 1; id <= numberOfLocalMachines; id++) { @@ -390,8 +398,8 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow, // if we are to change the mode to Full, disable reseting tasks after each cycle. mergerConfig.inputObjectTimespan = { (mergingMode.empty() || mergingMode == "delta") ? InputObjectsTimespan::LastDifference : InputObjectsTimespan::FullHistory }; mergerConfig.publicationDecision = { PublicationDecision::EachNSeconds, cycleDurationSeconds }; - mergerConfig.mergedObjectTimespan = { MergedObjectTimespan::FullHistory, 0 }; - // for now one merger should be enough, multiple layers will be supported later + mergerConfig.mergedObjectTimespan = { MergedObjectTimespan::NCycles, (int)resetAfterCycles }; + // for now one merger should be enough, multiple layers to be supported later mergerConfig.topologySize = { TopologySize::NumberOfLayers, 1 }; mergerConfig.monitoringUrl = monitoringUrl; mergersBuilder.setConfig(mergerConfig); diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index dc1b5f2481..f8ff581518 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -145,7 +145,7 @@ void TaskRunner::run(ProcessingContext& pCtx) if (timerReady) { finishCycle(pCtx.outputs()); - if (mResetAfterPublish) { + if (mResetAfterCycles > 0 && (mCycleNumber % mResetAfterCycles == 0)) { mTask->reset(); } if (mTaskConfig.maxNumberCycles < 0 || mCycleNumber < mTaskConfig.maxNumberCycles) { @@ -194,7 +194,10 @@ CompletionPolicy::CompletionOp TaskRunner::completionPolicyCallback(o2::framewor return action; } -void TaskRunner::setResetAfterPublish(bool resetAfterPublish) { mResetAfterPublish = resetAfterPublish; } +void TaskRunner::setResetAfterCycles(size_t n) +{ + mResetAfterCycles = n; +} std::string TaskRunner::createTaskRunnerIdString() { diff --git a/Framework/src/TaskRunnerFactory.cxx b/Framework/src/TaskRunnerFactory.cxx index 22dea1f03c..2013ad54e4 100644 --- a/Framework/src/TaskRunnerFactory.cxx +++ b/Framework/src/TaskRunnerFactory.cxx @@ -25,10 +25,10 @@ namespace o2::quality_control::core using namespace o2::framework; o2::framework::DataProcessorSpec - TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id, bool resetAfterPublish) + TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id, size_t resetAfterCycles) { TaskRunner qcTask{ taskName, configurationSource, id }; - qcTask.setResetAfterPublish(resetAfterPublish); + qcTask.setResetAfterCycles(resetAfterCycles); DataProcessorSpec newTask{ qcTask.getDeviceName(), diff --git a/README.md b/README.md index a590333358..6611f1f28d 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,7 @@ For a general overview of our (O2) software, organization and processes, please * [Plugging the QC to an existing DPL workflow](doc/Advanced.md#plugging-the-qc-to-an-existing-dpl-workflow) * [Production of QC objects outside this framework](doc/Advanced.md#production-of-qc-objects-outside-this-framework) * [Multi-node setups](doc/Advanced.md#multi-node-setups) + * [Moving window](doc/Advanced.md#moving-window) * [Writing a DPL data producer](doc/Advanced.md#writing-a-dpl-data-producer) * [QC with DPL Analysis](doc/Advanced.md#qc-with-dpl-analysis) * [CCDB / QCDB](doc/Advanced.md#ccdb--qcdb) diff --git a/doc/Advanced.md b/doc/Advanced.md index 550ee22619..c688df6d65 100644 --- a/doc/Advanced.md +++ b/doc/Advanced.md @@ -13,6 +13,7 @@ Advanced topics * [Example 2: advanced](#example-2-advanced) * [Limitations](#limitations) * [Multi-node setups](#multi-node-setups) + * [Moving window](#moving-window) * [Writing a DPL data producer](#writing-a-dpl-data-producer) * [QC with DPL Analysis](#qc-with-dpl-analysis) * [Getting AODs directly](#getting-aods-directly) @@ -269,6 +270,44 @@ and `qc/TST/MO/MultiNodeRemote`, and corresponding Checks under the path `qc/TST When using AliECS, one has to generate workflow templates and upload them to the corresponding repository. Please contact the QC or AliECS developers to receive assistance or instruction on how to do that. +## Moving window + +By default QC Tasks are never reset, thus the MOs they produce contain data from the full run. +However, if objects should have a shorter validity range, one may add the following options to QC Task configuration: +```json + "MovingWindowTaskA": { + ... + "resetAfterCycles": "10", + } +``` +In the case above the QC Task will have the `TaskInterface::reset()` method invoked each 10 cycles. + +If the QC Task runs in parallel on many nodes and its results are merged, the effects will be different +depending on the chosen merging mode: +- If `"delta"` mode is used, the Merger in the last layer will implement the moving window, while the QC Tasks will + still reset after each cycle. Please note, that QC Tasks will fall out of sync during data acquisition, so the moving + window might contain slightly misaligned data time ranges coming from different sources. Also, due to fluctuations of + the data transfer, objects coming from different sources might appear more frequently than others. Thus, one might + notice higher occupancy on stave A one time, but the next object might contain less than average data for the same stave. +- In the `"entire"` mode, QC Tasks will reset MOs, while Mergers will use the latest available object version from each + Task. Please note that if one of the Tasks dies, an old version of MO will be still used over and over. Thus, `"delta"` + mode is advised in most use cases. + +In setups with Mergers one may also extend the Mergers cycle duration, which can help to even out any data fluctuations: +```json + "MovingWindowTaskB": { + ... + "cycleDurationSeconds" : "60", + "mergingMode" : "delta", + "mergerCycleMultiplier": "10", "": "multiplies cycleDurationSeconds in Mergers", + "resetAfterCycles": "1", "": "it could be still larger than 1" + } + ``` +In the presented case, the Merger will publish one set of complete MOs per 10 minutes, which should contain all deltas + received during this last period. Since the QC Tasks cycle is 10 times shorter, the occupancy fluctuations should be + less apparent. Please also note, that using this parameter in the `"entire"` merging mode does not make much sense, + since Mergers would use every 10th incomplete MO version when merging. + ## Writing a DPL data producer For your convenience, and although it does not lie within the QC scope, we would like to document how to write a simple data producer in the DPL. The DPL documentation can be found [here](https://github.com/AliceO2Group/AliceO2/blob/dev/Framework/Core/README.md) and for questions please head to the [forum](https://alice-talk.web.cern.ch/). @@ -694,6 +733,8 @@ the "tasks" path. "taskParameters": { "": "User Task parameters which are then accessible as a key-value map.", "myOwnKey": "myOwnValue", "": "An example of a key and a value. Nested structures are not supported" }, + "resetAfterCycles" : "0", "": "Makes the Task or Merger reset MOs each n cycles.", + "": "0 (default) means that MOs should cover the full run.", "location": "local", "": ["Location of the QC Task, it can be local or remote. Needed only for", "multi-node setups, not respected in standalone development setups."], "localMachines": [ "", "List of local machines where the QC task should run. Required only", @@ -705,7 +746,8 @@ the "tasks" path. "remotePort": "30432", "": "Remote QC machine TCP port. Required only for multi-node setups.", "localControl": "aliecs", "": ["Control software specification, \"aliecs\" (default) or \"odc\").", "Needed only for multi-node setups."], - "mergingMode": "delta", "": "Merging mode, \"delta\" (default) or \"entire\" objects are expected" + "mergingMode": "delta", "": "Merging mode, \"delta\" (default) or \"entire\" objects are expected", + "mergerCycleMultiplier": "1", "": "Multiplies the Merger cycle duration with respect to the QC Task cycle" } } }