Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions DataFormats/Headers/include/Headers/DataHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ constexpr o2::header::SerializationMethod gSerializationMethodAny{"*******"};
constexpr o2::header::SerializationMethod gSerializationMethodInvalid{"INVALID"};
constexpr o2::header::SerializationMethod gSerializationMethodNone{"NONE"};
constexpr o2::header::SerializationMethod gSerializationMethodROOT{"ROOT"};
constexpr o2::header::SerializationMethod gSerializationMethodCCDB{"CCDB"};
constexpr o2::header::SerializationMethod gSerializationMethodFlatBuf{"FLATBUF"};
constexpr o2::header::SerializationMethod gSerializationMethodArrow{"ARROW"};

Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ o2_add_library(Framework
src/DataProcessingDevice.cxx
src/DataProcessingHeader.cxx
src/DataProcessingHelpers.cxx
src/DataRefUtils.cxx
src/SourceInfoHeader.cxx
src/DataProcessor.cxx
src/DataRelayer.cxx
Expand Down Expand Up @@ -127,6 +128,7 @@ o2_add_library(Framework
FairMQ::FairMQ
O2::CommonUtils
O2::MathUtils
O2::CCDB
O2::FrameworkFoundation
O2::Headers
O2::MemoryResources
Expand Down
8 changes: 8 additions & 0 deletions Framework/Core/include/Framework/DataRefUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <gsl/gsl>

#include <type_traits>
#include <typeinfo>

namespace o2::framework
{
Expand Down Expand Up @@ -150,8 +151,15 @@ struct DataRefUtils {
}
});
return std::move(result);
} else if constexpr (is_specialization<T, CCDBSerialized>::value == true) {
using wrapped = typename T::wrapped_type;
using DataHeader = o2::header::DataHeader;
std::unique_ptr<wrapped> result(static_cast<wrapped*>(DataRefUtils::decodeCCDB(ref, typeid(wrapped))));
return std::move(result);
}
}
// Decode a CCDB object using the CcdbApi.
static void* decodeCCDB(DataRef const& ref, std::type_info const& info);

static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef& ref)
{
Expand Down
3 changes: 3 additions & 0 deletions Framework/Core/include/Framework/InputRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,9 @@ class InputRecord
// return type with owning Deleter instance, forwarding to default_deleter
std::unique_ptr<ValueT const, Deleter<ValueT const>> result(DataRefUtils::as<ROOTSerialized<ValueT>>(ref).release());
return result;
} else if (method == o2::header::gSerializationMethodCCDB) {
std::unique_ptr<ValueT const, Deleter<ValueT const>> result(DataRefUtils::as<CCDBSerialized<ValueT>>(ref).release());
return result;
} else {
throw runtime_error("Attempt to extract object from message with unsupported serialization type");
}
Expand Down
25 changes: 25 additions & 0 deletions Framework/Core/include/Framework/SerializationMethods.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,31 @@ class BoostSerialized
private:
wrapped_type& mRef;
};

template <typename T, typename HintType = void>
class CCDBSerialized
{
public:
using non_messageable = o2::framework::MarkAsNonMessageable;
using wrapped_type = T;
using hint_type = HintType;

static_assert(std::is_pointer<T>::value == false, "wrapped type can not be a pointer");
static_assert(std::is_pointer<HintType>::value == false, "hint type can not be a pointer");

CCDBSerialized() = delete;
CCDBSerialized(wrapped_type& ref, hint_type* hint = nullptr) : mRef(ref), mHint(hint) {}

T& operator()() { return mRef; }
T const& operator()() const { return mRef; }

hint_type* getHint() const { return mHint; }

private:
wrapped_type& mRef;
hint_type* mHint; // optional hint e.g. class info or class name
};

} // namespace framework
} // namespace o2
#endif // FRAMEWORK_SERIALIZATIONMETHODS_H
43 changes: 43 additions & 0 deletions Framework/Core/src/DataRefUtils.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include <typeinfo>
#include "CCDB/CcdbApi.h"
#include "Framework/DataRefUtils.h"
#include <TMemFile.h>
#include <TError.h>
#include "Framework/RuntimeError.h"

namespace o2::framework
{
// Adapted from CcdbApi private method interpretAsTMemFileAndExtract
// If the former is moved to public, throws on error and could be changed to
// not require a mutex we could use it.
void* DataRefUtils::decodeCCDB(DataRef const& ref, std::type_info const& tinfo)
{
void* result = nullptr;
Int_t previousErrorLevel = gErrorIgnoreLevel;
gErrorIgnoreLevel = kFatal;
auto* dh = o2::header::get<o2::header::DataHeader*>(ref.header);
TMemFile memFile("name", const_cast<char*>(ref.payload), dh->payloadSize, "READ");
gErrorIgnoreLevel = previousErrorLevel;
if (memFile.IsZombie()) {
return nullptr;
}
TClass* tcl = TClass::GetClass(tinfo);
result = ccdb::CcdbApi::extractFromTFile(memFile, tcl);
if (!result) {
throw runtime_error_f("Couldn't retrieve object corresponding to %s from TFile", tcl->GetName());
}
memFile.Close();
return result;
}
} // namespace o2::framework
8 changes: 4 additions & 4 deletions Framework/Core/src/LifetimeHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ size_t readToMessage(void* p, size_t size, size_t nmemb, void* userdata)
o2::vector<char>* buffer = (o2::vector<char>*)userdata;
size_t oldSize = buffer->size();
buffer->resize(oldSize + nmemb * size);
memcpy(buffer->data() + oldSize, userdata, nmemb * size);
memcpy(buffer->data() + oldSize, p, nmemb * size);
return size * nmemb;
}

Expand Down Expand Up @@ -199,9 +199,8 @@ ExpirationHandler::Handler
auto& rawDeviceService = services.get<RawDeviceService>();
auto&& transport = rawDeviceService.device()->GetChannel(sourceChannel, 0).Transport();
auto channelAlloc = o2::pmr::getTransportAllocator(transport);
o2::vector<char> payloadBuffer;
o2::vector<char> payloadBuffer{transport->GetMemoryResource()};
payloadBuffer.reserve(10000); // we begin with messages of 10KB
auto payload = o2::pmr::getMessage(std::forward<o2::vector<char>>(payloadBuffer), transport->GetMemoryResource());

CURL* curl = curl_easy_init();
if (curl == nullptr) {
Expand Down Expand Up @@ -251,10 +250,11 @@ ExpirationHandler::Handler
double dl;
res = curl_easy_getinfo(curl, CURLINFO_SIZE_DOWNLOAD, &dl);
dh.payloadSize = payloadBuffer.size();
dh.payloadSerializationMethod = gSerializationMethodNone;
dh.payloadSerializationMethod = gSerializationMethodCCDB;

DataProcessingHeader dph{timestamp, 1};
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
auto payload = o2::pmr::getMessage(std::forward<o2::vector<char>>(payloadBuffer), transport->GetMemoryResource());

ref.header = std::move(header);
ref.payload = std::move(payload);
Expand Down
6 changes: 6 additions & 0 deletions Framework/TestWorkflows/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ o2_add_dpl_workflow(tof-dummy-ccdb
PUBLIC_LINK_LIBRARIES O2::TOFReconstruction
COMPONENT_NAME TestWorkflows)

# Detector specific dummy workflows
o2_add_dpl_workflow(test-ccdb-fetcher
SOURCES src/test_CCDBFetcher.cxx
PUBLIC_LINK_LIBRARIES O2::DataFormatsTOF O2::Framework
COMPONENT_NAME TestWorkflows)

if(BUILD_SIMULATION)
o2_add_executable(
ITSClusterizers
Expand Down
48 changes: 48 additions & 0 deletions Framework/TestWorkflows/src/test_CCDBFetcher.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "Framework/runDataProcessing.h"
#include "Framework/ServiceRegistry.h"
#include "Framework/ControlService.h"
#include "Framework/CCDBParamSpec.h"
#include "DataFormatsTOF/CalibLHCphaseTOF.h"

#include <chrono>
#include <thread>

using namespace o2::framework;
using namespace o2::header;

// This is how you can define your processing in a declarative way
WorkflowSpec defineDataProcessing(ConfigContext const&)
{
return WorkflowSpec{
{
"A",
{InputSpec{"somecondition", "TOF", "LHCphase", 0, Lifetime::Condition, {ccdbParamSpec("TOF/LHCphase")}},
InputSpec{"sometimer", "TST", "BAR", 0, Lifetime::Timer, {startTimeParamSpec(1638548475371)}}},
{OutputSpec{"TST", "A1", 0, Lifetime::Timeframe}},
AlgorithmSpec{
adaptStateless([](DataAllocator& outputs, InputRecord& inputs, ControlService& control) {
auto ref = inputs.get("somecondition");
auto header = o2::header::get<o2::header::DataHeader*>(ref.header);
if (header->payloadSize != 2048) {
LOGP(ERROR, "Wrong size for condition payload (expected {}, found {}", 2048, header->payloadSize);
}
auto condition = inputs.get<o2::dataformats::CalibLHCphaseTOF*>("somecondition");
for (size_t pi = 0; pi < condition->size(); pi++) {
LOGP(INFO, "Phase at {} for timestamp {} is {}", pi, condition->timestamp(pi), condition->LHCphase(pi));
}
control.readyToQuit(QuitRequest::All);
})},
Options{
{"test-option", VariantType::String, "test", {"A test option"}}},
}};
}