diff --git a/CMakeLists.txt b/CMakeLists.txt index 6706cf8f..8d5de387 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -180,4 +180,9 @@ install( FILES readout.cfg DESTINATION ${CMAKE_INSTALL_PREFIX}/etc PERMISSIONS OWNER_READ OWNER_WRITE GROUP_READ WORLD_READ) + +install( + FILES src/RAWDataHeader.h + DESTINATION ${CMAKE_INSTALL_PREFIX}/include/${MODULE_NAME} + PERMISSIONS OWNER_READ OWNER_WRITE GROUP_READ WORLD_READ) diff --git a/cmake/ReadoutDependencies.cmake b/cmake/ReadoutDependencies.cmake index 8d03dce7..a61961a5 100644 --- a/cmake/ReadoutDependencies.cmake +++ b/cmake/ReadoutDependencies.cmake @@ -1,4 +1,4 @@ -find_package(Boost COMPONENTS unit_test_framework program_options system thread system timer program_options random filesystem regex signals REQUIRED) +find_package(Boost COMPONENTS container unit_test_framework program_options system thread system timer program_options random filesystem regex signals REQUIRED) find_package(Git QUIET) find_package(Monitoring REQUIRED) find_package(Configuration REQUIRED) diff --git a/src/ReadoutEquipment.cxx b/src/ReadoutEquipment.cxx index 08331ee7..9403ede6 100644 --- a/src/ReadoutEquipment.cxx +++ b/src/ReadoutEquipment.cxx @@ -232,3 +232,12 @@ Thread::CallbackResult ReadoutEquipment::threadCallback(void *arg) { } return Thread::CallbackResult::Ok; } + + +void ReadoutEquipment::setDataOn() { + isDataOn=true; +} + +void ReadoutEquipment::setDataOff() { + isDataOn=false; +} diff --git a/src/ReadoutEquipment.h b/src/ReadoutEquipment.h index c5101752..23178495 100644 --- a/src/ReadoutEquipment.h +++ b/src/ReadoutEquipment.h @@ -29,6 +29,10 @@ class ReadoutEquipment { void stop(); const std::string & getName(); + // enable / disable data production by the equipment + virtual void setDataOn(); + virtual void setDataOff(); + // protected: // todo: give direct access to output FIFO? std::shared_ptr> dataOut; @@ -44,9 +48,11 @@ class ReadoutEquipment { virtual Thread::CallbackResult prepareBlocks() {return Thread::CallbackResult::Idle;}; virtual DataBlockContainerReference getNextBlock() {return nullptr;}; - protected: - + // data enabled ? controlled by setDataOn/setDataOff + bool isDataOn=false; + + // Definition of performance counters for readout statistics. // Each counter is assigned a unique integer index (incremental, starting 0). // The last element can be used to get the number of counters defined. diff --git a/src/ReadoutEquipmentRORC.cxx b/src/ReadoutEquipmentRORC.cxx index b00c0e2e..03ae283b 100644 --- a/src/ReadoutEquipmentRORC.cxx +++ b/src/ReadoutEquipmentRORC.cxx @@ -5,7 +5,7 @@ #include #include #include -#include + #include #include @@ -27,10 +27,12 @@ class ReadoutEquipmentRORC : public ReadoutEquipment { ReadoutEquipmentRORC(ConfigFile &cfg, std::string name="rorcReadout"); ~ReadoutEquipmentRORC(); + private: Thread::CallbackResult prepareBlocks(); DataBlockContainerReference getNextBlock(); - - private: + void setDataOn(); + void setDataOff(); + Thread::CallbackResult populateFifoOut(); // the data readout loop function AliceO2::roc::ChannelFactory::DmaChannelSharedPtr channel; // channel to ROC device @@ -66,8 +68,8 @@ class ReadoutEquipmentRORC : public ReadoutEquipment { }; -std::mutex readoutEquipmentRORCLock; -bool isDriverInitialized=false; +//std::mutex readoutEquipmentRORCLock; + struct ReadoutEquipmentRORCException : virtual Exception {}; @@ -133,14 +135,6 @@ ReadoutEquipmentRORC::ReadoutEquipmentRORC(ConfigFile &cfg, std::string name) : BOOST_THROW_EXCEPTION(ReadoutEquipmentRORCException() << ErrorInfo::Message("Superpage must be at least 32kB")); } - // make sure ROC driver is initialized once - readoutEquipmentRORCLock.lock(); - if (!isDriverInitialized) { - AliceO2::roc::driver::initialize(); - isDriverInitialized=true; - } - readoutEquipmentRORCLock.unlock(); - // open and configure ROC theLog.log("Opening ROC %s:%d",cardId.c_str(),cfgChannelNumber); AliceO2::roc::Parameters params; @@ -180,9 +174,6 @@ ReadoutEquipmentRORC::ReadoutEquipmentRORC(ConfigFile &cfg, std::string name) : baseAddress, blockSize }); - // clear locks if necessary - params.setForcedUnlockEnabled(true); - // define link mask // this is harmless for C-RORC params.setLinkMask(AliceO2::roc::Parameters::linkMaskFromString(cfgLinkMask)); @@ -207,14 +198,6 @@ ReadoutEquipmentRORC::ReadoutEquipmentRORC(ConfigFile &cfg, std::string name) : // todo: log parameters ? - // start DMA - theLog.log("Starting DMA for ROC %s:%d",cardId.c_str(),cfgChannelNumber); - channel->startDma(); - - // get FIFO depth (it should be fully empty when starting) - RocFifoSize=channel->getTransferQueueAvailable(); - theLog.log("ROC input queue size = %d pages",RocFifoSize); - if (RocFifoSize==0) {RocFifoSize=1;} // reset timeframe id currentTimeframe=0; @@ -241,10 +224,6 @@ ReadoutEquipmentRORC::ReadoutEquipmentRORC(ConfigFile &cfg, std::string name) : ReadoutEquipmentRORC::~ReadoutEquipmentRORC() { - if (isInitialized) { - channel->stopDma(); - } - if (cfgRdhCheckEnabled) { theLog.log("Equipment %s : %llu timeframes, %llu pages, RDH checks %llu ok, %llu errors",name.c_str(),statsNumberOfTimeframes,statsNumberOfPages,statsRdhCheckOk,statsRdhCheckErr); } @@ -253,6 +232,8 @@ ReadoutEquipmentRORC::~ReadoutEquipmentRORC() { Thread::CallbackResult ReadoutEquipmentRORC::prepareBlocks(){ if (!isInitialized) return Thread::CallbackResult::Error; + if (!isDataOn) return Thread::CallbackResult::Idle; + int isActive=0; // keep track of situations where the queue is completely empty @@ -274,9 +255,9 @@ Thread::CallbackResult ReadoutEquipmentRORC::prepareBlocks(){ if (newPage!=nullptr) { // todo: check page is aligned as expected AliceO2::roc::Superpage superpage; - superpage.offset=(char *)newPage-(char *)mp->getBaseBlockAddress()+pageSpaceReserved; - superpage.size=superPageSize; - superpage.userData=newPage; + superpage.setOffset((char *)newPage-(char *)mp->getBaseBlockAddress()+pageSpaceReserved); + superpage.setSize(superPageSize); + superpage.setUserData(newPage); channel->pushSuperpage(superpage); isActive=1; nPushed++; @@ -325,6 +306,10 @@ DataBlockContainerReference ReadoutEquipmentRORC::getNextBlock() { DataBlockContainerReference nextBlock=nullptr; + // ensure the initialization was fine in the main thread + if (!isInitialized) { + return nullptr; + } //channel->fillSuperpages(); // check for completed page @@ -332,9 +317,10 @@ DataBlockContainerReference ReadoutEquipmentRORC::getNextBlock() { auto superpage = channel->getSuperpage(); // this is the first superpage in FIFO ... let's check its state if (superpage.isFilled()) { std::shared_ptrd=nullptr; +// printf ("received a page with %d bytes - isFilled=%d isREady=%d\n",(int)superpage.getReceived(),(int)superpage.isFilled(),(int)superpage.isReady()); try { if (pageSpaceReserved>=sizeof(DataBlock)) { - d=mp->getNewDataBlockContainer((void *)(superpage.userData)); + d=mp->getNewDataBlockContainer((void *)(superpage.getUserData())); } else { // todo: allocate data block container elsewhere than beginning of page //d=mp->getNewDataBlockContainer(nullptr); @@ -420,6 +406,8 @@ DataBlockContainerReference ReadoutEquipmentRORC::getNextBlock() { errorDescription.clear(); } statsRdhCheckErr++; + // stop on first RDH error (should distinguich valid/invalid block length) + break; } else { statsRdhCheckOk++; @@ -462,3 +450,26 @@ DataBlockContainerReference ReadoutEquipmentRORC::getNextBlock() { std::unique_ptr getReadoutEquipmentRORC(ConfigFile &cfg, std::string cfgEntryPoint) { return std::make_unique(cfg,cfgEntryPoint); } + + +void ReadoutEquipmentRORC::setDataOn() { + if (isInitialized) { + // start DMA + theLog.log("Starting DMA for ROC %s",getName().c_str()); + channel->startDma(); + + // get FIFO depth (it should be fully empty when starting) + RocFifoSize=channel->getTransferQueueAvailable(); + theLog.log("ROC input queue size = %d pages",RocFifoSize); + if (RocFifoSize==0) {RocFifoSize=1;} + } + ReadoutEquipment::setDataOn(); +} + +void ReadoutEquipmentRORC::setDataOff() { + if (isInitialized) { + theLog.log("Stopping DMA for ROC %s",getName().c_str()); + channel->stopDma(); + } + ReadoutEquipment::setDataOff(); +} diff --git a/src/mainReadout.cxx b/src/mainReadout.cxx index 9713362c..505b72aa 100644 --- a/src/mainReadout.cxx +++ b/src/mainReadout.cxx @@ -392,6 +392,9 @@ int main(int argc, char* argv[]) readoutDevice->start(); } + for (auto && readoutDevice : readoutDevices) { + readoutDevice->setDataOn(); + } theLog.log("Running"); // reset exit timeout, if any @@ -412,11 +415,11 @@ int main(int argc, char* argv[]) if (isRunning) { if (((cfgExitTimeout>0)&&(t.isTimeout()))||(ShutdownRequest)) { isRunning=0; - theLog.log("Stopping readout"); - for (auto && readoutDevice : readoutDevices) { - readoutDevice->stop(); + theLog.log("Stopping data readout"); + for (auto && readoutDevice : readoutDevices) { + readoutDevice->setDataOff(); } - theLog.log("Readout stopped"); + t.reset(1000000); // add a delay before stopping aggregator - continune to empty FIFOs // notify consumers of imminent data flow stop @@ -452,7 +455,11 @@ int main(int argc, char* argv[]) theLog.log("Stopping callgrind instrumentation"); #endif - + for (auto && readoutDevice : readoutDevices) { + readoutDevice->stop(); + } + theLog.log("Readout stopped"); + theLog.log("Stopping aggregator"); agg->stop(); diff --git a/src/testROC.cxx b/src/testROC.cxx index 88e51810..6e2c7904 100644 --- a/src/testROC.cxx +++ b/src/testROC.cxx @@ -12,7 +12,6 @@ InfoLogger theLog; #include #include #include -#include #include #include @@ -96,7 +95,6 @@ ROCdevice::ROCdevice(std::string id) { mp->getBaseBlockAddress(), mp->getBaseBlockSize() }); - params.setForcedUnlockEnabled(true); params.setLinkMask(AliceO2::roc::Parameters::linkMaskFromString(cfgLinkMask)); @@ -153,10 +151,10 @@ int ROCdevice::doLoop() { while ((channel->getReadyQueueSize()>0)) { auto superpage = channel->getSuperpage(); // this is the first superpage in FIFO ... let's check its state if (superpage.isFilled()) { - mp->releasePage(superpage.userData); + mp->releasePage(superpage.getUserData()); channel->popSuperpage(); nPop++; - nBytes+=superpage.size; + nBytes+=superpage.getSize(); } else { break; } @@ -171,9 +169,9 @@ int ROCdevice::doLoop() { void *newPage=mp->getPage(); if (newPage!=nullptr) { AliceO2::roc::Superpage superpage; - superpage.offset=(char *)newPage-(char *)mp->getBaseBlockAddress(); - superpage.size=superPageSize; - superpage.userData=newPage; + superpage.setOffset((char *)newPage-(char *)mp->getBaseBlockAddress()); + superpage.setSize(superPageSize); + superpage.setUserData(newPage); channel->pushSuperpage(superpage); nPush++; } else { @@ -257,8 +255,6 @@ int main(int argc, char**argv) { - AliceO2::roc::driver::initialize(); - std::vector devices; for (int i=argMin;i