diff --git a/DataFormats/Headers/include/Headers/DataHeader.h b/DataFormats/Headers/include/Headers/DataHeader.h index 62afee27882bf..67e48041073b9 100644 --- a/DataFormats/Headers/include/Headers/DataHeader.h +++ b/DataFormats/Headers/include/Headers/DataHeader.h @@ -317,6 +317,7 @@ constexpr o2::header::SerializationMethod gSerializationMethodAny{"*******"}; constexpr o2::header::SerializationMethod gSerializationMethodInvalid{"INVALID"}; constexpr o2::header::SerializationMethod gSerializationMethodNone{"NONE"}; constexpr o2::header::SerializationMethod gSerializationMethodROOT{"ROOT"}; +constexpr o2::header::SerializationMethod gSerializationMethodCCDB{"CCDB"}; constexpr o2::header::SerializationMethod gSerializationMethodFlatBuf{"FLATBUF"}; constexpr o2::header::SerializationMethod gSerializationMethodArrow{"ARROW"}; diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index 51eaf4279ec13..4504f262bea8c 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -43,6 +43,7 @@ o2_add_library(Framework src/DataProcessingDevice.cxx src/DataProcessingHeader.cxx src/DataProcessingHelpers.cxx + src/DataRefUtils.cxx src/SourceInfoHeader.cxx src/DataProcessor.cxx src/DataRelayer.cxx @@ -127,6 +128,7 @@ o2_add_library(Framework FairMQ::FairMQ O2::CommonUtils O2::MathUtils + O2::CCDB O2::FrameworkFoundation O2::Headers O2::MemoryResources diff --git a/Framework/Core/include/Framework/DataRefUtils.h b/Framework/Core/include/Framework/DataRefUtils.h index 0a9e6d6b24a02..97ded9972bc2d 100644 --- a/Framework/Core/include/Framework/DataRefUtils.h +++ b/Framework/Core/include/Framework/DataRefUtils.h @@ -23,6 +23,7 @@ #include #include +#include namespace o2::framework { @@ -150,8 +151,15 @@ struct DataRefUtils { } }); return std::move(result); + } else if constexpr (is_specialization::value == true) { + using wrapped = typename T::wrapped_type; + using DataHeader = o2::header::DataHeader; + std::unique_ptr result(static_cast(DataRefUtils::decodeCCDB(ref, typeid(wrapped)))); + return std::move(result); } } + // Decode a CCDB object using the CcdbApi. + static void* decodeCCDB(DataRef const& ref, std::type_info const& info); static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef& ref) { diff --git a/Framework/Core/include/Framework/InputRecord.h b/Framework/Core/include/Framework/InputRecord.h index 08f2b053f4c30..240e524f7d701 100644 --- a/Framework/Core/include/Framework/InputRecord.h +++ b/Framework/Core/include/Framework/InputRecord.h @@ -383,6 +383,9 @@ class InputRecord // return type with owning Deleter instance, forwarding to default_deleter std::unique_ptr> result(DataRefUtils::as>(ref).release()); return result; + } else if (method == o2::header::gSerializationMethodCCDB) { + std::unique_ptr> result(DataRefUtils::as>(ref).release()); + return result; } else { throw runtime_error("Attempt to extract object from message with unsupported serialization type"); } diff --git a/Framework/Core/include/Framework/SerializationMethods.h b/Framework/Core/include/Framework/SerializationMethods.h index 54e53c5af884f..4d20e8861fad5 100644 --- a/Framework/Core/include/Framework/SerializationMethods.h +++ b/Framework/Core/include/Framework/SerializationMethods.h @@ -100,6 +100,31 @@ class BoostSerialized private: wrapped_type& mRef; }; + +template +class CCDBSerialized +{ + public: + using non_messageable = o2::framework::MarkAsNonMessageable; + using wrapped_type = T; + using hint_type = HintType; + + static_assert(std::is_pointer::value == false, "wrapped type can not be a pointer"); + static_assert(std::is_pointer::value == false, "hint type can not be a pointer"); + + CCDBSerialized() = delete; + CCDBSerialized(wrapped_type& ref, hint_type* hint = nullptr) : mRef(ref), mHint(hint) {} + + T& operator()() { return mRef; } + T const& operator()() const { return mRef; } + + hint_type* getHint() const { return mHint; } + + private: + wrapped_type& mRef; + hint_type* mHint; // optional hint e.g. class info or class name +}; + } // namespace framework } // namespace o2 #endif // FRAMEWORK_SERIALIZATIONMETHODS_H diff --git a/Framework/Core/src/DataRefUtils.cxx b/Framework/Core/src/DataRefUtils.cxx new file mode 100644 index 0000000000000..e25311942ae14 --- /dev/null +++ b/Framework/Core/src/DataRefUtils.cxx @@ -0,0 +1,43 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include +#include "CCDB/CcdbApi.h" +#include "Framework/DataRefUtils.h" +#include +#include +#include "Framework/RuntimeError.h" + +namespace o2::framework +{ +// Adapted from CcdbApi private method interpretAsTMemFileAndExtract +// If the former is moved to public, throws on error and could be changed to +// not require a mutex we could use it. +void* DataRefUtils::decodeCCDB(DataRef const& ref, std::type_info const& tinfo) +{ + void* result = nullptr; + Int_t previousErrorLevel = gErrorIgnoreLevel; + gErrorIgnoreLevel = kFatal; + auto* dh = o2::header::get(ref.header); + TMemFile memFile("name", const_cast(ref.payload), dh->payloadSize, "READ"); + gErrorIgnoreLevel = previousErrorLevel; + if (memFile.IsZombie()) { + return nullptr; + } + TClass* tcl = TClass::GetClass(tinfo); + result = ccdb::CcdbApi::extractFromTFile(memFile, tcl); + if (!result) { + throw runtime_error_f("Couldn't retrieve object corresponding to %s from TFile", tcl->GetName()); + } + memFile.Close(); + return result; +} +} // namespace o2::framework diff --git a/Framework/Core/src/LifetimeHelpers.cxx b/Framework/Core/src/LifetimeHelpers.cxx index 6e725d4925265..3d9c4d11990de 100644 --- a/Framework/Core/src/LifetimeHelpers.cxx +++ b/Framework/Core/src/LifetimeHelpers.cxx @@ -161,7 +161,7 @@ size_t readToMessage(void* p, size_t size, size_t nmemb, void* userdata) o2::vector* buffer = (o2::vector*)userdata; size_t oldSize = buffer->size(); buffer->resize(oldSize + nmemb * size); - memcpy(buffer->data() + oldSize, userdata, nmemb * size); + memcpy(buffer->data() + oldSize, p, nmemb * size); return size * nmemb; } @@ -199,9 +199,8 @@ ExpirationHandler::Handler auto& rawDeviceService = services.get(); auto&& transport = rawDeviceService.device()->GetChannel(sourceChannel, 0).Transport(); auto channelAlloc = o2::pmr::getTransportAllocator(transport); - o2::vector payloadBuffer; + o2::vector payloadBuffer{transport->GetMemoryResource()}; payloadBuffer.reserve(10000); // we begin with messages of 10KB - auto payload = o2::pmr::getMessage(std::forward>(payloadBuffer), transport->GetMemoryResource()); CURL* curl = curl_easy_init(); if (curl == nullptr) { @@ -251,10 +250,11 @@ ExpirationHandler::Handler double dl; res = curl_easy_getinfo(curl, CURLINFO_SIZE_DOWNLOAD, &dl); dh.payloadSize = payloadBuffer.size(); - dh.payloadSerializationMethod = gSerializationMethodNone; + dh.payloadSerializationMethod = gSerializationMethodCCDB; DataProcessingHeader dph{timestamp, 1}; auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); + auto payload = o2::pmr::getMessage(std::forward>(payloadBuffer), transport->GetMemoryResource()); ref.header = std::move(header); ref.payload = std::move(payload); diff --git a/Framework/TestWorkflows/CMakeLists.txt b/Framework/TestWorkflows/CMakeLists.txt index 77895556881ea..7ad0b3e349cc3 100644 --- a/Framework/TestWorkflows/CMakeLists.txt +++ b/Framework/TestWorkflows/CMakeLists.txt @@ -93,6 +93,12 @@ o2_add_dpl_workflow(tof-dummy-ccdb PUBLIC_LINK_LIBRARIES O2::TOFReconstruction COMPONENT_NAME TestWorkflows) +# Detector specific dummy workflows +o2_add_dpl_workflow(test-ccdb-fetcher + SOURCES src/test_CCDBFetcher.cxx + PUBLIC_LINK_LIBRARIES O2::DataFormatsTOF O2::Framework + COMPONENT_NAME TestWorkflows) + if(BUILD_SIMULATION) o2_add_executable( ITSClusterizers diff --git a/Framework/TestWorkflows/src/test_CCDBFetcher.cxx b/Framework/TestWorkflows/src/test_CCDBFetcher.cxx new file mode 100644 index 0000000000000..817b901dfa9cd --- /dev/null +++ b/Framework/TestWorkflows/src/test_CCDBFetcher.cxx @@ -0,0 +1,48 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "Framework/runDataProcessing.h" +#include "Framework/ServiceRegistry.h" +#include "Framework/ControlService.h" +#include "Framework/CCDBParamSpec.h" +#include "DataFormatsTOF/CalibLHCphaseTOF.h" + +#include +#include + +using namespace o2::framework; +using namespace o2::header; + +// This is how you can define your processing in a declarative way +WorkflowSpec defineDataProcessing(ConfigContext const&) +{ + return WorkflowSpec{ + { + "A", + {InputSpec{"somecondition", "TOF", "LHCphase", 0, Lifetime::Condition, {ccdbParamSpec("TOF/LHCphase")}}, + InputSpec{"sometimer", "TST", "BAR", 0, Lifetime::Timer, {startTimeParamSpec(1638548475371)}}}, + {OutputSpec{"TST", "A1", 0, Lifetime::Timeframe}}, + AlgorithmSpec{ + adaptStateless([](DataAllocator& outputs, InputRecord& inputs, ControlService& control) { + auto ref = inputs.get("somecondition"); + auto header = o2::header::get(ref.header); + if (header->payloadSize != 2048) { + LOGP(ERROR, "Wrong size for condition payload (expected {}, found {}", 2048, header->payloadSize); + } + auto condition = inputs.get("somecondition"); + for (size_t pi = 0; pi < condition->size(); pi++) { + LOGP(INFO, "Phase at {} for timestamp {} is {}", pi, condition->timestamp(pi), condition->LHCphase(pi)); + } + control.readyToQuit(QuitRequest::All); + })}, + Options{ + {"test-option", VariantType::String, "test", {"A test option"}}}, + }}; +}