From 7d4da453d73f89d957f8d4488fd2abe46c92d7c1 Mon Sep 17 00:00:00 2001 From: Sean Murray Date: Tue, 22 Jun 2021 14:56:56 +0200 Subject: [PATCH 1/6] trd reading empty frames --- .../TRD/reconstruction/src/DataReader.cxx | 10 ++++- .../TRD/reconstruction/src/DataReaderTask.cxx | 39 ++++++++++++++----- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/Detectors/TRD/reconstruction/src/DataReader.cxx b/Detectors/TRD/reconstruction/src/DataReader.cxx index a83d61554018a..50df24da80e8f 100644 --- a/Detectors/TRD/reconstruction/src/DataReader.cxx +++ b/Detectors/TRD/reconstruction/src/DataReader.cxx @@ -35,6 +35,7 @@ void customize(std::vector& workflowOptions) {"trd-datareader-headerverbose", VariantType::Bool, false, {"Enable verbose header info"}}, {"trd-datareader-dataverbose", VariantType::Bool, false, {"Enable verbose data info"}}, {"trd-datareader-compresseddata", VariantType::Bool, false, {"The incoming data is compressed or not"}}, + {"ignore-dist-stf", VariantType::Bool, false, {"do not subscribe to FLP/DISTSUBTIMEFRAME/0 message (no lost TF recovery)"}}, {"trd-datareader-enablebyteswapdata", VariantType::Bool, false, {"byteswap the incoming data, raw data needs it and simulation does not."}}}; o2::raw::HBFUtilsInitializer::addConfigOption(options); @@ -62,7 +63,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) auto compresseddata = cfgc.options().get("trd-datareader-compresseddata"); auto headerverbose = cfgc.options().get("trd-datareader-headerverbose"); auto dataverbose = cfgc.options().get("trd-datareader-dataverbose"); - + auto askSTFDist = !cfgc.options().get("ignore-dist-stf"); std::vector outputs; outputs.emplace_back("TRD", "TRACKLETS", 0, Lifetime::Timeframe); outputs.emplace_back("TRD", "DIGITS", 0, Lifetime::Timeframe); @@ -90,9 +91,14 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) int idevice = 0; // LOG(info) << "expected incoming data definition : " << inputspec; // this is probably never going to be used but would to nice to know hence here. + auto orig = o2::header::gDataOriginTRD; + std::vector inputs{{"stf", ConcreteDataTypeMatcher{orig, "RAWDATA"}, Lifetime::Optional}}; + if (askSTFDist) { + inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe); + } workflow.emplace_back(DataProcessorSpec{ std::string("trd-datareader"), // left as a string cast incase we append stuff to the string - select(std::string("x:TRD/" + inputspec).c_str()), + inputs, //select(std::string("x:TRD/" + inputspec).c_str()), outputs, algoSpec, Options{}}); diff --git a/Detectors/TRD/reconstruction/src/DataReaderTask.cxx b/Detectors/TRD/reconstruction/src/DataReaderTask.cxx index 9641bee9715c3..fc158b8e9d3e3 100644 --- a/Detectors/TRD/reconstruction/src/DataReaderTask.cxx +++ b/Detectors/TRD/reconstruction/src/DataReaderTask.cxx @@ -15,12 +15,16 @@ #include "TRDReconstruction/DataReaderTask.h" #include "TRDReconstruction/CruRawReader.h" + #include "Framework/ControlService.h" #include "Framework/ConfigParamRegistry.h" #include "Framework/RawDeviceService.h" #include "Framework/DeviceSpec.h" #include "Framework/DataSpecUtils.h" +#include "Framework/InputRecordWalker.h" + #include "DataFormatsTRD/Constants.h" + #include //using namespace o2::framework; @@ -62,28 +66,45 @@ void DataReaderTask::run(ProcessingContext& pc) auto outputRoutes = pc.services().get().spec().outputs; auto fairMQChannel = outputRoutes.at(0).channel; int inputcount = 0; + std::vector dummy{InputSpec{"filter", ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}, Lifetime::Timeframe}}; + // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this means that the "delayed message" + // // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow + + for (const auto& ref : InputRecordWalker(pc.inputs(), dummy)) { + const auto dh = o2::framework::DataRefUtils::getHeader(ref); + if (dh->payloadSize == 16) { + LOGP(WARNING, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF", + dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, dh->payloadSize); + sendData(pc); //send the empty tf data. + return; + } + LOG(info) << " matched DEADBEEF"; + } + //TODO combine the previous and subsequent loops. + int inputcounts = 0; /* loop over inputs routes */ for (auto iit = pc.inputs().begin(), iend = pc.inputs().end(); iit != iend; ++iit) { + LOG(info) << " looping over inputs " << inputcounts; + inputcounts++; if (!iit.isValid()) { continue; } /* loop over input parts */ + int inputpartscount = 0; for (auto const& ref : iit) { - + LOG(info) << " looping over parts " << inputpartscount; + if (mVerbose) { + const auto dh = DataRefUtils::getHeader(ref); + LOGP(info, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF", + dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, dh->payloadSize); + } const auto* headerIn = DataRefUtils::getHeader(ref); auto payloadIn = ref.payload; auto payloadInSize = headerIn->payloadSize; if (!mCompressedData) { //we have raw data coming in from flp if (mVerbose) { - LOG(info) << " parsing non compressed data in the data reader task"; + LOG(info) << " parsing non compressed data in the data reader task with a payload of " << payloadInSize << " payload size"; } - - int a = 1; - int d = 1; - // while(d==1){ - // a=sin(rand()); - // } - mReader.setDataBuffer(payloadIn); mReader.setDataBufferSize(payloadInSize); mReader.configure(mByteSwap, mVerbose, mHeaderVerbose, mDataVerbose); From 562dc344ead653ae5d2cc1994d8fec54fd293c48 Mon Sep 17 00:00:00 2001 From: Sean Murray Date: Thu, 24 Jun 2021 12:36:38 +0200 Subject: [PATCH 2/6] safter sending of blank frames --- .../TRDReconstruction/DataReaderTask.h | 3 ++- .../TRD/reconstruction/src/DataReader.cxx | 6 ++++- .../TRD/reconstruction/src/DataReaderTask.cxx | 26 +++++++++---------- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h b/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h index 27fd10b9eb685..1cdba9fb092ae 100644 --- a/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h +++ b/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h @@ -38,7 +38,7 @@ class DataReaderTask : public Task DataReaderTask(bool compresseddata, bool byteswap, bool verbose, bool headerverbose, bool dataverbose) : mCompressedData(compresseddata), mByteSwap(byteswap), mVerbose(verbose), mHeaderVerbose(headerverbose), mDataVerbose(dataverbose) {} ~DataReaderTask() override = default; void init(InitContext& ic) final; - void sendData(ProcessingContext& pc); + void sendData(ProcessingContext& pc, bool blankframe = false); void run(ProcessingContext& pc) final; private: @@ -57,6 +57,7 @@ class DataReaderTask : public Task bool mHeaderVerbose{false}; // verbose output of headers bool mCompressedData{false}; // are we dealing with the compressed data from the flp (send via option) bool mByteSwap{true}; // whether we are to byteswap the incoming data, mc is not byteswapped, raw data is (too be changed in cru at some point) + o2::header::DataDescription mDataSpec; // input spec of th raw incoming data }; } // namespace o2::trd diff --git a/Detectors/TRD/reconstruction/src/DataReader.cxx b/Detectors/TRD/reconstruction/src/DataReader.cxx index 50df24da80e8f..0b63f8ce9ad05 100644 --- a/Detectors/TRD/reconstruction/src/DataReader.cxx +++ b/Detectors/TRD/reconstruction/src/DataReader.cxx @@ -92,7 +92,11 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) // LOG(info) << "expected incoming data definition : " << inputspec; // this is probably never going to be used but would to nice to know hence here. auto orig = o2::header::gDataOriginTRD; - std::vector inputs{{"stf", ConcreteDataTypeMatcher{orig, "RAWDATA"}, Lifetime::Optional}}; + auto inputs = o2::framework::select(inputspec.c_str()); + for (auto& inp : inputs) { + // take care of case where our data is not in the time frame + inp.lifetime = Lifetime::Optional; + } if (askSTFDist) { inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe); } diff --git a/Detectors/TRD/reconstruction/src/DataReaderTask.cxx b/Detectors/TRD/reconstruction/src/DataReaderTask.cxx index fc158b8e9d3e3..b958e7706b9e5 100644 --- a/Detectors/TRD/reconstruction/src/DataReaderTask.cxx +++ b/Detectors/TRD/reconstruction/src/DataReaderTask.cxx @@ -43,12 +43,14 @@ void DataReaderTask::init(InitContext& ic) ic.services().get().set(CallbackService::Id::Stop, finishFunction); } -void DataReaderTask::sendData(ProcessingContext& pc) +void DataReaderTask::sendData(ProcessingContext& pc, bool blankframe) { // mReader.getParsedObjects(mTracklets,mDigits,mTriggers); - mReader.getParsedObjects(mTracklets, mDigits, mTriggers); + if (!blankframe) { + mReader.getParsedObjects(mTracklets, mDigits, mTriggers); + } - LOG(info) << "Sending data onwards with " << mDigits.size() << " Digits and " << mTracklets.size() << " Tracklets and " << mTriggers.size() << " Triggers"; + LOG(info) << "Sending data onwards with " << mDigits.size() << " Digits and " << mTracklets.size() << " Tracklets and " << mTriggers.size() << " Triggers and blankframe:" << blankframe; pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "DIGITS", 0, Lifetime::Timeframe}, mDigits); pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe}, mTracklets); pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe}, mTriggers); @@ -65,34 +67,31 @@ void DataReaderTask::run(ProcessingContext& pc) auto device = pc.services().get().device(); auto outputRoutes = pc.services().get().spec().outputs; auto fairMQChannel = outputRoutes.at(0).channel; - int inputcount = 0; - std::vector dummy{InputSpec{"filter", ConcreteDataTypeMatcher{"FLP", "DISTSUBTIMEFRAME"}, Lifetime::Timeframe}}; + mDataSpec = o2::header::gDataDescriptionRawData; + + std::vector dummy{InputSpec{"dummy", ConcreteDataMatcher{"TRD", mDataSpec, 0xDEADBEEF}}}; // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this means that the "delayed message" // // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow for (const auto& ref : InputRecordWalker(pc.inputs(), dummy)) { const auto dh = o2::framework::DataRefUtils::getHeader(ref); - if (dh->payloadSize == 16) { + if (dh->payloadSize == 16 || dh->payloadSize == 0) { LOGP(WARNING, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF", dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, dh->payloadSize); - sendData(pc); //send the empty tf data. + sendData(pc, true); //send the empty tf data. return; } LOG(info) << " matched DEADBEEF"; } //TODO combine the previous and subsequent loops. - int inputcounts = 0; /* loop over inputs routes */ for (auto iit = pc.inputs().begin(), iend = pc.inputs().end(); iit != iend; ++iit) { - LOG(info) << " looping over inputs " << inputcounts; - inputcounts++; if (!iit.isValid()) { continue; } /* loop over input parts */ int inputpartscount = 0; for (auto const& ref : iit) { - LOG(info) << " looping over parts " << inputpartscount; if (mVerbose) { const auto dh = DataRefUtils::getHeader(ref); LOGP(info, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF", @@ -135,10 +134,9 @@ void DataReaderTask::run(ProcessingContext& pc) mTriggers = mCompressedReader.getIR(); //get the payload of trigger and digits out. } - /* output */ - //sendData(pc); //TODO do we ever have to not post the data. i.e. can we get here mid event? I dont think so. } - sendData(pc); //TODO do we ever have to not post the data. i.e. can we get here mid event? I dont think so. + /* output */ + sendData(pc, false); //TODO do we ever have to not post the data. i.e. can we get here mid event? I dont think so. } auto dataReadTime = std::chrono::high_resolution_clock::now() - dataReadStart; From d7029de519853c963aea4a3f374673a9f7cdb6f2 Mon Sep 17 00:00:00 2001 From: Sean Murray Date: Tue, 29 Jun 2021 01:06:37 +0200 Subject: [PATCH 3/6] ensure empty frame is indeed empty --- .../TRD/include/DataFormatsTRD/EventRecord.h | 4 + DataFormats/Detectors/TRD/src/EventRecord.cxx | 55 ++++++++++- .../include/TRDReconstruction/CruRawReader.h | 2 + .../TRDReconstruction/DataReaderTask.h | 3 +- .../TRD/reconstruction/src/CruRawReader.cxx | 49 +++++----- .../TRD/reconstruction/src/DataReader.cxx | 14 +-- .../TRD/reconstruction/src/DataReaderTask.cxx | 96 ++++++++++--------- 7 files changed, 135 insertions(+), 88 deletions(-) diff --git a/DataFormats/Detectors/TRD/include/DataFormatsTRD/EventRecord.h b/DataFormats/Detectors/TRD/include/DataFormatsTRD/EventRecord.h index d31049fb307af..7ddb0c713d5fe 100644 --- a/DataFormats/Detectors/TRD/include/DataFormatsTRD/EventRecord.h +++ b/DataFormats/Detectors/TRD/include/DataFormatsTRD/EventRecord.h @@ -18,6 +18,7 @@ #include "CommonDataFormat/RangeReference.h" #include "FairLogger.h" #include "DataFormatsTRD/Tracklet64.h" +#include "Framework/ProcessingContext.h" namespace o2::trd { @@ -93,6 +94,9 @@ class EventStorage void addTracklets(InteractionRecord& ir, std::vector& tracklets); void addTracklets(InteractionRecord& ir, std::vector::iterator& start, std::vector::iterator& end); void unpackDataForSending(std::vector& triggers, std::vector& tracklets, std::vector& digits); + void sendData(o2::framework::ProcessingContext& pc); + //this could replace by keeing a running total on addition TODO + void sumTrackletsDigitsTriggers(uint64_t& tracklets, uint64_t& digits, uint64_t& triggers); int sumTracklets(); int sumDigits(); std::vector& getTracklets(InteractionRecord& ir); diff --git a/DataFormats/Detectors/TRD/src/EventRecord.cxx b/DataFormats/Detectors/TRD/src/EventRecord.cxx index 7b0e04530830f..15187b1b7ba8c 100644 --- a/DataFormats/Detectors/TRD/src/EventRecord.cxx +++ b/DataFormats/Detectors/TRD/src/EventRecord.cxx @@ -21,6 +21,18 @@ #include "DataFormatsTRD/Digit.h" #include "DataFormatsTRD/EventRecord.h" #include "DataFormatsTRD/Constants.h" + +#include "Framework/Output.h" +#include "Framework/ProcessingContext.h" +#include "Framework/ControlService.h" +#include "Framework/ConfigParamRegistry.h" +#include "Framework/RawDeviceService.h" +#include "Framework/DeviceSpec.h" +#include "Framework/DataSpecUtils.h" +#include "Framework/InputRecordWalker.h" + +#include "DataFormatsTRD/Constants.h" + #include #include #include @@ -142,14 +154,43 @@ void EventStorage::unpackDataForSending(std::vector& triggers, st { int digitcount = 0; int trackletcount = 0; - for (auto event : mEventRecords) { + for (auto& event : mEventRecords) { + tracklets.insert(std::end(tracklets), std::begin(event.getTracklets()), std::end(event.getTracklets())); + digits.insert(std::end(digits), std::begin(event.getDigits()), std::end(event.getDigits())); + triggers.emplace_back(event.getBCData(), digitcount, event.getDigits().size(), trackletcount, event.getTracklets().size()); + digitcount += event.getDigits().size(); + trackletcount += event.getTracklets().size(); + } +} + +void EventStorage::sendData(o2::framework::ProcessingContext& pc) +{ + //at this point we know the total number of tracklets and digits and triggers. + //hence we can create the relevant objects inside the message as opposed to creating a local object and snapshotting it(copying) it + //into the message. + uint64_t trackletcount = 0; + uint64_t digitcount = 0; + uint64_t triggercount = 0; + sumTrackletsDigitsTriggers(trackletcount, digitcount, triggercount); + std::vector tracklets; + tracklets.reserve(trackletcount); + std::vector digits; + digits.reserve(digitcount); + std::vector triggers; + triggers.reserve(triggercount); + for (auto& event : mEventRecords) { tracklets.insert(std::end(tracklets), std::begin(event.getTracklets()), std::end(event.getTracklets())); digits.insert(std::end(digits), std::begin(event.getDigits()), std::end(event.getDigits())); triggers.emplace_back(event.getBCData(), digitcount, event.getDigits().size(), trackletcount, event.getTracklets().size()); digitcount += event.getDigits().size(); trackletcount += event.getTracklets().size(); } + LOG(info) << "Sending data onwards with " << digits.size() << " Digits and " << tracklets.size() << " Tracklets and " << triggers.size() << " Triggers"; + pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTRD, "DIGITS", 0, o2::framework::Lifetime::Timeframe}, digits); + pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTRD, "TRACKLETS", 0, o2::framework::Lifetime::Timeframe}, tracklets); + pc.outputs().snapshot(o2::framework::Output{o2::header::gDataOriginTRD, "TRKTRGRD", 0, o2::framework::Lifetime::Timeframe}, triggers); } + int EventStorage::sumTracklets() { int sum = 0; @@ -166,6 +207,18 @@ int EventStorage::sumDigits() } return sum; } +void EventStorage::sumTrackletsDigitsTriggers(uint64_t& tracklets, uint64_t& digits, uint64_t& triggers) +{ + int digitsum = 0; + int trackletsum = 0; + int triggersum = 0; + for (auto event : mEventRecords) { + digitsum += event.getDigits().size(); + trackletsum += event.getTracklets().size(); + triggersum++; + } +} + std::vector& EventStorage::getTracklets(InteractionRecord& ir) { bool found = false; diff --git a/Detectors/TRD/reconstruction/include/TRDReconstruction/CruRawReader.h b/Detectors/TRD/reconstruction/include/TRDReconstruction/CruRawReader.h index 73e0b3b574413..779a200c15e4f 100644 --- a/Detectors/TRD/reconstruction/include/TRDReconstruction/CruRawReader.h +++ b/Detectors/TRD/reconstruction/include/TRDReconstruction/CruRawReader.h @@ -92,6 +92,8 @@ class CruRawReader std::vector& getDigits(InteractionRecord& ir) { return mEventRecords.getDigits(ir); }; // std::vector getIR() { return mEventTriggers; } void getParsedObjects(std::vector& tracklets, std::vector& cdigits, std::vector& triggers); + void getParsedObjectsandClear(std::vector& tracklets, std::vector& digits, std::vector& triggers); + void buildDPLOutputs(o2::framework::ProcessingContext& outputs); int getDigitsFound() { return mTotalDigitsFound; } int getTrackletsFound() { return mTotalTrackletsFound; } int sumTrackletsFound() { return mEventRecords.sumTracklets(); } diff --git a/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h b/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h index 1cdba9fb092ae..a785d539770bf 100644 --- a/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h +++ b/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h @@ -57,7 +57,8 @@ class DataReaderTask : public Task bool mHeaderVerbose{false}; // verbose output of headers bool mCompressedData{false}; // are we dealing with the compressed data from the flp (send via option) bool mByteSwap{true}; // whether we are to byteswap the incoming data, mc is not byteswapped, raw data is (too be changed in cru at some point) - o2::header::DataDescription mDataSpec; // input spec of th raw incoming data + // o2::header::DataDescription mDataDesc; // Data description of the incoming data + std::string mDataDesc; }; } // namespace o2::trd diff --git a/Detectors/TRD/reconstruction/src/CruRawReader.cxx b/Detectors/TRD/reconstruction/src/CruRawReader.cxx index ffa7dbf1dd8d5..f2feffb598bbc 100644 --- a/Detectors/TRD/reconstruction/src/CruRawReader.cxx +++ b/Detectors/TRD/reconstruction/src/CruRawReader.cxx @@ -23,6 +23,14 @@ #include "TRDReconstruction/TrackletsParser.h" #include "DataFormatsTRD/Constants.h" +#include "Framework/ControlService.h" +#include "Framework/ConfigParamRegistry.h" +#include "Framework/RawDeviceService.h" +#include "Framework/DeviceSpec.h" +#include "Framework/DataSpecUtils.h" +#include "Framework/Output.h" +#include "Framework/InputRecordWalker.h" + #include #include #include @@ -415,31 +423,20 @@ void CruRawReader::getParsedObjects(std::vector& tracklets, std::vec int digitcountsum = 0; int trackletcountsum = 0; mEventRecords.unpackDataForSending(triggers, tracklets, digits); - /*for(auto eventrecord: mEventRecords)//loop over triggers incase they have already been done. - { - int digitcount=0; - int trackletcount=0; - int start,end; - LOG(info) << __func__ << " " << tracklets.size() << " " - << cdigits.size()<< " trackletv size:"<< mEventRecords.getTracklets(ir.getBCData()); - for(auto trackletv: mEventStores.getTracklets(ir.getBCData())){ - //loop through the vector of ranges - start=trackletv.getFirstEntry(); - end= start+trackletv.getEntries(); - LOG(info) << "insert tracklets from " << start<< " " << end; - tracklets.insert(tracklets.end(),mEventTracklets.begin()+start, mEventTracklets.begin()+end); - trackletcount+=trackletv.getEntries(); - } - for(auto digitv: mEventStores.getDigits(ir.getBCData())){ - start=digitv.getFirstEntry(); - end= start+digitv.getEntries(); - LOG(info) << "insert digits from " << start<< " " << end; - cdigits.insert(cdigits.end(),mEventCompressedDigits.begin()+start , mEventCompressedDigits.begin()+end); - digitcount+=digitv.getEntries(); - } - triggers.emplace_back(ir.getBCData(),digitcountsum,digitcount,trackletcountsum,trackletcount); - digitcountsum+=digitcount; - trackletcountsum+=trackletcount; - }*/ } + +void CruRawReader::getParsedObjectsandClear(std::vector& tracklets, std::vector& digits, std::vector& triggers) +{ + getParsedObjects(tracklets, digits, triggers); + clearall(); +} + +//write the output data directly to the given DataAllocator from the datareader task. +void CruRawReader::buildDPLOutputs(o2::framework::ProcessingContext& pc) +{ + mEventRecords.sendData(pc); + // pc.outputs().snapshot(Output{o2::header::gDataOriginTRD,"STATS",0,Lifetime::Timerframe},mStats); + clearall(); // having now written the messages clear for next. +} + } // namespace o2::trd diff --git a/Detectors/TRD/reconstruction/src/DataReader.cxx b/Detectors/TRD/reconstruction/src/DataReader.cxx index 0b63f8ce9ad05..d0da3752bc792 100644 --- a/Detectors/TRD/reconstruction/src/DataReader.cxx +++ b/Detectors/TRD/reconstruction/src/DataReader.cxx @@ -76,23 +76,11 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) WorkflowSpec workflow; - /* - * This is originally replicated from TOF - We define at run time the number of devices to be attached - to the workflow and the input matching string of the device. - This is is done with a configuration string like the following - one, where the input matching for each device is provide in - comma-separated strings. For instance - */ - - // std::stringstream ssconfig(inputspec); std::string iconfig; std::string inputDescription; int idevice = 0; - // LOG(info) << "expected incoming data definition : " << inputspec; - // this is probably never going to be used but would to nice to know hence here. auto orig = o2::header::gDataOriginTRD; - auto inputs = o2::framework::select(inputspec.c_str()); + auto inputs = o2::framework::select(std::string("x:TRD/" + inputspec).c_str()); for (auto& inp : inputs) { // take care of case where our data is not in the time frame inp.lifetime = Lifetime::Optional; diff --git a/Detectors/TRD/reconstruction/src/DataReaderTask.cxx b/Detectors/TRD/reconstruction/src/DataReaderTask.cxx index b958e7706b9e5..9573bbf30eef9 100644 --- a/Detectors/TRD/reconstruction/src/DataReaderTask.cxx +++ b/Detectors/TRD/reconstruction/src/DataReaderTask.cxx @@ -47,14 +47,15 @@ void DataReaderTask::sendData(ProcessingContext& pc, bool blankframe) { // mReader.getParsedObjects(mTracklets,mDigits,mTriggers); if (!blankframe) { - mReader.getParsedObjects(mTracklets, mDigits, mTriggers); + mReader.buildDPLOutputs(pc); //getParsedObjectsandClear(mTracklets, mDigits, mTriggers); + } else { + //ensure the objects we are sending back are indeed blank. + LOG(info) << "Sending data onwards with " << mDigits.size() << " Digits and " << mTracklets.size() << " Tracklets and " << mTriggers.size() << " Triggers and blankframe:" << blankframe; + pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "DIGITS", 0, Lifetime::Timeframe}, mDigits); + pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe}, mTracklets); + pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe}, mTriggers); + // pc.outputs().snapshot(Output{o2::header::gDataOriginTRD,"STATS",0,Lifetime::Timerframe},mStats); } - - LOG(info) << "Sending data onwards with " << mDigits.size() << " Digits and " << mTracklets.size() << " Tracklets and " << mTriggers.size() << " Triggers and blankframe:" << blankframe; - pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "DIGITS", 0, Lifetime::Timeframe}, mDigits); - pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe}, mTracklets); - pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe}, mTriggers); - // pc.outputs().snapshot(Output{o2::header::gDataOriginTRD,"STATS",0,Lifetime::Timerframe},mStats); } void DataReaderTask::run(ProcessingContext& pc) @@ -67,16 +68,16 @@ void DataReaderTask::run(ProcessingContext& pc) auto device = pc.services().get().device(); auto outputRoutes = pc.services().get().spec().outputs; auto fairMQChannel = outputRoutes.at(0).channel; - mDataSpec = o2::header::gDataDescriptionRawData; - std::vector dummy{InputSpec{"dummy", ConcreteDataMatcher{"TRD", mDataSpec, 0xDEADBEEF}}}; - // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this means that the "delayed message" + std::vector dummy{InputSpec{"dummy", ConcreteDataMatcher{"TRD", "RAWDATA", 0xDEADBEEF}}}; + //std::vector dummy{InputSpec{"dummy", ConcreteDataMatcher{"TRD","RAWDATA"/* mDataDesc.c_str()*/, 0xDEADBEEF}}}; + // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this mecans that the "delayed message" // // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow for (const auto& ref : InputRecordWalker(pc.inputs(), dummy)) { const auto dh = o2::framework::DataRefUtils::getHeader(ref); - if (dh->payloadSize == 16 || dh->payloadSize == 0) { - LOGP(WARNING, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF", + if (dh->payloadSize == 0) { //}|| dh->payloadSize==16) { + LOGP(WARNING, "Found blank input input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : ", dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, dh->payloadSize); sendData(pc, true); //send the empty tf data. return; @@ -94,49 +95,50 @@ void DataReaderTask::run(ProcessingContext& pc) for (auto const& ref : iit) { if (mVerbose) { const auto dh = DataRefUtils::getHeader(ref); - LOGP(info, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF", + LOGP(info, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : ", dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, dh->payloadSize); } const auto* headerIn = DataRefUtils::getHeader(ref); auto payloadIn = ref.payload; auto payloadInSize = headerIn->payloadSize; - if (!mCompressedData) { //we have raw data coming in from flp - if (mVerbose) { - LOG(info) << " parsing non compressed data in the data reader task with a payload of " << payloadInSize << " payload size"; - } - mReader.setDataBuffer(payloadIn); - mReader.setDataBufferSize(payloadInSize); - mReader.configure(mByteSwap, mVerbose, mHeaderVerbose, mDataVerbose); - if (mVerbose) { - LOG(info) << "%%% about to run " << loopcounter << " %%%"; - } - mReader.run(); - if (mVerbose) { - LOG(info) << "%%% finished running " << loopcounter << " %%%"; - } - loopcounter++; - // mTracklets.insert(std::end(mTracklets), std::begin(mReader.getTracklets()), std::end(mReader.getTracklets())); - // mCompressedDigits.insert(std::end(mCompressedDigits), std::begin(mReader.getCompressedDigits()), std::end(mReader.getCompressedDigits())); - //mReader.clearall(); - if (mVerbose) { - LOG(info) << "from parsing received: " << mTracklets.size() << " tracklets and " << mDigits.size() << " compressed digits"; - LOG(info) << "relevant vectors to read : " << mReader.sumTrackletsFound() << " tracklets and " << mReader.sumDigitsFound() << " compressed digits"; + if (std::string(headerIn->dataDescription.str) != std::string("DISTSUBTIMEFRAMEFLP")) { + if (!mCompressedData) { //we have raw data coming in from flp + if (mVerbose) { + LOG(info) << " parsing non compressed data in the data reader task with a payload of " << payloadInSize << " payload size"; + } + mReader.setDataBuffer(payloadIn); + mReader.setDataBufferSize(payloadInSize); + mReader.configure(mByteSwap, mVerbose, mHeaderVerbose, mDataVerbose); + if (mVerbose) { + LOG(info) << "%%% about to run " << loopcounter << " %%%"; + } + mReader.run(); + if (mVerbose) { + LOG(info) << "%%% finished running " << loopcounter << " %%%"; + } + loopcounter++; + // mTracklets.insert(std::end(mTracklets), std::begin(mReader.getTracklets()), std::end(mReader.getTracklets())); + // mCompressedDigits.insert(std::end(mCompressedDigits), std::begin(mReader.getCompressedDigits()), std::end(mReader.getCompressedDigits())); + //mReader.clearall(); + if (mVerbose) { + LOG(info) << "from parsing received: " << mTracklets.size() << " tracklets and " << mDigits.size() << " compressed digits"; + LOG(info) << "relevant vectors to read : " << mReader.sumTrackletsFound() << " tracklets and " << mReader.sumDigitsFound() << " compressed digits"; + } + // mTriggers = mReader.getIR(); + //get the payload of trigger and digits out. + } else { // we have compressed data coming in from flp. + mCompressedReader.setDataBuffer(payloadIn); + mCompressedReader.setDataBufferSize(payloadInSize); + mCompressedReader.configure(mByteSwap, mVerbose, mHeaderVerbose, mDataVerbose); + mCompressedReader.run(); + //get the payload of trigger and digits out. } - // mTriggers = mReader.getIR(); - //get the payload of trigger and digits out. - } else { // we have compressed data coming in from flp. - mCompressedReader.setDataBuffer(payloadIn); - mCompressedReader.setDataBufferSize(payloadInSize); - mCompressedReader.configure(mByteSwap, mVerbose, mHeaderVerbose, mDataVerbose); - mCompressedReader.run(); - mTracklets = mCompressedReader.getTracklets(); - mDigits = mCompressedReader.getDigits(); - mTriggers = mCompressedReader.getIR(); - //get the payload of trigger and digits out. + /* output */ + sendData(pc, false); //TODO do we ever have to not post the data. i.e. can we get here mid event? I dont think so. + } else { + sendData(pc, true); } } - /* output */ - sendData(pc, false); //TODO do we ever have to not post the data. i.e. can we get here mid event? I dont think so. } auto dataReadTime = std::chrono::high_resolution_clock::now() - dataReadStart; From 57316accfafc3625a7d82aad298957333a1065a5 Mon Sep 17 00:00:00 2001 From: Sean Murray Date: Wed, 30 Jun 2021 10:30:36 +0200 Subject: [PATCH 4/6] flexible data description --- .../TRDReconstruction/DataReaderTask.h | 1 + .../TRD/reconstruction/src/DataReaderTask.cxx | 29 +++++++++++-------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h b/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h index a785d539770bf..1d5d0f664ecef 100644 --- a/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h +++ b/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h @@ -59,6 +59,7 @@ class DataReaderTask : public Task bool mByteSwap{true}; // whether we are to byteswap the incoming data, mc is not byteswapped, raw data is (too be changed in cru at some point) // o2::header::DataDescription mDataDesc; // Data description of the incoming data std::string mDataDesc; + o2::header::DataDescription mUserDataDescription = o2::header::gDataDescriptionInvalid; // alternative user-provided description to pick }; } // namespace o2::trd diff --git a/Detectors/TRD/reconstruction/src/DataReaderTask.cxx b/Detectors/TRD/reconstruction/src/DataReaderTask.cxx index 9573bbf30eef9..72d5416fd4af3 100644 --- a/Detectors/TRD/reconstruction/src/DataReaderTask.cxx +++ b/Detectors/TRD/reconstruction/src/DataReaderTask.cxx @@ -41,6 +41,11 @@ void DataReaderTask::init(InitContext& ic) }; ic.services().get().set(CallbackService::Id::Stop, finishFunction); + if (ic.options().isSet("trd-datareader-inputspec")) { + mDataDesc = ic.options().get("trd-datareader-inputspec"); + } else { + mDataDesc = "RAWDATA"; + } } void DataReaderTask::sendData(ProcessingContext& pc, bool blankframe) @@ -68,11 +73,11 @@ void DataReaderTask::run(ProcessingContext& pc) auto device = pc.services().get().device(); auto outputRoutes = pc.services().get().spec().outputs; auto fairMQChannel = outputRoutes.at(0).channel; - - std::vector dummy{InputSpec{"dummy", ConcreteDataMatcher{"TRD", "RAWDATA", 0xDEADBEEF}}}; - //std::vector dummy{InputSpec{"dummy", ConcreteDataMatcher{"TRD","RAWDATA"/* mDataDesc.c_str()*/, 0xDEADBEEF}}}; + //auto datadesc = (mUserDataDescription == o2::header::gDataDescriptionInvalid) ? o2::header::gDataDescriptionRawData : mUserDataDescription; + auto datadesc = (mDataDesc == o2::header::gDataDescriptionInvalid.str) ? o2::header::gDataDescriptionRawData : mUserDataDescription; + std::vector dummy{InputSpec{"dummy", ConcreteDataMatcher{o2::header::gDataOriginTRD, "RAWDATA", 0xDEADBEEF}}}; // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this mecans that the "delayed message" - // // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow + // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow for (const auto& ref : InputRecordWalker(pc.inputs(), dummy)) { const auto dh = o2::framework::DataRefUtils::getHeader(ref); @@ -93,19 +98,19 @@ void DataReaderTask::run(ProcessingContext& pc) /* loop over input parts */ int inputpartscount = 0; for (auto const& ref : iit) { - if (mVerbose) { - const auto dh = DataRefUtils::getHeader(ref); - LOGP(info, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : ", - dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, dh->payloadSize); - } + // if (mVerbose) { + const auto dh = DataRefUtils::getHeader(ref); + LOGP(info, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : ", + dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, dh->payloadSize); + // } const auto* headerIn = DataRefUtils::getHeader(ref); auto payloadIn = ref.payload; auto payloadInSize = headerIn->payloadSize; if (std::string(headerIn->dataDescription.str) != std::string("DISTSUBTIMEFRAMEFLP")) { if (!mCompressedData) { //we have raw data coming in from flp - if (mVerbose) { - LOG(info) << " parsing non compressed data in the data reader task with a payload of " << payloadInSize << " payload size"; - } + // if (mVerbose) { + LOG(info) << " parsing non compressed data in the data reader task with a payload of " << payloadInSize << " payload size"; + // } mReader.setDataBuffer(payloadIn); mReader.setDataBufferSize(payloadInSize); mReader.configure(mByteSwap, mVerbose, mHeaderVerbose, mDataVerbose); From 46c9e42008c6ee0b7d8a25755e4f5584cf2b1c98 Mon Sep 17 00:00:00 2001 From: Sean Murray Date: Wed, 30 Jun 2021 16:17:57 +0200 Subject: [PATCH 5/6] move TRD eventrecord class to reconstruction dir --- DataFormats/Detectors/TRD/CMakeLists.txt | 1 - Detectors/TRD/reconstruction/CMakeLists.txt | 1 + .../include/TRDReconstruction/CruRawReader.h | 2 +- .../include/TRDReconstruction}/EventRecord.h | 9 ++++--- .../TRD/reconstruction/src/CruRawReader.cxx | 2 +- .../TRD/reconstruction/src/DataReader.cxx | 5 +--- .../TRD/reconstruction/src/DataReaderTask.cxx | 26 +++++++------------ .../TRD/reconstruction}/src/EventRecord.cxx | 9 ++----- 8 files changed, 22 insertions(+), 33 deletions(-) rename {DataFormats/Detectors/TRD/include/DataFormatsTRD => Detectors/TRD/reconstruction/include/TRDReconstruction}/EventRecord.h (94%) rename {DataFormats/Detectors/TRD => Detectors/TRD/reconstruction}/src/EventRecord.cxx (91%) diff --git a/DataFormats/Detectors/TRD/CMakeLists.txt b/DataFormats/Detectors/TRD/CMakeLists.txt index 8a29c4e73e0a7..15151e9542839 100644 --- a/DataFormats/Detectors/TRD/CMakeLists.txt +++ b/DataFormats/Detectors/TRD/CMakeLists.txt @@ -18,7 +18,6 @@ o2_add_library(DataFormatsTRD src/CompressedDigit.cxx src/CTF.cxx src/Digit.cxx - src/EventRecord.cxx PUBLIC_LINK_LIBRARIES O2::CommonDataFormat O2::SimulationDataFormat) o2_target_root_dictionary(DataFormatsTRD diff --git a/Detectors/TRD/reconstruction/CMakeLists.txt b/Detectors/TRD/reconstruction/CMakeLists.txt index 9d6c550590c37..e93ec61dfccde 100644 --- a/Detectors/TRD/reconstruction/CMakeLists.txt +++ b/Detectors/TRD/reconstruction/CMakeLists.txt @@ -19,6 +19,7 @@ o2_add_library(TRDReconstruction src/CompressedRawReader.cxx src/DataReaderTask.cxx src/CruCompressorTask.cxx + src/EventRecord.cxx PUBLIC_LINK_LIBRARIES O2::TRDBase O2::DataFormatsTRD O2::DataFormatsTPC diff --git a/Detectors/TRD/reconstruction/include/TRDReconstruction/CruRawReader.h b/Detectors/TRD/reconstruction/include/TRDReconstruction/CruRawReader.h index 779a200c15e4f..672ce436ae583 100644 --- a/Detectors/TRD/reconstruction/include/TRDReconstruction/CruRawReader.h +++ b/Detectors/TRD/reconstruction/include/TRDReconstruction/CruRawReader.h @@ -33,7 +33,7 @@ #include "DataFormatsTRD/Constants.h" #include "TRDBase/Digit.h" #include "CommonDataFormat/InteractionRecord.h" -#include "DataFormatsTRD/EventRecord.h" +#include "TRDReconstruction/EventRecord.h" namespace o2::trd { diff --git a/DataFormats/Detectors/TRD/include/DataFormatsTRD/EventRecord.h b/Detectors/TRD/reconstruction/include/TRDReconstruction/EventRecord.h similarity index 94% rename from DataFormats/Detectors/TRD/include/DataFormatsTRD/EventRecord.h rename to Detectors/TRD/reconstruction/include/TRDReconstruction/EventRecord.h index 7ddb0c713d5fe..0860d7cd8337f 100644 --- a/DataFormats/Detectors/TRD/include/DataFormatsTRD/EventRecord.h +++ b/Detectors/TRD/reconstruction/include/TRDReconstruction/EventRecord.h @@ -18,7 +18,11 @@ #include "CommonDataFormat/RangeReference.h" #include "FairLogger.h" #include "DataFormatsTRD/Tracklet64.h" -#include "Framework/ProcessingContext.h" + +namespace o2::framework +{ +class ProcessingContext; +} namespace o2::trd { @@ -93,7 +97,7 @@ class EventStorage void addTracklet(InteractionRecord& ir, Tracklet64& tracklet); void addTracklets(InteractionRecord& ir, std::vector& tracklets); void addTracklets(InteractionRecord& ir, std::vector::iterator& start, std::vector::iterator& end); - void unpackDataForSending(std::vector& triggers, std::vector& tracklets, std::vector& digits); + void unpackData(std::vector& triggers, std::vector& tracklets, std::vector& digits); void sendData(o2::framework::ProcessingContext& pc); //this could replace by keeing a running total on addition TODO void sumTrackletsDigitsTriggers(uint64_t& tracklets, uint64_t& digits, uint64_t& triggers); @@ -108,7 +112,6 @@ class EventStorage //these 2 are hacks to be able to send bak a blank vector if interaction record is not found. std::vector mDummyTracklets; std::vector mDummyDigits; - ClassDefNV(EventStorage, 1); }; std::ostream& operator<<(std::ostream& stream, const EventRecord& trg); diff --git a/Detectors/TRD/reconstruction/src/CruRawReader.cxx b/Detectors/TRD/reconstruction/src/CruRawReader.cxx index f2feffb598bbc..a3db93731e15f 100644 --- a/Detectors/TRD/reconstruction/src/CruRawReader.cxx +++ b/Detectors/TRD/reconstruction/src/CruRawReader.cxx @@ -422,7 +422,7 @@ void CruRawReader::getParsedObjects(std::vector& tracklets, std::vec { int digitcountsum = 0; int trackletcountsum = 0; - mEventRecords.unpackDataForSending(triggers, tracklets, digits); + mEventRecords.unpackData(triggers, tracklets, digits); } void CruRawReader::getParsedObjectsandClear(std::vector& tracklets, std::vector& digits, std::vector& triggers) diff --git a/Detectors/TRD/reconstruction/src/DataReader.cxx b/Detectors/TRD/reconstruction/src/DataReader.cxx index d0da3752bc792..14cfa6a637976 100644 --- a/Detectors/TRD/reconstruction/src/DataReader.cxx +++ b/Detectors/TRD/reconstruction/src/DataReader.cxx @@ -29,7 +29,6 @@ void customize(std::vector& workflowOptions) { std::vector options{ - {"trd-datareader-inputspec", VariantType::String, "RAWDATA", {"TRD raw data spec"}}, {"trd-datareader-output-desc", VariantType::String, "TRDTLT", {"Output specs description string"}}, {"trd-datareader-verbose", VariantType::Bool, false, {"Enable verbose epn data reading"}}, {"trd-datareader-headerverbose", VariantType::Bool, false, {"Enable verbose header info"}}, @@ -56,7 +55,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) // o2::conf::ConfigurableParam::updateFromString(cfgc.options().get("configKeyValues")); // o2::conf::ConfigurableParam::writeINI("o2trdrawreader-workflow_configuration.ini"); - auto inputspec = cfgc.options().get("trd-datareader-inputspec"); //auto outputspec = cfgc.options().get("trd-datareader-outputspec"); auto verbose = cfgc.options().get("trd-datareader-verbose"); auto byteswap = cfgc.options().get("trd-datareader-enablebyteswapdata"); @@ -69,7 +67,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) outputs.emplace_back("TRD", "DIGITS", 0, Lifetime::Timeframe); outputs.emplace_back("TRD", "TRKTRGRD", 0, Lifetime::Timeframe); //outputs.emplace_back("TRD", "FLPSTAT", 0, Lifetime::Timeframe); - LOG(info) << "input spec is:" << inputspec; LOG(info) << "enablebyteswap :" << byteswap; AlgorithmSpec algoSpec; algoSpec = AlgorithmSpec{adaptFromTask(compresseddata, byteswap, verbose, headerverbose, dataverbose)}; @@ -80,7 +77,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) std::string inputDescription; int idevice = 0; auto orig = o2::header::gDataOriginTRD; - auto inputs = o2::framework::select(std::string("x:TRD/" + inputspec).c_str()); + auto inputs = o2::framework::select(std::string("x:TRD/RAWDATA").c_str()); for (auto& inp : inputs) { // take care of case where our data is not in the time frame inp.lifetime = Lifetime::Optional; diff --git a/Detectors/TRD/reconstruction/src/DataReaderTask.cxx b/Detectors/TRD/reconstruction/src/DataReaderTask.cxx index 72d5416fd4af3..912063fd70d7d 100644 --- a/Detectors/TRD/reconstruction/src/DataReaderTask.cxx +++ b/Detectors/TRD/reconstruction/src/DataReaderTask.cxx @@ -41,11 +41,7 @@ void DataReaderTask::init(InitContext& ic) }; ic.services().get().set(CallbackService::Id::Stop, finishFunction); - if (ic.options().isSet("trd-datareader-inputspec")) { - mDataDesc = ic.options().get("trd-datareader-inputspec"); - } else { - mDataDesc = "RAWDATA"; - } + mDataDesc = "RAWDATA"; } void DataReaderTask::sendData(ProcessingContext& pc, bool blankframe) @@ -73,9 +69,7 @@ void DataReaderTask::run(ProcessingContext& pc) auto device = pc.services().get().device(); auto outputRoutes = pc.services().get().spec().outputs; auto fairMQChannel = outputRoutes.at(0).channel; - //auto datadesc = (mUserDataDescription == o2::header::gDataDescriptionInvalid) ? o2::header::gDataDescriptionRawData : mUserDataDescription; - auto datadesc = (mDataDesc == o2::header::gDataDescriptionInvalid.str) ? o2::header::gDataDescriptionRawData : mUserDataDescription; - std::vector dummy{InputSpec{"dummy", ConcreteDataMatcher{o2::header::gDataOriginTRD, "RAWDATA", 0xDEADBEEF}}}; + std::vector dummy{InputSpec{"dummy", ConcreteDataMatcher{o2::header::gDataOriginTRD, o2::header::gDataDescriptionRawData, 0xDEADBEEF}}}; // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this mecans that the "delayed message" // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow @@ -98,19 +92,19 @@ void DataReaderTask::run(ProcessingContext& pc) /* loop over input parts */ int inputpartscount = 0; for (auto const& ref : iit) { - // if (mVerbose) { - const auto dh = DataRefUtils::getHeader(ref); - LOGP(info, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : ", - dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, dh->payloadSize); - // } + if (mVerbose) { + const auto dh = DataRefUtils::getHeader(ref); + LOGP(info, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : ", + dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, dh->payloadSize); + } const auto* headerIn = DataRefUtils::getHeader(ref); auto payloadIn = ref.payload; auto payloadInSize = headerIn->payloadSize; if (std::string(headerIn->dataDescription.str) != std::string("DISTSUBTIMEFRAMEFLP")) { if (!mCompressedData) { //we have raw data coming in from flp - // if (mVerbose) { - LOG(info) << " parsing non compressed data in the data reader task with a payload of " << payloadInSize << " payload size"; - // } + if (mVerbose) { + LOG(info) << " parsing non compressed data in the data reader task with a payload of " << payloadInSize << " payload size"; + } mReader.setDataBuffer(payloadIn); mReader.setDataBufferSize(payloadInSize); mReader.configure(mByteSwap, mVerbose, mHeaderVerbose, mDataVerbose); diff --git a/DataFormats/Detectors/TRD/src/EventRecord.cxx b/Detectors/TRD/reconstruction/src/EventRecord.cxx similarity index 91% rename from DataFormats/Detectors/TRD/src/EventRecord.cxx rename to Detectors/TRD/reconstruction/src/EventRecord.cxx index 15187b1b7ba8c..199d48e1ff124 100644 --- a/DataFormats/Detectors/TRD/src/EventRecord.cxx +++ b/Detectors/TRD/reconstruction/src/EventRecord.cxx @@ -19,7 +19,7 @@ #include "DataFormatsTRD/TriggerRecord.h" #include "DataFormatsTRD/Tracklet64.h" #include "DataFormatsTRD/Digit.h" -#include "DataFormatsTRD/EventRecord.h" +#include "TRDReconstruction/EventRecord.h" #include "DataFormatsTRD/Constants.h" #include "Framework/Output.h" @@ -121,7 +121,6 @@ void EventStorage::addTracklets(InteractionRecord& ir, std::vector& if (ir == mEventRecords[count].getBCData()) { //TODO replace this with a hash/map not a vector mEventRecords[count].addTracklets(tracklets); //mTracklets.insert(mTracklets.back(),start,end); - // LOG(info) << "adding " << tracklets.size() << " tracklets and tracklet sum: " << sumTracklets() << " IR count : "<< mEventRecords.size();; added = true; } } @@ -129,7 +128,6 @@ void EventStorage::addTracklets(InteractionRecord& ir, std::vector& // unseen ir so add it mEventRecords.push_back(ir); mEventRecords.back().addTracklets(tracklets); - // hLOG(info) << "unknown ir adding " << tracklets.size() << " tracklets and sum of : "<< sumTracklets() << " IR count : "<< mEventRecords.size(); } } void EventStorage::addTracklets(InteractionRecord& ir, std::vector::iterator& start, std::vector::iterator& end) @@ -139,7 +137,6 @@ void EventStorage::addTracklets(InteractionRecord& ir, std::vector:: if (ir == mEventRecords[count].getBCData()) { //TODO replace this with a hash/map not a vector mEventRecords[count].addTracklets(start, end); //mTracklets.insert(mTracklets.back(),start,end); - // LOG(info) << "x iknown ir adding " << std::distance(start,end)<< " tracklets"; added = true; } } @@ -150,7 +147,7 @@ void EventStorage::addTracklets(InteractionRecord& ir, std::vector:: // LOG(info) << "x unknown ir adding " << std::distance(start,end)<< " tracklets"; } } -void EventStorage::unpackDataForSending(std::vector& triggers, std::vector& tracklets, std::vector& digits) +void EventStorage::unpackData(std::vector& triggers, std::vector& tracklets, std::vector& digits) { int digitcount = 0; int trackletcount = 0; @@ -166,8 +163,6 @@ void EventStorage::unpackDataForSending(std::vector& triggers, st void EventStorage::sendData(o2::framework::ProcessingContext& pc) { //at this point we know the total number of tracklets and digits and triggers. - //hence we can create the relevant objects inside the message as opposed to creating a local object and snapshotting it(copying) it - //into the message. uint64_t trackletcount = 0; uint64_t digitcount = 0; uint64_t triggercount = 0; From 665b754dbc6492b67571aa692e3f93ff214ec7b6 Mon Sep 17 00:00:00 2001 From: Sean Murray Date: Thu, 1 Jul 2021 14:42:54 +0200 Subject: [PATCH 6/6] remove classdef for EventRecord, and local variables for emptry frames data --- .../include/TRDReconstruction/DataReaderTask.h | 4 ---- .../include/TRDReconstruction/EventRecord.h | 2 -- .../TRD/reconstruction/src/DataReaderTask.cxx | 16 ++++++++-------- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h b/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h index 1d5d0f664ecef..e2fd3c2d9e75c 100644 --- a/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h +++ b/Detectors/TRD/reconstruction/include/TRDReconstruction/DataReaderTask.h @@ -47,10 +47,6 @@ class DataReaderTask : public Task // in both cases we pull the data from the vectors build message and pass on. // they will internally produce a vector of digits and a vector tracklets and associated indexing. // TODO templatise this and 2 versions of datareadertask, instantiated with the relevant parser. - std::vector mTracklets; - std::vector mDigits; - std::vector mTriggers; - // std::vector mStats; bool mVerbose{false}; // verbos output general debuggign and info output. bool mDataVerbose{false}; // verbose output of data unpacking diff --git a/Detectors/TRD/reconstruction/include/TRDReconstruction/EventRecord.h b/Detectors/TRD/reconstruction/include/TRDReconstruction/EventRecord.h index 0860d7cd8337f..fcfe5b70e40e4 100644 --- a/Detectors/TRD/reconstruction/include/TRDReconstruction/EventRecord.h +++ b/Detectors/TRD/reconstruction/include/TRDReconstruction/EventRecord.h @@ -79,8 +79,6 @@ class EventRecord BCData mBCData; /// orbit and Bunch crossing data of the physics trigger std::vector mDigits{}; /// digit data, for this event std::vector mTracklets{}; /// tracklet data, for this event - - ClassDefNV(EventRecord, 1); }; class EventStorage diff --git a/Detectors/TRD/reconstruction/src/DataReaderTask.cxx b/Detectors/TRD/reconstruction/src/DataReaderTask.cxx index 912063fd70d7d..4ae1b256fe2e2 100644 --- a/Detectors/TRD/reconstruction/src/DataReaderTask.cxx +++ b/Detectors/TRD/reconstruction/src/DataReaderTask.cxx @@ -51,11 +51,14 @@ void DataReaderTask::sendData(ProcessingContext& pc, bool blankframe) mReader.buildDPLOutputs(pc); //getParsedObjectsandClear(mTracklets, mDigits, mTriggers); } else { //ensure the objects we are sending back are indeed blank. - LOG(info) << "Sending data onwards with " << mDigits.size() << " Digits and " << mTracklets.size() << " Tracklets and " << mTriggers.size() << " Triggers and blankframe:" << blankframe; - pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "DIGITS", 0, Lifetime::Timeframe}, mDigits); - pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe}, mTracklets); - pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe}, mTriggers); - // pc.outputs().snapshot(Output{o2::header::gDataOriginTRD,"STATS",0,Lifetime::Timerframe},mStats); + //TODO maybe put this in buildDPLOutputs so sending all done in 1 place, not now though. + std::vector tracklets; + std::vector digits; + std::vector triggers; + LOG(info) << "Sending data onwards with " << digits.size() << " Digits and " << tracklets.size() << " Tracklets and " << triggers.size() << " Triggers and blankframe:" << blankframe; + pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "DIGITS", 0, Lifetime::Timeframe}, digits); + pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "TRACKLETS", 0, Lifetime::Timeframe}, tracklets); + pc.outputs().snapshot(Output{o2::header::gDataOriginTRD, "TRKTRGRD", 0, Lifetime::Timeframe}, triggers); } } @@ -120,7 +123,6 @@ void DataReaderTask::run(ProcessingContext& pc) // mCompressedDigits.insert(std::end(mCompressedDigits), std::begin(mReader.getCompressedDigits()), std::end(mReader.getCompressedDigits())); //mReader.clearall(); if (mVerbose) { - LOG(info) << "from parsing received: " << mTracklets.size() << " tracklets and " << mDigits.size() << " compressed digits"; LOG(info) << "relevant vectors to read : " << mReader.sumTrackletsFound() << " tracklets and " << mReader.sumDigitsFound() << " compressed digits"; } // mTriggers = mReader.getIR(); @@ -144,10 +146,8 @@ void DataReaderTask::run(ProcessingContext& pc) LOG(info) << "Processing time for Data reading " << std::chrono::duration_cast(dataReadTime).count() << "ms"; if (!mCompressedData) { LOG(info) << "Digits found : " << mReader.getDigitsFound(); - LOG(info) << "Digits returned : " << mDigits.size(); LOG(info) << "Tracklets found : " << mReader.getTrackletsFound(); - LOG(info) << "Tracklets returned : " << mTracklets.size(); } }