Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class InfrastructureGenerator
size_t numberOfLocalMachines,
double cycleDurationSeconds,
std::string mergingMode,
size_t resetAfterCycles,
std::string monitoringUrl);
static vector<framework::OutputSpec> generateCheckRunners(framework::WorkflowSpec& workflow, std::string configurationSource);
static void generateAggregator(framework::WorkflowSpec& workflow, std::string configurationSource, vector<framework::OutputSpec>& checkRunnerOutputs);
Expand Down
5 changes: 3 additions & 2 deletions Framework/include/QualityControl/TaskRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class TaskRunner : public framework::Task
const framework::OutputSpec getOutputSpec() { return mMonitorObjectsSpec; };
const framework::Options getOptions() { return mOptions; };

void setResetAfterPublish(bool);
/// \brief Makes TaskRunner invoke TaskInterface::reset() every n cycles. n = 0 means never.
void setResetAfterCycles(size_t n = 0);

/// \brief ID string for all TaskRunner devices
static std::string createTaskRunnerIdString();
Expand Down Expand Up @@ -127,7 +128,7 @@ class TaskRunner : public framework::Task
std::shared_ptr<configuration::ConfigurationInterface> mConfigFile; // used in init only
std::shared_ptr<monitoring::Monitoring> mCollector;
std::shared_ptr<TaskInterface> mTask;
bool mResetAfterPublish = false;
size_t mResetAfterCycles = 0;
std::shared_ptr<ObjectsManager> mObjectsManager;
int mRunNumber;

Expand Down
4 changes: 2 additions & 2 deletions Framework/include/QualityControl/TaskRunnerFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class TaskRunnerFactory
/// \param configurationSource - absolute path to configuration file, preceded with backend (f.e. "json://")
/// \param id - subSpecification for taskRunner's OutputSpec, useful to avoid output collisions in more complex topologies
/// \param resetAfterCycles - number of cycles after which taskRunner resets the user's task, 0 (default) means never
o2::framework::DataProcessorSpec
create(std::string taskName, std::string configurationSource, size_t id = 0, bool resetAfterPublish = false);
static o2::framework::DataProcessorSpec
create(std::string taskName, std::string configurationSource, size_t id = 0, size_t resetAfterCycles = 0);

/// \brief Provides necessary customization of the TaskRunners.
///
Expand Down
34 changes: 21 additions & 13 deletions Framework/src/InfrastructureGenerator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,14 @@ framework::WorkflowSpec InfrastructureGenerator::generateStandaloneInfrastructur
auto config = ConfigurationFactory::getConfiguration(configurationSource);
printVersion();

TaskRunnerFactory taskRunnerFactory;
if (config->getRecursive("qc").count("tasks")) {
for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) {
if (taskConfig.get<bool>("active", true)) {
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, 0));
// The "resetAfterCycles" parameter should be handled differently for standalone/remote and local tasks,
// thus we should not let TaskRunnerFactory read it and decide by itself, since it might not be aware of
// the context in which we run QC.
size_t resetAfterCycles = taskConfig.get<int>("resetAfterCycles", 0);
workflow.emplace_back(TaskRunnerFactory::create(taskName, configurationSource, resetAfterCycles));
}
}
}
Expand Down Expand Up @@ -117,9 +120,11 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co
for (const auto& machine : taskConfig.get_child("localMachines")) {
// We spawn a task and proxy only if we are on the right machine.
if (machine.second.get<std::string>("") == host) {
// If we use delta mergers, then the moving window is implemented by the last Merger layer.
// The QC Tasks should always send a delta covering one cycle.
int resetAfterCycles = taskConfig.get<std::string>("mergingMode", "delta") == "delta" ? 1 : taskConfig.get<int>("resetAfterCycles", 0);
// Generate QC Task Runner
bool needsResetAfterCycle = taskConfig.get<std::string>("mergingMode", "delta") == "delta";
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, id, needsResetAfterCycle));
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, id, resetAfterCycles));
// Generate an output proxy
// These should be removed when we are able to declare dangling output in normal DPL devices
auto remoteMachine = taskConfig.get_optional<std::string>("remoteMachine");
Expand Down Expand Up @@ -189,7 +194,6 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
printVersion();

if (config->getRecursive("qc").count("tasks")) {
TaskRunnerFactory taskRunnerFactory;
for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) {
if (!taskConfig.get<bool>("active", true)) {
ILOG(Info, Devel) << "Task " << taskName << " is disabled, ignoring." << ENDM;
Expand All @@ -211,10 +215,13 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
generateLocalTaskRemoteProxy(workflow, taskName, numberOfLocalMachines, remotePort.value_or(defaultRemotePort),
taskConfig.get<std::string>("localControl", "aliecs"));

generateMergers(workflow, taskName, numberOfLocalMachines,
taskConfig.get<double>("cycleDurationSeconds"),
taskConfig.get<std::string>("mergingMode", "delta"),
config->get<std::string>("qc.config.monitoring.url"));
auto mergingMode = taskConfig.get<std::string>("mergingMode", "delta");
// In "delta" mode Mergers should implement moving window, in "entire" - QC Tasks.
size_t resetAfterCycles = mergingMode == "delta" ? taskConfig.get<int>("resetAfterCycles", 0) : 0;
auto cycleDurationSeconds = taskConfig.get<double>("cycleDurationSeconds") * taskConfig.get<double>("mergerCycleMultiplier", 1);
auto monitoringUrl = config->get<std::string>("qc.config.monitoring.url");

generateMergers(workflow, taskName, numberOfLocalMachines, cycleDurationSeconds, mergingMode, resetAfterCycles, monitoringUrl);

} else if (taskConfig.get<std::string>("location") == "remote") {

Expand All @@ -236,8 +243,9 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
throw std::runtime_error("Configuration error: dataSource type unknown : " + type);
}

auto resetAfterCycles = taskConfig.get<int>("resetAfterCycles", 0);
// Creating the remote task
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, 0));
workflow.emplace_back(TaskRunnerFactory::create(taskName, configurationSource, resetAfterCycles));
}
}
}
Expand Down Expand Up @@ -370,7 +378,7 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp

void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow, std::string taskName,
size_t numberOfLocalMachines, double cycleDurationSeconds,
std::string mergingMode, std::string monitoringUrl)
std::string mergingMode, size_t resetAfterCycles, std::string monitoringUrl)
{
Inputs mergerInputs;
for (size_t id = 1; id <= numberOfLocalMachines; id++) {
Expand All @@ -390,8 +398,8 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow,
// if we are to change the mode to Full, disable resetting tasks after each cycle.
mergerConfig.inputObjectTimespan = { (mergingMode.empty() || mergingMode == "delta") ? InputObjectsTimespan::LastDifference : InputObjectsTimespan::FullHistory };
mergerConfig.publicationDecision = { PublicationDecision::EachNSeconds, cycleDurationSeconds };
mergerConfig.mergedObjectTimespan = { MergedObjectTimespan::FullHistory, 0 };
// for now one merger should be enough, multiple layers will be supported later
mergerConfig.mergedObjectTimespan = { MergedObjectTimespan::NCycles, (int)resetAfterCycles };
// for now one merger should be enough, multiple layers to be supported later
mergerConfig.topologySize = { TopologySize::NumberOfLayers, 1 };
mergerConfig.monitoringUrl = monitoringUrl;
mergersBuilder.setConfig(mergerConfig);
Expand Down
7 changes: 5 additions & 2 deletions Framework/src/TaskRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ void TaskRunner::run(ProcessingContext& pCtx)

if (timerReady) {
finishCycle(pCtx.outputs());
if (mResetAfterPublish) {
if (mResetAfterCycles > 0 && (mCycleNumber % mResetAfterCycles == 0)) {
mTask->reset();
}
if (mTaskConfig.maxNumberCycles < 0 || mCycleNumber < mTaskConfig.maxNumberCycles) {
Expand Down Expand Up @@ -194,7 +194,10 @@ CompletionPolicy::CompletionOp TaskRunner::completionPolicyCallback(o2::framewor
return action;
}

void TaskRunner::setResetAfterPublish(bool resetAfterPublish) { mResetAfterPublish = resetAfterPublish; }
/// Sets the reset period of the user's task, expressed in cycles.
/// After finishing a cycle, run() calls mTask->reset() when this value is greater than zero and
/// the cycle number is a multiple of it; n = 0 therefore disables the periodic reset.
void TaskRunner::setResetAfterCycles(size_t n)
{
mResetAfterCycles = n;
}

std::string TaskRunner::createTaskRunnerIdString()
{
Expand Down
4 changes: 2 additions & 2 deletions Framework/src/TaskRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ namespace o2::quality_control::core
using namespace o2::framework;

o2::framework::DataProcessorSpec
TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id, bool resetAfterPublish)
TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id, size_t resetAfterCycles)
{
TaskRunner qcTask{ taskName, configurationSource, id };
qcTask.setResetAfterPublish(resetAfterPublish);
qcTask.setResetAfterCycles(resetAfterCycles);

DataProcessorSpec newTask{
qcTask.getDeviceName(),
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ For a general overview of our (O2) software, organization and processes, please
* [Plugging the QC to an existing DPL workflow](doc/Advanced.md#plugging-the-qc-to-an-existing-dpl-workflow)
* [Production of QC objects outside this framework](doc/Advanced.md#production-of-qc-objects-outside-this-framework)
* [Multi-node setups](doc/Advanced.md#multi-node-setups)
* [Moving window](doc/Advanced.md#moving-window)
* [Writing a DPL data producer](doc/Advanced.md#writing-a-dpl-data-producer)
* [QC with DPL Analysis](doc/Advanced.md#qc-with-dpl-analysis)
* [CCDB / QCDB](doc/Advanced.md#ccdb--qcdb)
Expand Down
44 changes: 43 additions & 1 deletion doc/Advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Advanced topics
* [Example 2: advanced](#example-2-advanced)
* [Limitations](#limitations)
* [Multi-node setups](#multi-node-setups)
* [Moving window](#moving-window)
* [Writing a DPL data producer](#writing-a-dpl-data-producer)
* [QC with DPL Analysis](#qc-with-dpl-analysis)
* [Getting AODs directly](#getting-aods-directly)
Expand Down Expand Up @@ -269,6 +270,44 @@ and `qc/TST/MO/MultiNodeRemote`, and corresponding Checks under the path `qc/TST
When using AliECS, one has to generate workflow templates and upload them to the corresponding repository. Please
contact the QC or AliECS developers to receive assistance or instruction on how to do that.

## Moving window

By default, QC Tasks are never reset, thus the MOs they produce contain data from the full run.
However, if objects should have a shorter validity range, one may add the following option to the QC Task configuration:
```json
"MovingWindowTaskA": {
...
"resetAfterCycles": "10",
}
```
In the case above, the QC Task will have the `TaskInterface::reset()` method invoked every 10 cycles.

If the QC Task runs in parallel on many nodes and its results are merged, the effects will be different
depending on the chosen merging mode:
- If `"delta"` mode is used, the Merger in the last layer will implement the moving window, while the QC Tasks will
still reset after each cycle. Please note that QC Tasks will fall out of sync during data acquisition, so the moving
window might contain slightly misaligned data time ranges coming from different sources. Also, due to fluctuations of
the data transfer, objects from some sources might arrive more frequently than objects from others. Thus, one might
notice higher occupancy on stave A one time, but the next object might contain less than average data for the same stave.
- In the `"entire"` mode, QC Tasks will reset MOs, while Mergers will use the latest available object version from each
Task. Please note that if one of the Tasks dies, an old version of its MO will still be used over and over. Thus, `"delta"`
mode is advised in most use cases.

In setups with Mergers, one may also extend the Mergers' cycle duration, which can help to even out any data fluctuations:
```json
"MovingWindowTaskB": {
...
"cycleDurationSeconds" : "60",
"mergingMode" : "delta",
"mergerCycleMultiplier": "10", "": "multiplies cycleDurationSeconds in Mergers",
"resetAfterCycles": "1", "": "it could be still larger than 1"
}
```
In the presented case, the Merger will publish one set of complete MOs every 10 minutes, which should contain all deltas
received during this last period. Since the QC Tasks' cycle is 10 times shorter, the occupancy fluctuations should be
less apparent. Please also note that using this parameter in the `"entire"` merging mode does not make much sense,
since Mergers would use every 10th incomplete MO version when merging.

## Writing a DPL data producer

For your convenience, and although it does not lie within the QC scope, we would like to document how to write a simple data producer in the DPL. The DPL documentation can be found [here](https://github.com/AliceO2Group/AliceO2/blob/dev/Framework/Core/README.md) and for questions please head to the [forum](https://alice-talk.web.cern.ch/).
Expand Down Expand Up @@ -694,6 +733,8 @@ the "tasks" path.
"taskParameters": { "": "User Task parameters which are then accessible as a key-value map.",
"myOwnKey": "myOwnValue", "": "An example of a key and a value. Nested structures are not supported"
},
"resetAfterCycles" : "0", "": "Makes the Task or Merger reset MOs each n cycles.",
"": "0 (default) means that MOs should cover the full run.",
"location": "local", "": ["Location of the QC Task, it can be local or remote. Needed only for",
"multi-node setups, not respected in standalone development setups."],
"localMachines": [ "", "List of local machines where the QC task should run. Required only",
Expand All @@ -705,7 +746,8 @@ the "tasks" path.
"remotePort": "30432", "": "Remote QC machine TCP port. Required only for multi-node setups.",
"localControl": "aliecs", "": ["Control software specification, \"aliecs\" (default) or \"odc\".",
"Needed only for multi-node setups."],
"mergingMode": "delta", "": "Merging mode, \"delta\" (default) or \"entire\" objects are expected"
"mergingMode": "delta", "": "Merging mode, \"delta\" (default) or \"entire\" objects are expected",
"mergerCycleMultiplier": "1", "": "Multiplies the Merger cycle duration with respect to the QC Task cycle"
}
}
}
Expand Down