Skip to content

Commit e8da056

Browse files
committed
[QC-134] Moving windows in Tasks and Mergers
1 parent 09445af commit e8da056

File tree

8 files changed

+77
-21
lines changed

8 files changed

+77
-21
lines changed

Framework/include/QualityControl/InfrastructureGenerator.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ class InfrastructureGenerator
145145
std::string taskName,
146146
size_t numberOfLocalMachines,
147147
double cycleDurationSeconds,
148-
std::string mergingMode);
148+
std::string mergingMode,
149+
size_t resetAfterCycles);
149150
static vector<framework::OutputSpec> generateCheckRunners(framework::WorkflowSpec& workflow, std::string configurationSource);
150151
static void generateAggregator(framework::WorkflowSpec& workflow, std::string configurationSource, vector<framework::OutputSpec>& checkRunnerOutputs);
151152
static void generatePostProcessing(framework::WorkflowSpec& workflow, std::string configurationSource);

Framework/include/QualityControl/TaskRunner.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ class TaskRunner : public framework::Task
9090
const framework::OutputSpec getOutputSpec() { return mMonitorObjectsSpec; };
9191
const framework::Options getOptions() { return mOptions; };
9292

93-
void setResetAfterPublish(bool);
93+
/// \brief Makes TaskRunner invoke TaskInterface::reset() every n cycles. n = 0 means never.
94+
void setResetAfterCycles(size_t n = 0);
9495

9596
/// \brief ID string for all TaskRunner devices
9697
static std::string createTaskRunnerIdString();
@@ -127,7 +128,7 @@ class TaskRunner : public framework::Task
127128
std::shared_ptr<configuration::ConfigurationInterface> mConfigFile; // used in init only
128129
std::shared_ptr<monitoring::Monitoring> mCollector;
129130
std::shared_ptr<TaskInterface> mTask;
130-
bool mResetAfterPublish = false;
131+
size_t mResetAfterCycles = 0;
131132
std::shared_ptr<ObjectsManager> mObjectsManager;
132133
int mRunNumber;
133134

Framework/include/QualityControl/TaskRunnerFactory.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ class TaskRunnerFactory
4242
/// \param configurationSource - absolute path to configuration file, preceded with backend (f.e. "json://")
4343
/// \param id - subSpecification for taskRunner's OutputSpec, useful to avoid output collisions in more complex topologies
4444
/// \param resetAfterCycles - makes taskRunner reset the user's task every n cycles, 0 (default) means never
45-
o2::framework::DataProcessorSpec
46-
create(std::string taskName, std::string configurationSource, size_t id = 0, bool resetAfterPublish = false);
45+
static o2::framework::DataProcessorSpec
46+
create(std::string taskName, std::string configurationSource, size_t id = 0, size_t resetAfterCycles = 0);
4747

4848
/// \brief Provides necessary customization of the TaskRunners.
4949
///

Framework/src/InfrastructureGenerator.cxx

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,14 @@ framework::WorkflowSpec InfrastructureGenerator::generateStandaloneInfrastructur
5757
auto config = ConfigurationFactory::getConfiguration(configurationSource);
5858
printVersion();
5959

60-
TaskRunnerFactory taskRunnerFactory;
6160
if (config->getRecursive("qc").count("tasks")) {
6261
for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) {
6362
if (taskConfig.get<bool>("active", true)) {
64-
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, 0));
63+
// The "resetAfterCycles" parameter should be handled differently for standalone/remote and local tasks,
64+
// thus we should not let TaskRunnerFactory read it and decide by itself, since it might not be aware of
65+
// the context in which we run QC.
66+
size_t resetAfterCycles = taskConfig.get<int>("resetAfterCycles", 0);
67+
// TaskRunnerFactory::create takes (taskName, configurationSource, id, resetAfterCycles): the id (subSpecification)
// must stay 0 for standalone tasks and resetAfterCycles has to be passed as the fourth argument.
workflow.emplace_back(TaskRunnerFactory::create(taskName, configurationSource, 0, resetAfterCycles));
6568
}
6669
}
6770
}
@@ -105,9 +108,11 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co
105108
for (const auto& machine : taskConfig.get_child("localMachines")) {
106109
// We spawn a task and proxy only if we are on the right machine.
107110
if (machine.second.get<std::string>("") == host) {
111+
// If we use delta mergers, then the moving window is implemented by the last Merger layer.
112+
// The QC Tasks should always send a delta covering one cycle.
113+
int resetAfterCycles = taskConfig.get<std::string>("mergingMode", "delta") == "delta" ? 1 : taskConfig.get<int>("resetAfterCycles", 0);
108114
// Generate QC Task Runner
109-
bool needsResetAfterCycle = taskConfig.get<std::string>("mergingMode", "delta") == "delta";
110-
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, id, needsResetAfterCycle));
115+
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, id, resetAfterCycles));
111116
// Generate an output proxy
112117
// These should be removed when we are able to declare dangling output in normal DPL devices
113118
auto remoteMachine = taskConfig.get_optional<std::string>("remoteMachine");
@@ -175,7 +180,6 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
175180
printVersion();
176181

177182
if (config->getRecursive("qc").count("tasks")) {
178-
TaskRunnerFactory taskRunnerFactory;
179183
for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) {
180184
if (!taskConfig.get<bool>("active", true)) {
181185
ILOG(Info, Devel) << "Task " << taskName << " is disabled, ignoring." << ENDM;
@@ -196,9 +200,12 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
196200
}
197201
generateLocalTaskRemoteProxy(workflow, taskName, numberOfLocalMachines, remotePort.value_or(defaultRemotePort));
198202

199-
generateMergers(workflow, taskName, numberOfLocalMachines,
200-
taskConfig.get<double>("cycleDurationSeconds"),
201-
taskConfig.get<std::string>("mergingMode", "delta"));
203+
auto mergingMode = taskConfig.get<std::string>("mergingMode", "delta");
204+
// In "delta" mode Mergers should implement moving window, in "entire" - QC Tasks.
205+
size_t resetAfterCycles = mergingMode == "delta" ? taskConfig.get<int>("resetAfterCycles", 0) : 0;
206+
auto cycleDurationSeconds = taskConfig.get<double>("cycleDurationSeconds") * taskConfig.get<double>("mergerCycleMultiplier", 1);
207+
208+
generateMergers(workflow, taskName, numberOfLocalMachines, cycleDurationSeconds, mergingMode, resetAfterCycles);
202209

203210
} else if (taskConfig.get<std::string>("location") == "remote") {
204211

@@ -220,8 +227,9 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
220227
throw std::runtime_error("Configuration error: dataSource type unknown : " + type);
221228
}
222229

230+
auto resetAfterCycles = taskConfig.get<int>("resetAfterCycles", 0);
223231
// Creating the remote task
224-
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, 0));
232+
// TaskRunnerFactory::create takes (taskName, configurationSource, id, resetAfterCycles): the id (subSpecification)
// must stay 0 for remote tasks and resetAfterCycles has to be passed as the fourth argument.
workflow.emplace_back(TaskRunnerFactory::create(taskName, configurationSource, 0, resetAfterCycles));
225233
}
226234
}
227235
}
@@ -352,7 +360,7 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp
352360

353361
void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow, std::string taskName,
354362
size_t numberOfLocalMachines, double cycleDurationSeconds,
355-
std::string mergingMode)
363+
std::string mergingMode, size_t resetAfterCycles)
356364
{
357365
Inputs mergerInputs;
358366
for (size_t id = 1; id <= numberOfLocalMachines; id++) {
@@ -372,7 +380,7 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow,
372380
// if we are to change the mode to Full, disable resetting tasks after each cycle.
373381
mergerConfig.inputObjectTimespan = { (mergingMode.empty() || mergingMode == "delta") ? InputObjectsTimespan::LastDifference : InputObjectsTimespan::FullHistory };
374382
mergerConfig.publicationDecision = { PublicationDecision::EachNSeconds, cycleDurationSeconds };
375-
mergerConfig.mergedObjectTimespan = { MergedObjectTimespan::FullHistory, 0 };
383+
mergerConfig.mergedObjectTimespan = { MergedObjectTimespan::NCycles, (int)resetAfterCycles };
376384
// for now one merger should be enough, multiple layers to be supported later
377385
mergerConfig.topologySize = { TopologySize::NumberOfLayers, 1 };
378386
mergersBuilder.setConfig(mergerConfig);

Framework/src/TaskRunner.cxx

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ void TaskRunner::run(ProcessingContext& pCtx)
145145

146146
if (timerReady) {
147147
finishCycle(pCtx.outputs());
148-
if (mResetAfterPublish) {
148+
if (mResetAfterCycles > 0 && (mCycleNumber % mResetAfterCycles == 0)) {
149149
mTask->reset();
150150
}
151151
if (mTaskConfig.maxNumberCycles < 0 || mCycleNumber < mTaskConfig.maxNumberCycles) {
@@ -194,7 +194,10 @@ CompletionPolicy::CompletionOp TaskRunner::completionPolicyCallback(o2::framewor
194194
return action;
195195
}
196196

197-
void TaskRunner::setResetAfterPublish(bool resetAfterPublish) { mResetAfterPublish = resetAfterPublish; }
197+
void TaskRunner::setResetAfterCycles(size_t n)
198+
{
199+
mResetAfterCycles = n;
200+
}
198201

199202
std::string TaskRunner::createTaskRunnerIdString()
200203
{

Framework/src/TaskRunnerFactory.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ namespace o2::quality_control::core
2525
using namespace o2::framework;
2626

2727
o2::framework::DataProcessorSpec
28-
TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id, bool resetAfterPublish)
28+
TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id, size_t resetAfterCycles)
2929
{
3030
TaskRunner qcTask{ taskName, configurationSource, id };
31-
qcTask.setResetAfterPublish(resetAfterPublish);
31+
qcTask.setResetAfterCycles(resetAfterCycles);
3232

3333
DataProcessorSpec newTask{
3434
qcTask.getDeviceName(),

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ For a general overview of our (O2) software, organization and processes, please
6161
* [Plugging the QC to an existing DPL workflow](doc/Advanced.md#plugging-the-qc-to-an-existing-dpl-workflow)
6262
* [Production of QC objects outside this framework](doc/Advanced.md#production-of-qc-objects-outside-this-framework)
6363
* [Multi-node setups](doc/Advanced.md#multi-node-setups)
64+
* [Moving window](doc/Advanced.md#moving-window)
6465
* [Writing a DPL data producer](doc/Advanced.md#writing-a-dpl-data-producer)
6566
* [Access run conditions and calibrations from the CCDB](doc/Advanced.md#access-run-conditions-and-calibrations-from-the-ccdb)
6667
* [Definition and access of task-specific configuration](doc/Advanced.md#definition-and-access-of-task-specific-configuration)

doc/Advanced.md

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* [Example 2: advanced](#example-2-advanced)
1313
* [Limitations](#limitations)
1414
* [Multi-node setups](#multi-node-setups)
15+
* [Moving window](#moving-window)
1516
* [Writing a DPL data producer](#writing-a-dpl-data-producer)
1617
* [Access run conditions and calibrations from the CCDB](#access-run-conditions-and-calibrations-from-the-ccdb)
1718
* [Definition and access of task-specific configuration](#definition-and-access-of-task-specific-configuration)
@@ -257,6 +258,44 @@ and `qc/TST/MO/MultiNodeRemote`, and corresponding Checks under the path `qc/TST
257258
When using AliECS, one has to generate workflow templates and upload them to the corresponding repository. Please
258259
contact the QC or AliECS developers to receive assistance or instruction on how to do that.
259260
261+
## Moving window
262+
263+
By default QC Tasks are never reset, thus the MOs they produce contain data from the full run.
264+
However, if objects should have a shorter validity range, one may add the following option to the QC Task configuration:
265+
```json
266+
"MovingWindowTaskA": {
267+
...
268+
"resetAfterCycles": "10",
269+
}
270+
```
271+
In the case above, the QC Task will have the `TaskInterface::reset()` method invoked every 10 cycles.
272+
273+
If the QC Task runs in parallel on many nodes and its results are merged, the effects will be different
274+
depending on the chosen merging mode:
275+
- If `"delta"` mode is used, the Merger in the last layer will implement the moving window, while the QC Tasks will
276+
still reset after each cycle. Please note that QC Tasks will fall out of sync during data acquisition, so the moving
277+
window might contain slightly misaligned data time ranges coming from different sources. Also, due to fluctuations of
278+
the data transfer, objects coming from some sources might arrive more frequently than from others. Thus, one might
279+
notice higher occupancy on stave A one time, but the next object might contain less than average data for the same stave.
280+
- In the `"entire"` mode, QC Tasks will reset MOs, while Mergers will use the latest available object version from each
281+
Task. Please note that if one of the Tasks dies, an old version of its MO will still be used over and over. Thus, `"delta"`
282+
mode is advised in most use cases.
283+
284+
In setups with Mergers one may also extend the Mergers cycle duration, which can help to even out any data fluctuations:
285+
```json
286+
"MovingWindowTaskB": {
287+
...
288+
"cycleDurationSeconds" : "60",
289+
"mergingMode" : "delta",
290+
"mergerCycleMultiplier": "10", "": "multiplies cycleDurationSeconds in Mergers",
291+
"resetAfterCycles": "1", "": "it could still be larger than 1"
292+
}
293+
```
294+
In the presented case, the Merger will publish one set of complete MOs every 10 minutes, which should contain all deltas
295+
received during this last period. Since the QC Tasks cycle is 10 times shorter, the occupancy fluctuations should be
296+
less apparent. Please also note that using this parameter in the `"entire"` merging mode does not make much sense,
297+
since Mergers would use every 10th incomplete MO version when merging.
298+
260299
## Writing a DPL data producer
261300

262301
For your convenience, and although it does not lie within the QC scope, we would like to document how to write a simple data producer in the DPL. The DPL documentation can be found [here](https://github.com/AliceO2Group/AliceO2/blob/dev/Framework/Core/README.md) and for questions please head to the [forum](https://alice-talk.web.cern.ch/).
@@ -665,6 +704,8 @@ the "tasks" path.
665704
"taskParameters": { "": "User Task parameters which are then accessible as a key-value map.",
666705
"myOwnKey": "myOwnValue", "": "An example of a key and a value. Nested structures are not supported"
667706
},
707+
"resetAfterCycles" : "0", "": "Makes the Task or Merger reset MOs every n cycles.",
708+
"": "0 (default) means that MOs should cover the full run.",
668709
"location": "local", "": ["Location of the QC Task, it can be local or remote. Needed only for",
669710
"multi-node setups, not respected in standalone development setups."],
670711
"localMachines": [ "", "List of local machines where the QC task should run. Required only",
@@ -674,7 +715,8 @@ the "tasks" path.
674715
],
675716
"remoteMachine": "o2qc1", "": "Remote QC machine hostname. Required only for multi-node setups.",
676717
"remotePort": "30432", "": "Remote QC machine TCP port. Required only for multi-node setups.",
677-
"mergingMode": "delta", "": "Merging mode, \"delta\" (default) or \"entire\" objects are expected"
718+
"mergingMode": "delta", "": "Merging mode, \"delta\" (default) or \"entire\" objects are expected",
719+
"mergerCycleMultiplier": "1", "": "Multiplies the Merger cycle duration with respect to the QC Task cycle"
678720
}
679721
}
680722
}

0 commit comments

Comments
 (0)