diff --git a/Analysis/EventFiltering/CMakeLists.txt b/Analysis/EventFiltering/CMakeLists.txt index 3c1b5e9365790..da9749b91f0d3 100644 --- a/Analysis/EventFiltering/CMakeLists.txt +++ b/Analysis/EventFiltering/CMakeLists.txt @@ -9,14 +9,11 @@ # granted to it by virtue of its status as an Intergovernmental Organization # or submit itself to any jurisdiction. -o2_add_library(EventFiltering - SOURCES centralEventFilterProcessor.cxx - PUBLIC_LINK_LIBRARIES O2::Framework O2::DetectorsBase O2::AnalysisDataModel O2::AnalysisCore) -o2_add_dpl_workflow(central-event-filter-processor - SOURCES cefp.cxx +o2_add_dpl_workflow(central-event-filter-task + SOURCES cefpTask.cxx COMPONENT_NAME Analysis - PUBLIC_LINK_LIBRARIES O2::EventFiltering) + PUBLIC_LINK_LIBRARIES O2::Framework O2::AnalysisDataModel O2::AnalysisCore) o2_add_dpl_workflow(nuclei-filter SOURCES nucleiFilter.cxx diff --git a/Analysis/EventFiltering/cefp.cxx b/Analysis/EventFiltering/cefp.cxx deleted file mode 100644 index 981f439fc97e7..0000000000000 --- a/Analysis/EventFiltering/cefp.cxx +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2019-2020 CERN and copyright holders of ALICE O2. -// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. -// All rights not expressly granted are reserved. -// -// This software is distributed under the terms of the GNU General Public -// License v3 (GPL Version 3), copied verbatim in the file "COPYING". -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#include "CommonUtils/ConfigurableParam.h" -#include "centralEventFilterProcessor.h" - -using namespace o2::framework; - -// ------------------------------------------------------------------ - -// we need to add workflow options before including Framework/runDataProcessing -void customize(std::vector& workflowOptions) -{ - // option allowing to set parameters - workflowOptions.push_back(ConfigParamSpec{"config", o2::framework::VariantType::String, "train_config.json", {"Configuration of the filtering"}}); -} - -// ------------------------------------------------------------------ - -#include "Framework/runDataProcessing.h" -#include "Framework/Logger.h" - -WorkflowSpec defineDataProcessing(ConfigContext const& configcontext) -{ - auto config = configcontext.options().get("config"); - - if (config.empty()) { - LOG(FATAL) << "We need a configuration for the centralEventFilterProcessor"; - throw std::runtime_error("incompatible options provided"); - } - - WorkflowSpec specs; - specs.emplace_back(o2::aod::filtering::getCentralEventFilterProcessorSpec(config)); - return std::move(specs); -} diff --git a/Analysis/EventFiltering/cefpTask.cxx b/Analysis/EventFiltering/cefpTask.cxx new file mode 100644 index 0000000000000..7fcf06ac2b6ff --- /dev/null +++ b/Analysis/EventFiltering/cefpTask.cxx @@ -0,0 +1,226 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// O2 includes + +#include "Framework/AnalysisTask.h" +#include "Framework/AnalysisDataModel.h" +#include "Framework/ASoAHelpers.h" +#include "AnalysisDataModel/TrackSelectionTables.h" + +#include "filterTables.h" + +#include "Framework/HistogramRegistry.h" + +#include +#include +#include +#include +#include +#include + +// we need to add workflow options before including Framework/runDataProcessing +void customize(std::vector& workflowOptions) +{ + // option allowing to set parameters + std::vector options{o2::framework::ConfigParamSpec{"train_config", o2::framework::VariantType::String, "full_config.json", {"Configuration of the filtering train"}}}; + + std::swap(workflowOptions, options); +} + +#include "Framework/runDataProcessing.h" + +using namespace o2; +using namespace o2::aod; +using namespace o2::framework; +using namespace o2::framework::expressions; +using namespace rapidjson; + +namespace +{ +bool readJsonFile(std::string& config, Document& d) +{ + FILE* fp = fopen(config.data(), "rb"); + if (!fp) { + LOG(WARNING) << "Missing configuration json file: " << config; + return false; + } + + char readBuffer[65536]; + FileReadStream is(fp, readBuffer, sizeof(readBuffer)); + + d.ParseStream(is); + fclose(fp); + return true; +} + +std::unordered_map> mDownscaling; +static const std::vector downscalingName{"Downscaling"}; +static const float defaultDownscaling[32][1]{ + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}, + {1.f}}; /// Max number of columns for triggers is 32 (extendible) + +#define FILTER_CONFIGURABLE(_TYPE_) \ + Configurable> cfg##_TYPE_ { #_TYPE_, {defaultDownscaling[0], NumberOfColumns < _TYPE_>(), 1, ColumnsNames(typename _TYPE_::iterator::persistent_columns_t{}), downscalingName }, #_TYPE_ " downscalings" } + +} // namespace + +struct centralEventFilterTask { + + HistogramRegistry scalers{"scalers", {}, OutputObjHandlingPolicy::AnalysisObject, true, true}; + + FILTER_CONFIGURABLE(NucleiFilters); + FILTER_CONFIGURABLE(DiffractionFilters); + + void init(o2::framework::InitContext& initc) + { + LOG(INFO) << "Start init"; + int nCols{0}; + for (auto& table : mDownscaling) { + nCols += table.second.size(); + } + LOG(INFO) << "Middle init, total number of columns " << nCols; + + scalers.add("mScalers", "", HistType::kTH1F, {{nCols + 1, -0.5, 0.5 + nCols, ";;Number of events"}}); + scalers.add("mFiltered", "", HistType::kTH1F, {{nCols + 1, -0.5, 0.5 + nCols, ";;Number of filtered events"}}); + auto mScalers = scalers.get(HIST("mScalers")); + auto mFiltered = scalers.get(HIST("mFiltered")); + + mScalers->GetXaxis()->SetBinLabel(1, "Total number of events"); + mFiltered->GetXaxis()->SetBinLabel(1, "Total number of events"); + int bin{2}; + for (auto& table : mDownscaling) { + for (auto& column : table.second) { + mScalers->GetXaxis()->SetBinLabel(bin, column.first.data()); + mFiltered->GetXaxis()->SetBinLabel(bin++, column.first.data()); + } + if (initc.options().isDefault(table.first.data()) || !initc.options().isSet(table.first.data())) { + continue; + } + auto filterOpt = initc.mOptions.get>(table.first.data()); + for (auto& col : table.second) { + col.second = filterOpt.get(col.first.data(), 0u); + } + } + } + + void run(ProcessingContext& pc) + { + + auto mScalers = scalers.get(HIST("mScalers")); + auto mFiltered = scalers.get(HIST("mFiltered")); + int64_t nEvents{-1}; + for (auto& tableName : mDownscaling) { + auto tableConsumer = pc.inputs().get(tableName.first); + + auto tablePtr{tableConsumer->asArrowTable()}; + int64_t nRows{tablePtr->num_rows()}; + nEvents = nEvents < 0 ? nRows : nEvents; + if (nEvents != nRows) { + LOG(FATAL) << "Inconsistent number of rows across trigger tables."; + } + + auto schema{tablePtr->schema()}; + for (auto& colName : tableName.second) { + int bin{mScalers->GetXaxis()->FindBin(colName.first.data())}; + double binCenter{mScalers->GetXaxis()->GetBinCenter(bin)}; + auto column{tablePtr->GetColumnByName(colName.first)}; + double downscaling{colName.second}; + if (column) { + for (int64_t iC{0}; iC < column->num_chunks(); ++iC) { + auto chunk{column->chunk(iC)}; + auto boolArray = std::static_pointer_cast(chunk); + for (int64_t iS{0}; iS < chunk->length(); ++iS) { + if (boolArray->Value(iS)) { + mScalers->Fill(binCenter); + if (mUniformGenerator(mGeneratorEngine) < downscaling) { + mFiltered->Fill(binCenter); + } + } + } + } + } + } + } + mScalers->SetBinContent(1, mScalers->GetBinContent(1) + nEvents); + mFiltered->SetBinContent(1, mFiltered->GetBinContent(1) + nEvents); + } + + std::mt19937_64 mGeneratorEngine; + std::uniform_real_distribution mUniformGenerator = std::uniform_real_distribution(0., 1.); +}; + +WorkflowSpec defineDataProcessing(ConfigContext const& cfg) +{ + std::vector inputs; + + auto config = cfg.options().get("train_config"); + Document d; + std::unordered_map> downscalings; + FillFiltersMap(FiltersPack, downscalings); + + std::array enabledFilters = {false}; + if (readJsonFile(config, d)) { + for (auto& workflow : d["workflows"].GetArray()) { + for (uint32_t iFilter{0}; iFilter < NumberOfFilters; ++iFilter) { + if (std::string_view(workflow["workflow_name"].GetString()) == std::string_view(FilteringTaskNames[iFilter])) { + inputs.emplace_back(std::string(AvailableFilters[iFilter]), "AOD", FilterDescriptions[iFilter], 0, Lifetime::Timeframe); + enabledFilters[iFilter] = true; + break; + } + } + } + } + + for (uint32_t iFilter{0}; iFilter < NumberOfFilters; ++iFilter) { + if (!enabledFilters[iFilter]) { + LOG(INFO) << std::string_view(AvailableFilters[iFilter]) << " not present in the configuration, removing it."; + downscalings.erase(std::string(AvailableFilters[iFilter])); + } + } + + DataProcessorSpec spec{adaptAnalysisTask(cfg)}; + for (auto& input : inputs) { + spec.inputs.emplace_back(input); + } + mDownscaling.swap(downscalings); + + return WorkflowSpec{spec}; +} diff --git a/Analysis/EventFiltering/centralEventFilterProcessor.cxx b/Analysis/EventFiltering/centralEventFilterProcessor.cxx deleted file mode 100644 index de6adecd0145d..0000000000000 --- a/Analysis/EventFiltering/centralEventFilterProcessor.cxx +++ /dev/null @@ -1,198 +0,0 @@ -// Copyright 2019-2020 CERN and copyright holders of ALICE O2. -// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. -// All rights not expressly granted are reserved. -// -// This software is distributed under the terms of the GNU General Public -// License v3 (GPL Version 3), copied verbatim in the file "COPYING". -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file centralEventFilterProcessor.cxx - -#include "centralEventFilterProcessor.h" - -#include "Framework/ControlService.h" -#include "Framework/ConfigParamRegistry.h" -#include -#include "Framework/TableConsumer.h" - -#include -#include - -#include -#include -#include -#include -#include - -using namespace o2::framework; -using namespace rapidjson; - -namespace -{ -bool readJsonFile(std::string& config, Document& d) -{ - FILE* fp = fopen(config.data(), "rb"); - if (!fp) { - LOG(ERROR) << "Missing configuration json file: " << config; - return false; - } - - char readBuffer[65536]; - FileReadStream is(fp, readBuffer, sizeof(readBuffer)); - - d.ParseStream(is); - fclose(fp); - return true; -} -} // namespace - -namespace o2::aod::filtering -{ - -void CentralEventFilterProcessor::init(framework::InitContext& ic) -{ - // JSON example - // { - // "subwagon_name" : "CentralEventFilterProcessor", - // "configuration" : { - // "NucleiFilters" : { - // "H2" : 0.1, - // "H3" : 0.3, - // "HE3" : 1., - // "HE4" : 1. - // } - // } - // } - LOG(INFO) << "Start init"; - Document d; - int nCols{0}; - if (readJsonFile(mConfigFile, d)) { - for (auto& workflow : d["workflows"].GetArray()) { - if (std::string_view(workflow["subwagon_name"].GetString()) == "CentralEventFilterProcessor") { - auto& config = workflow["configuration"]; - for (auto& filter : AvailableFilters) { - auto& filterConfig = config[filter]; - if (filterConfig.IsObject()) { - std::unordered_map tableMap; - for (auto& node : filterConfig.GetObject()) { - tableMap[node.name.GetString()] = node.value.GetDouble(); - nCols++; - } - LOG(INFO) << "Enabling downscaling map for filter: " << filter; - mDownscaling[filter] = tableMap; - } - } - break; - } - } - } - LOG(INFO) << "Middle init" << std::endl; - mScalers = new TH1D("mScalers", ";;Number of events", nCols + 1, -0.5, 0.5 + nCols); - mScalers->GetXaxis()->SetBinLabel(1, "Total number of events"); - - mFiltered = new TH1D("mFiltered", ";;Number of filtered events", nCols + 1, -0.5, 0.5 + nCols); - mFiltered->GetXaxis()->SetBinLabel(1, "Total number of events"); - - int bin{2}; - for (auto& table : mDownscaling) { - for (auto& column : table.second) { - mScalers->GetXaxis()->SetBinLabel(bin, column.first.data()); - mFiltered->GetXaxis()->SetBinLabel(bin++, column.first.data()); - } - } - - TFile test("test.root", "recreate"); - mScalers->Clone()->Write(); - test.Close(); -} - -void CentralEventFilterProcessor::run(ProcessingContext& pc) -{ - int64_t nEvents{-1}; - for (auto& tableName : mDownscaling) { - auto tableConsumer = pc.inputs().get(tableName.first); - - auto tablePtr{tableConsumer->asArrowTable()}; - int64_t nRows{tablePtr->num_rows()}; - nEvents = nEvents < 0 ? nRows : nEvents; - if (nEvents != nRows) { - LOG(FATAL) << "Inconsistent number of rows across trigger tables, fatal" << std::endl; ///TODO: move it to real fatal - } - - auto schema{tablePtr->schema()}; - for (auto& colName : tableName.second) { - int bin{mScalers->GetXaxis()->FindBin(colName.first.data())}; - double binCenter{mScalers->GetXaxis()->GetBinCenter(bin)}; - auto column{tablePtr->GetColumnByName(colName.first)}; - double downscaling{colName.second}; - if (column) { - for (int64_t iC{0}; iC < column->num_chunks(); ++iC) { - auto chunk{column->chunk(iC)}; - auto boolArray = std::static_pointer_cast(chunk); - for (int64_t iS{0}; iS < chunk->length(); ++iS) { - if (boolArray->Value(iS)) { - mScalers->Fill(binCenter); - if (mUniformGenerator(mGeneratorEngine) < downscaling) { - mFiltered->Fill(binCenter); - } - } - } - } - } - } - } - mScalers->SetBinContent(1, mScalers->GetBinContent(1) + nEvents); - mFiltered->SetBinContent(1, mFiltered->GetBinContent(1) + nEvents); -} - -void CentralEventFilterProcessor::endOfStream(EndOfStreamContext& ec) -{ - TFile output("trigger.root", "recreate"); - mScalers->Write("Scalers"); - mFiltered->Write("FilteredScalers"); - if (mScalers->GetBinContent(1) > 1.e-24) { - mScalers->Scale(1. / mScalers->GetBinContent(1)); - } - if (mFiltered->GetBinContent(1) > 1.e-24) { - mFiltered->Scale(1. / mFiltered->GetBinContent(1)); - } - mScalers->Write("Fractions"); - mFiltered->Write("FractionsDownscaled"); - output.Close(); -} - -DataProcessorSpec getCentralEventFilterProcessorSpec(std::string& config) -{ - - std::vector inputs; - Document d; - - if (readJsonFile(config, d)) { - for (auto& workflow : d["workflows"].GetArray()) { - for (unsigned int iFilter{0}; iFilter < AvailableFilters.size(); ++iFilter) { - if (std::string_view(workflow["subwagon_name"].GetString()) == std::string_view(AvailableFilters[iFilter])) { - inputs.emplace_back(std::string(AvailableFilters[iFilter]), "AOD", FilterDescriptions[iFilter], 0, Lifetime::Timeframe); - LOG(INFO) << "Adding input " << std::string_view(AvailableFilters[iFilter]) << std::endl; - break; - } - } - } - } - - std::vector outputs; - outputs.emplace_back("AOD", "Decision", 0, Lifetime::Timeframe); - outputs.emplace_back("TFN", "TFNumber", 0, Lifetime::Timeframe); - - return DataProcessorSpec{ - "o2-central-event-filter-processor", - inputs, - outputs, - AlgorithmSpec{adaptFromTask(config)}, - Options{ - {"filtering-config", VariantType::String, "", {"Path to the filtering json config file"}}}}; -} - -} // namespace o2::aod::filtering diff --git a/Analysis/EventFiltering/centralEventFilterProcessor.h b/Analysis/EventFiltering/centralEventFilterProcessor.h deleted file mode 100644 index 0e23c4f31bd56..0000000000000 --- a/Analysis/EventFiltering/centralEventFilterProcessor.h +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2019-2020 CERN and copyright holders of ALICE O2. -// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. -// All rights not expressly granted are reserved. -// -// This software is distributed under the terms of the GNU General Public -// License v3 (GPL Version 3), copied verbatim in the file "COPYING". -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file centralEventFilterProcessor.h - -#ifndef O2_CentralEventFilterProcessor -#define O2_CentralEventFilterProcessor - -#include "Framework/DataProcessorSpec.h" -#include "Framework/Task.h" - -#include -#include "filterTables.h" - -class TH1D; - -namespace o2::aod::filtering -{ - -class CentralEventFilterProcessor : public framework::Task -{ - public: - CentralEventFilterProcessor(const std::string& config) : mConfigFile{config} {} - ~CentralEventFilterProcessor() override = default; - void init(framework::InitContext& ic) final; - void run(framework::ProcessingContext& pc) final; - void endOfStream(framework::EndOfStreamContext& ec) final; - - private: - TH1D* mScalers; - TH1D* mFiltered; - std::string mConfigFile; - std::unordered_map> mDownscaling; - std::mt19937_64 mGeneratorEngine; - std::uniform_real_distribution mUniformGenerator = std::uniform_real_distribution(0., 1.); -}; - -/// create a processor spec -framework::DataProcessorSpec getCentralEventFilterProcessorSpec(std::string& config); - -} // namespace o2::aod::filtering - -#endif /* O2_CentralEventFilterProcessor */ diff --git a/Analysis/EventFiltering/filterTables.h b/Analysis/EventFiltering/filterTables.h index a05dc7b41d488..c54fe6fd173e5 100644 --- a/Analysis/EventFiltering/filterTables.h +++ b/Analysis/EventFiltering/filterTables.h @@ -28,12 +28,9 @@ DECLARE_SOA_COLUMN(DG, hasDG, bool); //! Double Gap events, DG } // namespace filtering -DECLARE_SOA_TABLE(NucleiFilters, "AOD", "Nuclei Filters", //! +// nuclei +DECLARE_SOA_TABLE(NucleiFilters, "AOD", "NucleiFilters", //! filtering::H2, filtering::H3, filtering::He3, filtering::He4); - -constexpr std::array AvailableFilters{"NucleiFilters", "DiffractionFilters"}; -constexpr std::array FilterDescriptions{"Nuclei Filters", "DiffFilters"}; - using NucleiFilter = NucleiFilters::iterator; // diffraction @@ -41,6 +38,44 @@ DECLARE_SOA_TABLE(DiffractionFilters, "AOD", "DiffFilters", //! Diffraction filt filtering::DG); using DiffractionFilter = DiffractionFilters::iterator; +/// List of the available filters, the description of their tables and the name of the tasks +constexpr int NumberOfFilters{2}; +constexpr std::array AvailableFilters{"NucleiFilters", "DiffractionFilters"}; +constexpr std::array FilterDescriptions{"NucleiFilters", "DiffFilters"}; +constexpr std::array FilteringTaskNames{"o2-analysis-nuclei-filter", "o2-analysis-diffraction-filter"}; +constexpr o2::framework::pack FiltersPack; +static_assert(o2::framework::pack_size(FiltersPack) == NumberOfFilters); + +template +void addColumnToMap(std::unordered_map>& map) +{ + map[MetadataTrait::metadata::tableLabel()][C::columnLabel()] = 1.f; +} + +template +void addColumnsToMap(o2::framework::pack, std::unordered_map>& map) +{ + (addColumnToMap(map), ...); +} + +template +void FillFiltersMap(o2::framework::pack, std::unordered_map>& map) +{ + (addColumnsToMap(typename T::iterator::persistent_columns_t{}, map), ...); +} + +template +static std::vector ColumnsNames(o2::framework::pack) +{ + return {C::columnLabel()...}; +} + +template +unsigned int NumberOfColumns() +{ + return o2::framework::pack_size(typename T::iterator::persistent_columns_t{}); +} + } // namespace o2::aod #endif // O2_ANALYSIS_TRIGGER_H_