From 664ef5703cad3e182a2bbc64aa026e72cd9f0b09 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 11 Nov 2015 10:09:03 +0100 Subject: [PATCH 1/2] Use smart pointer methods for the flp2epn and flp2epn-distributed devices --- devices/flp2epn-distributed/EPNReceiver.cxx | 70 ++++++++----------- devices/flp2epn-distributed/EPNReceiver.h | 2 +- devices/flp2epn-distributed/FLPSender.cxx | 43 +++++------- devices/flp2epn-distributed/FLPSender.h | 7 +- .../flp2epn-distributed/FLPSyncSampler.cxx | 10 +-- devices/flp2epn/O2EPNex.cxx | 22 +++--- devices/flp2epn/O2EPNex.h | 9 +-- devices/flp2epn/O2EpnMerger.cxx | 39 +++-------- devices/flp2epn/O2EpnMerger.h | 9 +-- devices/flp2epn/O2FLPex.cxx | 67 +++++++++--------- devices/flp2epn/O2FLPex.h | 10 --- devices/flp2epn/O2Merger.cxx | 23 +++--- devices/flp2epn/O2Merger.h | 1 + devices/flp2epn/O2Proxy.cxx | 22 +++--- devices/flp2epn/O2Proxy.h | 1 + 15 files changed, 134 insertions(+), 201 deletions(-) diff --git a/devices/flp2epn-distributed/EPNReceiver.cxx b/devices/flp2epn-distributed/EPNReceiver.cxx index 9ce59e478fea6..9300487dae943 100644 --- a/devices/flp2epn-distributed/EPNReceiver.cxx +++ b/devices/flp2epn-distributed/EPNReceiver.cxx @@ -66,7 +66,7 @@ void EPNReceiver::DiscardIncompleteTimeframes() LOG(WARN) << "Timeframe #" << it->first << " incomplete after " << fBufferTimeoutInMs << " milliseconds, discarding"; fDiscardedSet.insert(it->first); for (int i = 0; i < (it->second).parts.size(); ++i) { - delete (it->second).parts.at(i); + (it->second).parts.at(i).reset(); } it->second.parts.clear(); fTimeframeBuffer.erase(it++); @@ -82,10 +82,7 @@ void EPNReceiver::Run() { // boost::thread heartbeatSender(boost::bind(&EPNReceiver::sendHeartbeats, this)); - FairMQPoller* poller = fTransportFactory->CreatePoller(fChannels.at("data-in")); - - int SNDMORE = fChannels.at("data-in").at(0).fSocket->SNDMORE; - int NOBLOCK = fChannels.at("data-in").at(0).fSocket->NOBLOCK; + unique_ptr poller(fTransportFactory->CreatePoller(fChannels.at("data-in"))); // DEBUG: store receive intervals per FLP // vector> rcvIntervals(fNumFLPs, vector()); @@ -104,7 +101,7 @@ void EPNReceiver::Run() poller->Poll(100); if (poller->CheckInput(0)) { - FairMQMessage* headerPart = fTransportFactory->CreateMessage(); + unique_ptr headerPart(fTransportFactory->CreateMessage()); if (dataInputChannel.Receive(headerPart) > 0) { // store the received ID @@ -123,61 +120,55 @@ void EPNReceiver::Run() // } // end DEBUG - FairMQMessage* dataPart = fTransportFactory->CreateMessage(); - rcvDataSize = dataInputChannel.Receive(dataPart); + unique_ptr dataPart(fTransportFactory->CreateMessage()); - if (fDiscardedSet.find(id) == fDiscardedSet.end()) { - // if received ID has not previously been discarded. - if (fTimeframeBuffer.find(id) == fTimeframeBuffer.end()) { - // if received ID is not yet in the buffer. - if (rcvDataSize > 0) { - // receive data, store it in the buffer, save the receive time. - fTimeframeBuffer[id].parts.push_back(dataPart); + // receive the data part + if (dataInputChannel.Receive(dataPart) > 0) + { + if (fDiscardedSet.find(id) == fDiscardedSet.end()) + { + if (fTimeframeBuffer.find(id) == fTimeframeBuffer.end()) + { + // if this is the first part with this ID, save the receive time. fTimeframeBuffer[id].startTime = boost::posix_time::microsec_clock::local_time(); - } else { - LOG(ERROR) << "no data received from input socket"; - delete dataPart; - } - // PrintBuffer(fTimeframeBuffer); - } else { - // if received ID is already in the buffer - if (rcvDataSize > 0) { - fTimeframeBuffer[id].parts.push_back(dataPart); - } else { - LOG(ERROR) << "no data received from input socket"; - delete dataPart; } + // if the received ID has not previously been discarded, + // store the data part in the buffer + fTimeframeBuffer[id].parts.push_back(move(dataPart)); // PrintBuffer(fTimeframeBuffer); } - } else { - // if received ID has been previously discarded. - LOG(WARN) << "Received part from an already discarded timeframe with id " << id; - delete dataPart; + else + { + // if received ID has been previously discarded. + LOG(WARN) << "Received part from an already discarded timeframe with id " << id; + } + } + else + { + LOG(ERROR) << "no data received from input socket"; } if (fTimeframeBuffer[id].parts.size() == fNumFLPs) { // LOG(INFO) << "Collected all parts for timeframe #" << id; // when all parts are collected send all except last one with 'snd-more' flag, and last one without the flag. for (int i = 0; i < fNumFLPs - 1; ++i) { - dataOutChannel.Send(fTimeframeBuffer[id].parts.at(i), SNDMORE); + dataOutChannel.SendPart(fTimeframeBuffer[id].parts.at(i)); } dataOutChannel.Send(fTimeframeBuffer[id].parts.at(fNumFLPs - 1)); if (fTestMode > 0) { // Send an acknowledgement back to the sampler to measure the round trip time - FairMQMessage* ack = fTransportFactory->CreateMessage(sizeof(uint16_t)); + unique_ptr ack(fTransportFactory->CreateMessage(sizeof(uint16_t))); memcpy(ack->GetData(), &id, sizeof(uint16_t)); - if (ackOutChannel.Send(ack, NOBLOCK) <= 0) { + if (ackOutChannel.SendAsync(ack) <= 0) { LOG(ERROR) << "Could not send acknowledgement without blocking"; } - - delete ack; } // let transport know that the data is no longer needed. transport will clean up after it is sent out. for (int i = 0; i < fTimeframeBuffer[id].parts.size(); ++i) { - delete fTimeframeBuffer[id].parts.at(i); + fTimeframeBuffer[id].parts.at(i).reset(); } fTimeframeBuffer[id].parts.clear(); @@ -188,7 +179,6 @@ void EPNReceiver::Run() // LOG(WARN) << "Buffer size: " << fTimeframeBuffer.size(); } - delete headerPart; } // check if any incomplete timeframes in the buffer are older than timeout period, and discard them if they are @@ -220,12 +210,10 @@ void EPNReceiver::sendHeartbeats() while (CheckCurrentState(RUNNING)) { try { for (int i = 0; i < fNumFLPs; ++i) { - FairMQMessage* heartbeatMsg = fTransportFactory->CreateMessage(ownAddressSize); + unique_ptr heartbeatMsg(fTransportFactory->CreateMessage(ownAddressSize)); memcpy(heartbeatMsg->GetData(), ownAddress.c_str(), ownAddressSize); fChannels.at("heartbeat-out").at(i).Send(heartbeatMsg); - - delete heartbeatMsg; } boost::this_thread::sleep(boost::posix_time::milliseconds(fHeartbeatIntervalInMs)); } catch (boost::thread_interrupted&) { diff --git a/devices/flp2epn-distributed/EPNReceiver.h b/devices/flp2epn-distributed/EPNReceiver.h index 97315be1bf23e..7236a2eea5eac 100644 --- a/devices/flp2epn-distributed/EPNReceiver.h +++ b/devices/flp2epn-distributed/EPNReceiver.h @@ -23,7 +23,7 @@ namespace Devices { struct TFBuffer { - std::vector parts; + std::vector> parts; boost::posix_time::ptime startTime; boost::posix_time::ptime endTime; }; diff --git a/devices/flp2epn-distributed/FLPSender.cxx b/devices/flp2epn-distributed/FLPSender.cxx index 6bc28ddcd3498..8b0f4d93c17ce 100644 --- a/devices/flp2epn-distributed/FLPSender.cxx +++ b/devices/flp2epn-distributed/FLPSender.cxx @@ -34,8 +34,6 @@ FLPSender::FLPSender() , fHeaderBuffer() , fDataBuffer() , fArrivalTime() - , fSndMoreFlag(0) - , fNoBlockFlag(0) , fNumEPNs(0) , fHeartbeatTimeoutInMs(20000) , fHeartbeats() @@ -66,7 +64,7 @@ void FLPSender::receiveHeartbeats() while (CheckCurrentState(RUNNING)) { try { - FairMQMessage* hbMsg = fTransportFactory->CreateMessage(); + unique_ptr hbMsg(fTransportFactory->CreateMessage()); if (hbChannel.Receive(hbMsg) > 0) { string address = string(static_cast(hbMsg->GetData()), hbMsg->GetSize()); @@ -87,8 +85,6 @@ void FLPSender::receiveHeartbeats() LOG(ERROR) << "IP " << address << " unknown, not provided at execution time"; } } - - delete hbMsg; } catch (boost::thread_interrupted&) { LOG(INFO) << "FLPSender::receiveHeartbeats() interrupted"; break; @@ -100,15 +96,13 @@ void FLPSender::Run() { // boost::thread heartbeatReceiver(boost::bind(&FLPSender::receiveHeartbeats, this)); - // base buffer, to be copied from for every timeframe body + // base buffer, to be copied from for every timeframe body (zero-copy) void* buffer = operator new[](fEventSize); - FairMQMessage* baseMsg = fTransportFactory->CreateMessage(buffer, fEventSize); - - fSndMoreFlag = fChannels.at("data-in").at(0).fSocket->SNDMORE; - fNoBlockFlag = fChannels.at("data-in").at(0).fSocket->NOBLOCK; + unique_ptr baseMsg(fTransportFactory->CreateMessage(buffer, fEventSize)); uint16_t timeFrameId = 0; + // store the channel reference to avoid traversing the map on every loop iteration FairMQChannel& dataInChannel = fChannels.at("data-in").at(0); while (CheckCurrentState(RUNNING)) { @@ -117,7 +111,7 @@ void FLPSender::Run() if (fTestMode > 0) { // test-mode: receive and store id part in the buffer. - FairMQMessage* idPart = fTransportFactory->CreateMessage(); + unique_ptr idPart(fTransportFactory->CreateMessage()); if (dataInChannel.Receive(idPart) > 0) { h->timeFrameId = *(static_cast(idPart->GetData())); h->flpIndex = fIndex; @@ -126,8 +120,6 @@ void FLPSender::Run() delete h; continue; } - - delete idPart; } else { // regular mode: use the id generated locally h->timeFrameId = timeFrameId; @@ -138,8 +130,8 @@ void FLPSender::Run() } } - FairMQMessage* headerPart = fTransportFactory->CreateMessage(h, sizeof(f2eHeader)); - FairMQMessage* dataPart = fTransportFactory->CreateMessage(); + unique_ptr headerPart(fTransportFactory->CreateMessage(h, sizeof(f2eHeader))); + unique_ptr dataPart(fTransportFactory->CreateMessage()); // save the arrival time of the message. fArrivalTime.push(boost::posix_time::microsec_clock::local_time()); @@ -147,13 +139,13 @@ void FLPSender::Run() if (fTestMode > 0) { // test-mode: initialize and store data part in the buffer. dataPart->Copy(baseMsg); - fHeaderBuffer.push(headerPart); - fDataBuffer.push(dataPart); + fHeaderBuffer.push(move(headerPart)); + fDataBuffer.push(move(dataPart)); } else { // regular mode: receive data part from input if (dataInChannel.Receive(dataPart) >= 0) { - fHeaderBuffer.push(headerPart); - fDataBuffer.push(dataPart); + fHeaderBuffer.push(move(headerPart)); + fDataBuffer.push(move(dataPart)); } else { // if nothing was received, try again continue; @@ -176,8 +168,6 @@ void FLPSender::Run() } } - delete baseMsg; - // heartbeatReceiver.interrupt(); // heartbeatReceiver.join(); } @@ -207,11 +197,12 @@ inline void FLPSender::sendFrontData() // fArrivalTime.pop(); // fDataBuffer.pop(); // } else { // if the heartbeat from the corresponding EPN is within timeout period, send the data. - if (fChannels.at("data-out").at(direction).Send(fHeaderBuffer.front(), fSndMoreFlag|fNoBlockFlag) <= 0) { - LOG(ERROR) << "Could not queue ID part of event #" << currentTimeframeId << " without blocking"; - } - if (fChannels.at("data-out").at(direction).Send(fDataBuffer.front(), fNoBlockFlag) <= 0) { - LOG(ERROR) << "Could not send message with event #" << currentTimeframeId << " without blocking"; + if (fChannels.at("data-out").at(direction).SendPartAsync(fHeaderBuffer.front()) < 0) { + LOG(ERROR) << "Failed to queue ID part of event #" << currentTimeframeId; + } else { + if (fChannels.at("data-out").at(direction).SendAsync(fDataBuffer.front()) < 0) { + LOG(ERROR) << "Could not send message with event #" << currentTimeframeId << " without blocking"; + } } fHeaderBuffer.pop(); fArrivalTime.pop(); diff --git a/devices/flp2epn-distributed/FLPSender.h b/devices/flp2epn-distributed/FLPSender.h index 573ae5838d0be..91fe98057a35d 100644 --- a/devices/flp2epn-distributed/FLPSender.h +++ b/devices/flp2epn-distributed/FLPSender.h @@ -75,8 +75,8 @@ class FLPSender : public FairMQDevice /// Sends the "oldest" element from the sub-timeframe container void sendFrontData(); - std::queue fHeaderBuffer; ///< Stores sub-timeframe headers - std::queue fDataBuffer; ///< Stores sub-timeframe bodies + std::queue> fHeaderBuffer; ///< Stores sub-timeframe headers + std::queue> fDataBuffer; ///< Stores sub-timeframe bodies std::queue fArrivalTime; ///< Stores arrival times of sub-timeframes int fNumEPNs; ///< Number of epnReceivers @@ -84,9 +84,6 @@ class FLPSender : public FairMQDevice unsigned int fSendOffset; ///< Offset for staggering output unsigned int fSendDelay; ///< Delay for staggering output - int fSndMoreFlag; ///< Flag for faster access to multipart sending - int fNoBlockFlag; ///< Flag for faster access to sending without blocking - int fEventSize; ///< Size of the sub-timeframe body (only for test mode) int fTestMode; ///< Run the device in test mode (only syncSampler+flpSender+epnReceiver) diff --git a/devices/flp2epn-distributed/FLPSyncSampler.cxx b/devices/flp2epn-distributed/FLPSyncSampler.cxx index 4cdd7a760d28e..c13dcb2fdb6f0 100644 --- a/devices/flp2epn-distributed/FLPSyncSampler.cxx +++ b/devices/flp2epn-distributed/FLPSyncSampler.cxx @@ -42,17 +42,15 @@ void FLPSyncSampler::Run() boost::thread resetEventCounter(boost::bind(&FLPSyncSampler::ResetEventCounter, this)); boost::thread ackListener(boost::bind(&FLPSyncSampler::ListenForAcks, this)); - int NOBLOCK = fChannels.at("data-out").at(0).fSocket->NOBLOCK; - uint16_t timeFrameId = 1; FairMQChannel& dataOutputChannel = fChannels.at("data-out").at(0); while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(sizeof(uint16_t)); + unique_ptr msg(fTransportFactory->CreateMessage(sizeof(uint16_t))); memcpy(msg->GetData(), &timeFrameId, sizeof(uint16_t)); - if (dataOutputChannel.Send(msg, NOBLOCK) > 0) { + if (dataOutputChannel.SendAsync(msg) > 0) { fTimeframeRTT[timeFrameId].start = boost::posix_time::microsec_clock::local_time(); if (++timeFrameId == UINT16_MAX - 1) { @@ -65,8 +63,6 @@ void FLPSyncSampler::Run() while (fEventCounter == 0) { boost::this_thread::sleep(boost::posix_time::milliseconds(1)); } - - delete msg; } try { @@ -89,7 +85,7 @@ void FLPSyncSampler::ListenForAcks() while (CheckCurrentState(RUNNING)) { try { - FairMQMessage* idMsg = fTransportFactory->CreateMessage(); + unique_ptr idMsg(fTransportFactory->CreateMessage()); if (fChannels.at("ack-in").at(0).Receive(idMsg) > 0) { id = *(static_cast(idMsg->GetData())); diff --git a/devices/flp2epn/O2EPNex.cxx b/devices/flp2epn/O2EPNex.cxx index 3e0520b25ee47..5dca87e2dabdd 100644 --- a/devices/flp2epn/O2EPNex.cxx +++ b/devices/flp2epn/O2EPNex.cxx @@ -7,8 +7,17 @@ #include #include -#include "O2EPNex.h" + #include "FairMQLogger.h" +#include "O2EPNex.h" + +struct Content { + double a; + double b; + int x; + int y; + int z; +}; O2EPNex::O2EPNex() { @@ -17,19 +26,16 @@ O2EPNex::O2EPNex() void O2EPNex::Run() { while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + std::unique_ptr msg(fTransportFactory->CreateMessage()); - fChannels["data-in"].at(0).Receive(msg); + fChannels.at("data-in").at(0).Receive(msg); - int inputSize = msg->GetSize(); - int numInput = inputSize / sizeof(Content); - Content* input = reinterpret_cast(msg->GetData()); + int numInput = msg->GetSize() / sizeof(Content); + Content* input = static_cast(msg->GetData()); // for (int i = 0; i < numInput; ++i) { // LOG(INFO) << (&input[i])->x << " " << (&input[i])->y << " " << (&input[i])->z << " " << (&input[i])->a << " " << (&input[i])->b; // } - - delete msg; } } diff --git a/devices/flp2epn/O2EPNex.h b/devices/flp2epn/O2EPNex.h index 4afe029502b4b..291ccb9ca3fc4 100644 --- a/devices/flp2epn/O2EPNex.h +++ b/devices/flp2epn/O2EPNex.h @@ -10,19 +10,12 @@ #include "FairMQDevice.h" -struct Content { - double a; - double b; - int x; - int y; - int z; -}; - class O2EPNex : public FairMQDevice { public: O2EPNex(); virtual ~O2EPNex(); + protected: virtual void Run(); }; diff --git a/devices/flp2epn/O2EpnMerger.cxx b/devices/flp2epn/O2EpnMerger.cxx index 4960b1b3aa5fb..26ed10d05bfdb 100644 --- a/devices/flp2epn/O2EpnMerger.cxx +++ b/devices/flp2epn/O2EpnMerger.cxx @@ -17,53 +17,30 @@ O2EpnMerger::O2EpnMerger() void O2EpnMerger::Run() { - FairMQPoller* poller = fTransportFactory->CreatePoller(fChannels["data-in"]); + std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels.at("data-in"))); - bool received = false; - int NoOfMsgParts = fChannels["data-in"].size() - 1; + int numParts = fChannels.at("data-in").size() - 1; while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + std::unique_ptr msg(fTransportFactory->CreateMessage()); poller->Poll(100); - for (int i = 0; i < fChannels["data-in"].size(); i++) { + for (int i = 0; i < fChannels.at("data-in").size(); i++) { if (poller->CheckInput(i)) { - if (fChannels["data-in"].at(i).Receive(msg)) { + if (fChannels.at("data-in").at(i).Receive(msg)) { // LOG(INFO) << "------ recieve Msg from " << i ; - if (i < NoOfMsgParts) { - fChannels["data-out"].at(0).Send(msg, "snd-more"); + if (i < numParts) { + fChannels.at("data-out").at(0).SendPart(msg); // LOG(INFO) << "------ Send Msg Part " << i ; } else { - fChannels["data-out"].at(0).Send(msg); + fChannels.at("data-out").at(0).Send(msg); // LOG(INFO) << "------ Send last Msg Part " << i ; } } } } - - delete msg; } - - delete poller; - -//-------------------- - - // while (CheckCurrentState(RUNNING)) { - // FairMQMessage* msg = fTransportFactory->CreateMessage(); - - // fChannels["data-in"].at(0).Receive(msg); - - // int inputSize = msg->GetSize(); - // int numInput = inputSize / sizeof(Content); - // Content* input = reinterpret_cast(msg->GetData()); - - // // for (int i = 0; i < numInput; ++i) { - // // LOG(INFO) << (&input[i])->x << " " << (&input[i])->y << " " << (&input[i])->z << " " << (&input[i])->a << " " << (&input[i])->b; - // // } - - // delete msg; - // } } O2EpnMerger::~O2EpnMerger() diff --git a/devices/flp2epn/O2EpnMerger.h b/devices/flp2epn/O2EpnMerger.h index 92a2b8f71006a..6cca142822b9d 100644 --- a/devices/flp2epn/O2EpnMerger.h +++ b/devices/flp2epn/O2EpnMerger.h @@ -10,19 +10,12 @@ #include "FairMQDevice.h" -struct Content { - double a; - double b; - int x; - int y; - int z; -}; - class O2EpnMerger: public FairMQDevice { public: O2EpnMerger(); virtual ~O2EpnMerger(); + protected: virtual void Run(); }; diff --git a/devices/flp2epn/O2FLPex.cxx b/devices/flp2epn/O2FLPex.cxx index 59a2239d27bcf..6eed815b2daaa 100644 --- a/devices/flp2epn/O2FLPex.cxx +++ b/devices/flp2epn/O2FLPex.cxx @@ -17,6 +17,15 @@ using namespace std; +struct Content { + int id; + double a; + double b; + int x; + int y; + int z; +}; + O2FLPex::O2FLPex() : fEventSize(10000) { @@ -26,74 +35,68 @@ O2FLPex::~O2FLPex() { } -void O2FLPex::Init() -{ - FairMQDevice::Init(); -} - void O2FLPex::Run() { srand(time(NULL)); + FairMQChannel& outChannel = fChannels.at("data-out").at(0); + LOG(DEBUG) << "Message size: " << fEventSize * sizeof(Content) << " bytes."; while (CheckCurrentState(RUNNING)) { - Content* payload = new Content[fEventSize]; + vector payload(fEventSize); for (int i = 0; i < fEventSize; ++i) { - (&payload[i])->x = rand() % 100 + 1; - (&payload[i])->y = rand() % 100 + 1; - (&payload[i])->z = rand() % 100 + 1; - (&payload[i])->a = (rand() % 100 + 1) / (rand() % 100 + 1); - (&payload[i])->b = (rand() % 100 + 1) / (rand() % 100 + 1); + payload.at(i).x = rand() % 100 + 1; + payload.at(i).y = rand() % 100 + 1; + payload.at(i).z = rand() % 100 + 1; + payload.at(i).a = (rand() % 100 + 1) / (rand() % 100 + 1); + payload.at(i).b = (rand() % 100 + 1) / (rand() % 100 + 1); // LOG(INFO) << (&payload[i])->x << " " << (&payload[i])->y << " " << (&payload[i])->z << " " << (&payload[i])->a << " " << (&payload[i])->b; } - FairMQMessage* msg = fTransportFactory->CreateMessage(fEventSize * sizeof(Content)); - memcpy(msg->GetData(), payload, fEventSize * sizeof(Content)); - - fChannels["data-out"].at(0).Send(msg); + unique_ptr msg(fTransportFactory->CreateMessage(fEventSize * sizeof(Content))); + memcpy(msg->GetData(), payload.data(), fEventSize * sizeof(Content)); - delete[] payload; - delete msg; + outChannel.Send(msg); } } void O2FLPex::SetProperty(const int key, const string& value) { switch (key) { - default: - FairMQDevice::SetProperty(key, value); - break; + default: + FairMQDevice::SetProperty(key, value); + break; } } string O2FLPex::GetProperty(const int key, const string& default_/*= ""*/) { switch (key) { - default: - return FairMQDevice::GetProperty(key, default_); + default: + return FairMQDevice::GetProperty(key, default_); } } void O2FLPex::SetProperty(const int key, const int value) { switch (key) { - case EventSize: - fEventSize = value; - break; - default: - FairMQDevice::SetProperty(key, value); - break; + case EventSize: + fEventSize = value; + break; + default: + FairMQDevice::SetProperty(key, value); + break; } } int O2FLPex::GetProperty(const int key, const int default_/*= 0*/) { switch (key) { - case EventSize: - return fEventSize; - default: - return FairMQDevice::GetProperty(key, default_); + case EventSize: + return fEventSize; + default: + return FairMQDevice::GetProperty(key, default_); } } diff --git a/devices/flp2epn/O2FLPex.h b/devices/flp2epn/O2FLPex.h index e4613559807b6..bef7be48f2a90 100644 --- a/devices/flp2epn/O2FLPex.h +++ b/devices/flp2epn/O2FLPex.h @@ -12,15 +12,6 @@ #include "FairMQDevice.h" -struct Content { - int id; - double a; - double b; - int x; - int y; - int z; -}; - class O2FLPex : public FairMQDevice { public: @@ -41,7 +32,6 @@ class O2FLPex : public FairMQDevice protected: int fEventSize; - virtual void Init(); virtual void Run(); }; diff --git a/devices/flp2epn/O2Merger.cxx b/devices/flp2epn/O2Merger.cxx index b116dc0693438..e02d563d2e4ca 100644 --- a/devices/flp2epn/O2Merger.cxx +++ b/devices/flp2epn/O2Merger.cxx @@ -9,8 +9,9 @@ #include #include "FairMQLogger.h" -#include "O2Merger.h" #include "FairMQPoller.h" +#include "O2Merger.h" + O2Merger::O2Merger() { } @@ -21,33 +22,29 @@ O2Merger::~O2Merger() void O2Merger::Run() { - FairMQPoller* poller = fTransportFactory->CreatePoller(fChannels["data-in"]); + std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels.at("data-in"))); - int NoOfMsgParts = fChannels["data-in"].size() - 1; + int numParts = fChannels.at("data-in").size() - 1; while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + std::unique_ptr msg(fTransportFactory->CreateMessage()); poller->Poll(100); - for (int i = 0; i < fChannels["data-in"].size(); i++) { + for (int i = 0; i < fChannels.at("data-in").size(); i++) { if (poller->CheckInput(i)) { - if (fChannels["data-in"].at(i).Receive(msg)) { + if (fChannels.at("data-in").at(i).Receive(msg) >= 0) { // LOG(INFO) << "------ recieve Msg from " << i ; - if (i < NoOfMsgParts) { - fChannels["data-out"].at(0).Send(msg, "snd-more"); + if (i < numParts) { + fChannels.at("data-out").at(0).SendPart(msg); // LOG(INFO) << "------ Send Msg Part " << i ; } else { - fChannels["data-out"].at(0).Send(msg); + fChannels.at("data-out").at(0).Send(msg); // LOG(INFO) << "------ Send last Msg Part " << i ; } } } } - - delete msg; } - - delete poller; } diff --git a/devices/flp2epn/O2Merger.h b/devices/flp2epn/O2Merger.h index 6925e14b4b17d..02d9a0e9a834d 100644 --- a/devices/flp2epn/O2Merger.h +++ b/devices/flp2epn/O2Merger.h @@ -16,6 +16,7 @@ class O2Merger: public FairMQDevice public: O2Merger(); virtual ~O2Merger(); + protected: virtual void Run(); }; diff --git a/devices/flp2epn/O2Proxy.cxx b/devices/flp2epn/O2Proxy.cxx index 4db1a009e389f..cb75d8f267ecd 100644 --- a/devices/flp2epn/O2Proxy.cxx +++ b/devices/flp2epn/O2Proxy.cxx @@ -11,8 +11,6 @@ #include "FairMQLogger.h" #include "O2Proxy.h" - - O2Proxy::O2Proxy() { } @@ -23,24 +21,26 @@ O2Proxy::~O2Proxy() void O2Proxy::Run() { + FairMQChannel& inChannel = fChannels.at("data-in").at(0); + FairMQChannel& outChannel = fChannels.at("data-out").at(0); + while (CheckCurrentState(RUNNING)) { // int i = 0; - int64_t more = 0; - size_t more_size = sizeof more; + bool more = false; + do { - /* Create an empty ØMQ message to hold the message part */ - FairMQMessage* msgpart = fTransportFactory->CreateMessage(); + /* Create an empty message to hold the message part */ + std::unique_ptr part(fTransportFactory->CreateMessage()); /* Block until a message is available to be received from socket */ - fChannels["data-in"].at(0).Receive(msgpart); + inChannel.Receive(part); /* Determine if more message parts are to follow */ - fChannels["data-in"].at(0).fSocket->GetOption("rcv-more", &more, &more_size); + more = inChannel.ExpectsAnotherPart(); // LOG(INFO) << "------ Get Msg Part "<< " more = " << more << " counter " << i++ ; if (more) { - fChannels["data-out"].at(0).Send(msgpart, "snd-more"); + outChannel.SendPart(part); } else { - fChannels["data-out"].at(0).Send(msgpart); + outChannel.Send(part); } - delete msgpart; } while (more); // i = 0; } diff --git a/devices/flp2epn/O2Proxy.h b/devices/flp2epn/O2Proxy.h index 8d496b3e0890c..c3b7faef70d61 100644 --- a/devices/flp2epn/O2Proxy.h +++ b/devices/flp2epn/O2Proxy.h @@ -16,6 +16,7 @@ class O2Proxy: public FairMQDevice public: O2Proxy(); virtual ~O2Proxy(); + protected: virtual void Run(); }; From fd541290081e7f2121957dc3c4c9a837477373b9 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 17 Nov 2015 13:46:37 +0100 Subject: [PATCH 2/2] Add integration test for flp2epn-distributed Test starts a chain of flpSyncSampler, 2 flpSenders and 2 epnReceivers. 100 time frames are generated. Test succeeds if the timeframes are correctly distributed, merged and acknowledged. --- devices/flp2epn-distributed/CMakeLists.txt | 11 +- devices/flp2epn-distributed/FLPSender.cxx | 3 +- .../flp2epn-distributed/FLPSyncSampler.cxx | 70 +++++++++--- devices/flp2epn-distributed/FLPSyncSampler.h | 4 + .../run/runEPNReceiver.cxx | 17 ++- .../flp2epn-distributed/run/runFLPSender.cxx | 75 ++++++++----- .../run/runFLPSyncSampler.cxx | 41 +++++-- .../runO2Prototype/runFLPSyncSampler.cxx | 22 ++-- .../test/testFLP2EPN-distributed.sh.in | 103 ++++++++++++++++++ 9 files changed, 277 insertions(+), 69 deletions(-) create mode 100755 devices/flp2epn-distributed/test/testFLP2EPN-distributed.sh.in diff --git a/devices/flp2epn-distributed/CMakeLists.txt b/devices/flp2epn-distributed/CMakeLists.txt index 06bd1cca49659..e3a6b251d5e31 100644 --- a/devices/flp2epn-distributed/CMakeLists.txt +++ b/devices/flp2epn-distributed/CMakeLists.txt @@ -1,3 +1,6 @@ +configure_file(${CMAKE_SOURCE_DIR}/devices/flp2epn-distributed/run/startFLP2EPN-distributed.sh.in ${CMAKE_BINARY_DIR}/bin/startFLP2EPN-distributed.sh) +configure_file(${CMAKE_SOURCE_DIR}/devices/flp2epn-distributed/test/testFLP2EPN-distributed.sh.in ${CMAKE_BINARY_DIR}/devices/flp2epn-distributed/test/testFLP2EPN-distributed.sh) + set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/devices/flp2epn-distributed ) @@ -22,10 +25,6 @@ endif() include_directories(${INCLUDE_DIRECTORIES}) include_directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) -configure_file( - ${CMAKE_SOURCE_DIR}/devices/flp2epn-distributed/run/startFLP2EPN-distributed.sh.in ${CMAKE_BINARY_DIR}/bin/startFLP2EPN-distributed.sh -) - set(LINK_DIRECTORIES ${Boost_LIBRARY_DIRS} ${FAIRROOT_LIBRARY_DIR} @@ -115,3 +114,7 @@ ForEach(_file RANGE 0 ${_length}) set(DEPENDENCIES FLP2EPNex_distributed) GENERATE_EXECUTABLE() EndForEach(_file RANGE 0 ${_length}) + +add_test(NAME run_flp2epn_distributed COMMAND ${CMAKE_BINARY_DIR}/devices/flp2epn-distributed/test/testFLP2EPN-distributed.sh) +set_tests_properties(run_flp2epn_distributed PROPERTIES TIMEOUT "30") +set_tests_properties(run_flp2epn_distributed PROPERTIES PASS_REGULAR_EXPRESSION "acknowledged after") diff --git a/devices/flp2epn-distributed/FLPSender.cxx b/devices/flp2epn-distributed/FLPSender.cxx index 8b0f4d93c17ce..2bd65489c6c6c 100644 --- a/devices/flp2epn-distributed/FLPSender.cxx +++ b/devices/flp2epn-distributed/FLPSender.cxx @@ -197,7 +197,8 @@ inline void FLPSender::sendFrontData() // fArrivalTime.pop(); // fDataBuffer.pop(); // } else { // if the heartbeat from the corresponding EPN is within timeout period, send the data. - if (fChannels.at("data-out").at(direction).SendPartAsync(fHeaderBuffer.front()) < 0) { + if (fChannels.at("data-out").at(direction).SendPart(fHeaderBuffer.front()) < 0) { + // TODO: replace SendPart() with SendPartAsync() after nov15 fairroot release LOG(ERROR) << "Failed to queue ID part of event #" << currentTimeframeId; } else { if (fChannels.at("data-out").at(direction).SendAsync(fDataBuffer.front()) < 0) { diff --git a/devices/flp2epn-distributed/FLPSyncSampler.cxx b/devices/flp2epn-distributed/FLPSyncSampler.cxx index c13dcb2fdb6f0..e8f91af7c571d 100644 --- a/devices/flp2epn-distributed/FLPSyncSampler.cxx +++ b/devices/flp2epn-distributed/FLPSyncSampler.cxx @@ -21,6 +21,8 @@ using namespace AliceO2::Devices; FLPSyncSampler::FLPSyncSampler() : fEventRate(1) + , fMaxEvents(0) + , fStoreRTTinFile(0) , fEventCounter(0) , fTimeframeRTT() { @@ -42,7 +44,7 @@ void FLPSyncSampler::Run() boost::thread resetEventCounter(boost::bind(&FLPSyncSampler::ResetEventCounter, this)); boost::thread ackListener(boost::bind(&FLPSyncSampler::ListenForAcks, this)); - uint16_t timeFrameId = 1; + uint16_t timeFrameId = 0; FairMQChannel& dataOutputChannel = fChannels.at("data-out").at(0); @@ -50,11 +52,11 @@ void FLPSyncSampler::Run() unique_ptr msg(fTransportFactory->CreateMessage(sizeof(uint16_t))); memcpy(msg->GetData(), &timeFrameId, sizeof(uint16_t)); - if (dataOutputChannel.SendAsync(msg) > 0) { + if (dataOutputChannel.Send(msg) >= 0) { fTimeframeRTT[timeFrameId].start = boost::posix_time::microsec_clock::local_time(); if (++timeFrameId == UINT16_MAX - 1) { - timeFrameId = 1; + timeFrameId = 0; } } @@ -63,6 +65,11 @@ void FLPSyncSampler::Run() while (fEventCounter == 0) { boost::this_thread::sleep(boost::posix_time::milliseconds(1)); } + + if (fMaxEvents > 0 && timeFrameId >= fMaxEvents) { + LOG(INFO) << "Reached configured maximum number of events (" << fMaxEvents << "). Exiting Run()."; + break; + } } try { @@ -79,32 +86,51 @@ void FLPSyncSampler::ListenForAcks() { uint16_t id = 0; - string name = to_iso_string(boost::posix_time::microsec_clock::local_time()).substr(0, 20); - ofstream ofsFrames(name + "-frames.log"); - ofstream ofsTimes(name + "-times.log"); + unique_ptr poller(fTransportFactory->CreatePoller(fChannels.at("ack-in"))); + + ofstream ofsFrames; + ofstream ofsTimes; + + // store round trip time measurements in a file + if (fStoreRTTinFile > 0) { + string name = to_iso_string(boost::posix_time::microsec_clock::local_time()).substr(0, 20); + ofsFrames.open(name + "-frames.log"); + ofsTimes.open(name + "-times.log"); + } while (CheckCurrentState(RUNNING)) { try { - unique_ptr idMsg(fTransportFactory->CreateMessage()); + poller->Poll(100); - if (fChannels.at("ack-in").at(0).Receive(idMsg) > 0) { - id = *(static_cast(idMsg->GetData())); - fTimeframeRTT.at(id).end = boost::posix_time::microsec_clock::local_time(); - // store values in a file - ofsFrames << id << "\n"; - ofsTimes << (fTimeframeRTT.at(id).end - fTimeframeRTT.at(id).start).total_microseconds() << "\n"; + unique_ptr idMsg(fTransportFactory->CreateMessage()); - LOG(INFO) << "Timeframe #" << id << " acknowledged after " - << (fTimeframeRTT.at(id).end - fTimeframeRTT.at(id).start).total_microseconds() << " μs."; + if (poller->CheckInput(0)) { + if (fChannels.at("ack-in").at(0).Receive(idMsg) >= 0) { + id = *(static_cast(idMsg->GetData())); + fTimeframeRTT.at(id).end = boost::posix_time::microsec_clock::local_time(); + // store values in a file + if (fStoreRTTinFile > 0) { + ofsFrames << id << "\n"; + ofsTimes << (fTimeframeRTT.at(id).end - fTimeframeRTT.at(id).start).total_microseconds() << "\n"; + } + + LOG(INFO) << "Timeframe #" << id << " acknowledged after " + << (fTimeframeRTT.at(id).end - fTimeframeRTT.at(id).start).total_microseconds() << " μs."; + } } + + boost::this_thread::interruption_point(); } catch (boost::thread_interrupted&) { LOG(DEBUG) << "Acknowledgement listener thread interrupted"; break; } } - ofsFrames.close(); - ofsTimes.close(); + // store round trip time measurements in a file + if (fStoreRTTinFile > 0) { + ofsFrames.close(); + ofsTimes.close(); + } } void FLPSyncSampler::ResetEventCounter() @@ -143,6 +169,12 @@ void FLPSyncSampler::SetProperty(const int key, const int value) case EventRate: fEventRate = value; break; + case MaxEvents: + fMaxEvents = value; + break; + case StoreRTTinFile: + fStoreRTTinFile = value; + break; default: FairMQDevice::SetProperty(key, value); break; @@ -154,6 +186,10 @@ int FLPSyncSampler::GetProperty(const int key, const int default_ /*= 0*/) switch (key) { case EventRate: return fEventRate; + case MaxEvents: + return fMaxEvents; + case StoreRTTinFile: + return fStoreRTTinFile; default: return FairMQDevice::GetProperty(key, default_); } diff --git a/devices/flp2epn-distributed/FLPSyncSampler.h b/devices/flp2epn-distributed/FLPSyncSampler.h index 77f634f3e0894..a6b7a93272ad6 100644 --- a/devices/flp2epn-distributed/FLPSyncSampler.h +++ b/devices/flp2epn-distributed/FLPSyncSampler.h @@ -33,6 +33,8 @@ class FLPSyncSampler : public FairMQDevice public: enum { EventRate = FairMQDevice::Last, ///< Publishing rate of the timeframe IDs + MaxEvents, ///< Maximum number of events to send (0 - unlimited) + StoreRTTinFile, ///< Store round trip time measurements in a file Last }; @@ -74,6 +76,8 @@ class FLPSyncSampler : public FairMQDevice std::array fTimeframeRTT; ///< Container for the roundtrip values per timeframe ID int fEventRate; ///< Publishing rate of the timeframe IDs + int fMaxEvents; ///< Maximum number of events to send (0 - unlimited) + int fStoreRTTinFile; ///< Store round trip time measurements in a file. int fEventCounter; ///< Controls the send rate of the timeframe IDs }; diff --git a/devices/flp2epn-distributed/run/runEPNReceiver.cxx b/devices/flp2epn-distributed/run/runEPNReceiver.cxx index b6484604060f9..575bb220ac25e 100644 --- a/devices/flp2epn-distributed/run/runEPNReceiver.cxx +++ b/devices/flp2epn-distributed/run/runEPNReceiver.cxx @@ -25,6 +25,7 @@ typedef struct DeviceOptions int bufferTimeoutInMs; int numFLPs; int testMode; + int interactive; string dataInSocketType; int dataInBufSize; @@ -65,6 +66,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) ("buffer-timeout", bpo::value()->default_value(1000), "Buffer timeout in milliseconds") ("num-flps", bpo::value()->required(), "Number of FLPs") ("test-mode", bpo::value()->default_value(0), "Run in test mode") + ("interactive", bpo::value()->default_value(1), "Run in interactive mode (1/0)") ("data-in-socket-type", bpo::value()->default_value("pull"), "Data input socket type: sub/pull") ("data-in-buff-size", bpo::value()->default_value(10), "Data input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") @@ -108,6 +110,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) if (vm.count("buffer-timeout")) { _options->bufferTimeoutInMs = vm["buffer-timeout"].as(); } if (vm.count("num-flps")) { _options->numFLPs = vm["num-flps"].as(); } if (vm.count("test-mode")) { _options->testMode = vm["test-mode"].as(); } + if (vm.count("interactive")) { _options->interactive = vm["interactive"].as(); } if (vm.count("data-in-socket-type")) { _options->dataInSocketType = vm["data-in-socket-type"].as(); } if (vm.count("data-in-buff-size")) { _options->dataInBufSize = vm["data-in-buff-size"].as(); } @@ -214,7 +217,19 @@ int main(int argc, char** argv) // run the device epn.ChangeState("RUN"); - epn.InteractiveStateLoop(); + if (options.interactive > 0) { + epn.InteractiveStateLoop(); + } else { + epn.WaitForEndOfState("RUN"); + + epn.ChangeState("RESET_TASK"); + epn.WaitForEndOfState("RESET_TASK"); + + epn.ChangeState("RESET_DEVICE"); + epn.WaitForEndOfState("RESET_DEVICE"); + + epn.ChangeState("END"); + } return 0; } diff --git a/devices/flp2epn-distributed/run/runFLPSender.cxx b/devices/flp2epn-distributed/run/runFLPSender.cxx index 889fa3ef26523..2fa049b2b8bd2 100644 --- a/devices/flp2epn-distributed/run/runFLPSender.cxx +++ b/devices/flp2epn-distributed/run/runFLPSender.cxx @@ -26,6 +26,7 @@ typedef struct DeviceOptions int numEPNs; int heartbeatTimeoutInMs; int testMode; + int interactive; int sendOffset; int sendDelay; @@ -64,6 +65,7 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) ("num-epns", bpo::value()->required(), "Number of EPNs") ("heartbeat-timeout", bpo::value()->default_value(20000), "Heartbeat timeout in milliseconds") ("test-mode", bpo::value()->default_value(0), "Run in test mode") + ("interactive", bpo::value()->default_value(1), "Run in interactive mode (1/0)") ("send-offset", bpo::value()->default_value(0), "Offset for staggered sending") ("send-delay", bpo::value()->default_value(8), "Delay for staggered sending") @@ -97,35 +99,36 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) bpo::notify(vm); - if (vm.count("id")) { _options->id = vm["id"].as(); } - if (vm.count("flp-index")) { _options->flpIndex = vm["flp-index"].as(); } - if (vm.count("event-size")) { _options->eventSize = vm["event-size"].as(); } - if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } - - if (vm.count("num-epns")) { _options->numEPNs = vm["num-epns"].as(); } - - if (vm.count("heartbeat-timeout")) { _options->heartbeatTimeoutInMs = vm["heartbeat-timeout"].as(); } - if (vm.count("test-mode")) { _options->testMode = vm["test-mode"].as(); } - if (vm.count("send-offset")) { _options->sendOffset = vm["send-offset"].as(); } - if (vm.count("send-delay")) { _options->sendDelay = vm["send-delay"].as(); } - - if (vm.count("data-in-socket-type")) { _options->dataInSocketType = vm["data-in-socket-type"].as(); } - if (vm.count("data-in-buff-size")) { _options->dataInBufSize = vm["data-in-buff-size"].as(); } - if (vm.count("data-in-method")) { _options->dataInMethod = vm["data-in-method"].as(); } - if (vm.count("data-in-address")) { _options->dataInAddress = vm["data-in-address"].as(); } - if (vm.count("data-in-rate-logging")) { _options->dataInRateLogging = vm["data-in-rate-logging"].as(); } - - if (vm.count("data-out-socket-type")) { _options->dataOutSocketType = vm["data-out-socket-type"].as(); } - if (vm.count("data-out-buff-size")) { _options->dataOutBufSize = vm["data-out-buff-size"].as(); } - if (vm.count("data-out-method")) { _options->dataOutMethod = vm["data-out-method"].as(); } - if (vm.count("data-out-address")) { _options->dataOutAddress = vm["data-out-address"].as>(); } - if (vm.count("data-out-rate-logging")) { _options->dataOutRateLogging = vm["data-out-rate-logging"].as(); } - - if (vm.count("hb-in-socket-type")) { _options->hbInSocketType = vm["hb-in-socket-type"].as(); } - if (vm.count("hb-in-buff-size")) { _options->hbInBufSize = vm["hb-in-buff-size"].as(); } - if (vm.count("hb-in-method")) { _options->hbInMethod = vm["hb-in-method"].as(); } - if (vm.count("hb-in-address")) { _options->hbInAddress = vm["hb-in-address"].as(); } - if (vm.count("hb-in-rate-logging")) { _options->hbInRateLogging = vm["hb-in-rate-logging"].as(); } + if (vm.count("id")) { _options->id = vm["id"].as(); } + if (vm.count("flp-index")) { _options->flpIndex = vm["flp-index"].as(); } + if (vm.count("event-size")) { _options->eventSize = vm["event-size"].as(); } + if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } + + if (vm.count("num-epns")) { _options->numEPNs = vm["num-epns"].as(); } + + if (vm.count("heartbeat-timeout")) { _options->heartbeatTimeoutInMs = vm["heartbeat-timeout"].as(); } + if (vm.count("test-mode")) { _options->testMode = vm["test-mode"].as(); } + if (vm.count("interactive")) { _options->interactive = vm["interactive"].as(); } + if (vm.count("send-offset")) { _options->sendOffset = vm["send-offset"].as(); } + if (vm.count("send-delay")) { _options->sendDelay = vm["send-delay"].as(); } + + if (vm.count("data-in-socket-type")) { _options->dataInSocketType = vm["data-in-socket-type"].as(); } + if (vm.count("data-in-buff-size")) { _options->dataInBufSize = vm["data-in-buff-size"].as(); } + if (vm.count("data-in-method")) { _options->dataInMethod = vm["data-in-method"].as(); } + if (vm.count("data-in-address")) { _options->dataInAddress = vm["data-in-address"].as(); } + if (vm.count("data-in-rate-logging")) { _options->dataInRateLogging = vm["data-in-rate-logging"].as(); } + + if (vm.count("data-out-socket-type")) { _options->dataOutSocketType = vm["data-out-socket-type"].as(); } + if (vm.count("data-out-buff-size")) { _options->dataOutBufSize = vm["data-out-buff-size"].as(); } + if (vm.count("data-out-method")) { _options->dataOutMethod = vm["data-out-method"].as(); } + if (vm.count("data-out-address")) { _options->dataOutAddress = vm["data-out-address"].as>(); } + if (vm.count("data-out-rate-logging")) { _options->dataOutRateLogging = vm["data-out-rate-logging"].as(); } + + if (vm.count("hb-in-socket-type")) { _options->hbInSocketType = vm["hb-in-socket-type"].as(); } + if (vm.count("hb-in-buff-size")) { _options->hbInBufSize = vm["hb-in-buff-size"].as(); } + if (vm.count("hb-in-method")) { _options->hbInMethod = vm["hb-in-method"].as(); } + if (vm.count("hb-in-address")) { _options->hbInAddress = vm["hb-in-address"].as(); } + if (vm.count("hb-in-rate-logging")) { _options->hbInRateLogging = vm["hb-in-rate-logging"].as(); } return true; } @@ -204,7 +207,19 @@ int main(int argc, char** argv) // run the device flp.ChangeState("RUN"); - flp.InteractiveStateLoop(); + if (options.interactive > 0) { + flp.InteractiveStateLoop(); + } else { + flp.WaitForEndOfState("RUN"); + + flp.ChangeState("RESET_TASK"); + flp.WaitForEndOfState("RESET_TASK"); + + flp.ChangeState("RESET_DEVICE"); + flp.WaitForEndOfState("RESET_DEVICE"); + + flp.ChangeState("END"); + } return 0; } diff --git a/devices/flp2epn-distributed/run/runFLPSyncSampler.cxx b/devices/flp2epn-distributed/run/runFLPSyncSampler.cxx index a23aabc82df62..e9aeb7729032a 100644 --- a/devices/flp2epn-distributed/run/runFLPSyncSampler.cxx +++ b/devices/flp2epn-distributed/run/runFLPSyncSampler.cxx @@ -21,7 +21,10 @@ typedef struct DeviceOptions { string id; int eventRate; + int maxEvents; int ioThreads; + int interactive; + int storeRTTinFile; string dataOutSocketType; int dataOutBufSize; @@ -46,7 +49,10 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) desc.add_options() ("id", bpo::value()->required(), "Device ID") ("event-rate", bpo::value()->default_value(0), "Event rate limit in maximum number of events per second") + ("max-events", bpo::value()->default_value(0), "Maximum number of events to send (0 - unlimited)") ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("interactive", bpo::value()->default_value(1), "Run in interactive mode (1/0)") + ("store-rtt-in-file", bpo::value()->default_value(0), "Store round trip time measurements in a file (1/0)") ("data-out-socket-type", bpo::value()->default_value("pub"), "Data output socket type: pub/push") ("data-out-buff-size", bpo::value()->default_value(100), "Data output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") @@ -72,9 +78,12 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) bpo::notify(vm); - if (vm.count("id")) { _options->id = vm["id"].as(); } - if (vm.count("event-rate")) { _options->eventRate = vm["event-rate"].as(); } - if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } + if (vm.count("id")) { _options->id = vm["id"].as(); } + if (vm.count("event-rate")) { _options->eventRate = vm["event-rate"].as(); } + if (vm.count("max-events")) { _options->maxEvents = vm["max-events"].as(); } + if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } + if (vm.count("interactive")) { _options->interactive = vm["interactive"].as(); } + if (vm.count("store-rtt-in-file")) { _options->storeRTTinFile = vm["store-rtt-in-file"].as(); } if (vm.count("data-out-socket-type")) { _options->dataOutSocketType = vm["data-out-socket-type"].as(); } if (vm.count("data-out-buff-size")) { _options->dataOutBufSize = vm["data-out-buff-size"].as(); } @@ -82,11 +91,11 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) if (vm.count("data-out-address")) { _options->dataOutAddress = vm["data-out-address"].as(); } if (vm.count("data-out-rate-logging")) { _options->dataOutRateLogging = vm["data-out-rate-logging"].as(); } - if (vm.count("ack-in-socket-type")) { _options->ackInSocketType = vm["ack-in-socket-type"].as(); } - if (vm.count("ack-in-buff-size")) { _options->ackInBufSize = vm["ack-in-buff-size"].as(); } - if (vm.count("ack-in-method")) { _options->ackInMethod = vm["ack-in-method"].as(); } - if (vm.count("ack-in-address")) { _options->ackInAddress = vm["ack-in-address"].as(); } - if (vm.count("ack-in-rate-logging")) { _options->ackInRateLogging = vm["ack-in-rate-logging"].as(); } + if (vm.count("ack-in-socket-type")) { _options->ackInSocketType = vm["ack-in-socket-type"].as(); } + if (vm.count("ack-in-buff-size")) { _options->ackInBufSize = vm["ack-in-buff-size"].as(); } + if (vm.count("ack-in-method")) { _options->ackInMethod = vm["ack-in-method"].as(); } + if (vm.count("ack-in-address")) { _options->ackInAddress = vm["ack-in-address"].as(); } + if (vm.count("ack-in-rate-logging")) { _options->ackInRateLogging = vm["ack-in-rate-logging"].as(); } return true; } @@ -119,6 +128,8 @@ int main(int argc, char** argv) sampler.SetProperty(FLPSyncSampler::Id, options.id); sampler.SetProperty(FLPSyncSampler::NumIoThreads, options.ioThreads); sampler.SetProperty(FLPSyncSampler::EventRate, options.eventRate); + sampler.SetProperty(FLPSyncSampler::MaxEvents, options.maxEvents); + sampler.SetProperty(FLPSyncSampler::StoreRTTinFile, options.storeRTTinFile); // configure data output channel FairMQChannel dataOutChannel(options.dataOutSocketType, options.dataOutMethod, options.dataOutAddress); @@ -144,7 +155,19 @@ int main(int argc, char** argv) // run the device sampler.ChangeState("RUN"); - sampler.InteractiveStateLoop(); + if (options.interactive > 0) { + sampler.InteractiveStateLoop(); + } else { + sampler.WaitForEndOfState("RUN"); + + sampler.ChangeState("RESET_TASK"); + sampler.WaitForEndOfState("RESET_TASK"); + + sampler.ChangeState("RESET_DEVICE"); + sampler.WaitForEndOfState("RESET_DEVICE"); + + sampler.ChangeState("END"); + } return 0; } diff --git a/devices/flp2epn-distributed/runO2Prototype/runFLPSyncSampler.cxx b/devices/flp2epn-distributed/runO2Prototype/runFLPSyncSampler.cxx index dd90d82ca3b79..1736dfc340c3e 100644 --- a/devices/flp2epn-distributed/runO2Prototype/runFLPSyncSampler.cxx +++ b/devices/flp2epn-distributed/runO2Prototype/runFLPSyncSampler.cxx @@ -29,6 +29,8 @@ typedef struct DeviceOptions { string id; int eventRate; + int maxEvents; + int storeRTTinFile; int ioThreads; string dataOutSocketType; @@ -54,6 +56,8 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) desc.add_options() ("id", bpo::value()->required(), "Device ID") ("event-rate", bpo::value()->default_value(100), "Event rate limit in maximum number of events per second") + ("max-events", bpo::value()->default_value(0), "Maximum number of events to send (0 - unlimited)") + ("store-rtt-in-file", bpo::value()->default_value(0), "Store round trip time measurements in a file (1/0)") ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") ("data-out-socket-type", bpo::value()->default_value("pub"), "Data output socket type: pub/push") @@ -80,9 +84,11 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) bpo::notify(vm); - if (vm.count("id")) { _options->id = vm["id"].as(); } - if (vm.count("event-rate")) { _options->eventRate = vm["event-rate"].as(); } - if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } + if (vm.count("id")) { _options->id = vm["id"].as(); } + if (vm.count("event-rate")) { _options->eventRate = vm["event-rate"].as(); } + if (vm.count("max-events")) { _options->maxEvents = vm["max-events"].as(); } + if (vm.count("store-rtt-in-file")) { _options->storeRTTinFile = vm["store-rtt-in-file"].as(); } + if (vm.count("io-threads")) { _options->ioThreads = vm["io-threads"].as(); } if (vm.count("data-out-socket-type")) { _options->dataOutSocketType = vm["data-out-socket-type"].as(); } if (vm.count("data-out-buff-size")) { _options->dataOutBufSize = vm["data-out-buff-size"].as(); } @@ -90,11 +96,11 @@ inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) // if (vm.count("data-out-address")) { _options->dataOutAddress = vm["data-out-address"].as(); } if (vm.count("data-out-rate-logging")) { _options->dataOutRateLogging = vm["data-out-rate-logging"].as(); } - if (vm.count("ack-in-socket-type")) { _options->ackInSocketType = vm["ack-in-socket-type"].as(); } - if (vm.count("ack-in-buff-size")) { _options->ackInBufSize = vm["ack-in-buff-size"].as(); } - if (vm.count("ack-in-method")) { _options->ackInMethod = vm["ack-in-method"].as(); } + if (vm.count("ack-in-socket-type")) { _options->ackInSocketType = vm["ack-in-socket-type"].as(); } + if (vm.count("ack-in-buff-size")) { _options->ackInBufSize = vm["ack-in-buff-size"].as(); } + if (vm.count("ack-in-method")) { _options->ackInMethod = vm["ack-in-method"].as(); } // if (vm.count("ack-in-address")) { _options->ackInAddress = vm["ack-in-address"].as(); } - if (vm.count("ack-in-rate-logging")) { _options->ackInRateLogging = vm["ack-in-rate-logging"].as(); } + if (vm.count("ack-in-rate-logging")) { _options->ackInRateLogging = vm["ack-in-rate-logging"].as(); } return true; } @@ -148,6 +154,8 @@ int main(int argc, char** argv) sampler.SetProperty(FLPSyncSampler::Id, options.id); sampler.SetProperty(FLPSyncSampler::NumIoThreads, options.ioThreads); sampler.SetProperty(FLPSyncSampler::EventRate, options.eventRate); + sampler.SetProperty(FLPSyncSampler::MaxEvents, options.maxEvents); + sampler.SetProperty(FLPSyncSampler::StoreRTTinFile, options.storeRTTinFile); FairMQChannel dataOutChannel(options.dataOutSocketType, options.dataOutMethod, ownAddress); dataOutChannel.UpdateSndBufSize(options.dataOutBufSize); diff --git a/devices/flp2epn-distributed/test/testFLP2EPN-distributed.sh.in b/devices/flp2epn-distributed/test/testFLP2EPN-distributed.sh.in new file mode 100755 index 0000000000000..72a07e707c36b --- /dev/null +++ b/devices/flp2epn-distributed/test/testFLP2EPN-distributed.sh.in @@ -0,0 +1,103 @@ +#!/bin/bash + +# setup a trap to kill everything if the test fails/timeouts + +trap 'kill -TERM $FLP0_PID; kill -TERM $FLP1_PID; kill -TERM $EPN0_PID; kill -TERM $EPN1_PID; kill -TERM $SAMPLER_PID; wait $FLP0_PID; wait $FLP1_PID; wait $EPN0_PID; wait $EPN1_PID; wait $SAMPLER_PID;' TERM + +# start 2 flpSenders in non-interactive mode + +FLP0="flpSender" +FLP0+=" --id FLP1" +FLP0+=" --interactive 0" +FLP0+=" --flp-index 0" +FLP0+=" --event-size 1000000" +FLP0+=" --num-epns 2" +FLP0+=" --heartbeat-timeout 20000" +FLP0+=" --test-mode 1" +FLP0+=" --send-offset 0" +FLP0+=" --hb-in-address tcp://127.0.0.1:7580" +FLP0+=" --data-in-socket-type sub --data-in-method connect --data-in-address tcp://127.0.0.1:7550" # non-default socket type and method for test mode, default is pull/bind +FLP0+=" --data-out-address tcp://127.0.0.1:7560" +FLP0+=" --data-out-address tcp://127.0.0.1:7561" +@CMAKE_BINARY_DIR@/bin/$FLP0 & +FLP0_PID=$! + +FLP1="flpSender" +FLP1+=" --id FLP2" +FLP1+=" --interactive 0" +FLP1+=" --flp-index 1" +FLP1+=" --event-size 1000000" +FLP1+=" --num-epns 2" +FLP1+=" --heartbeat-timeout 20000" +FLP1+=" --test-mode 1" +FLP1+=" --send-offset 0" +FLP1+=" --hb-in-address tcp://127.0.0.1:7581" +FLP1+=" --data-in-socket-type sub --data-in-method connect --data-in-address tcp://127.0.0.1:7550" # non-default socket type and method for test mode, default is pull/bind +FLP1+=" --data-out-address tcp://127.0.0.1:7560" +FLP1+=" --data-out-address tcp://127.0.0.1:7561" +@CMAKE_BINARY_DIR@/bin/$FLP1 & +FLP1_PID=$! + +# start 2 epnReceivers in non-interactive mode + +EPN0="epnReceiver" +EPN0+=" --id EPN1" +EPN0+=" --interactive 0" +EPN0+=" --heartbeat-interval 5000" +EPN0+=" --num-flps 2" +EPN0+=" --test-mode 1" +EPN0+=" --data-in-address tcp://127.0.0.1:7560" +EPN0+=" --data-out-address tcp://*:7590 --data-out-socket-type pub" # non-default socket type for test mode (because no receiver - do pub), default is push. +EPN0+=" --hb-out-address tcp://127.0.0.1:7580" +EPN0+=" --hb-out-address tcp://127.0.0.1:7581" +EPN0+=" --ack-out-address tcp://127.0.0.1:7990" +@CMAKE_BINARY_DIR@/bin/$EPN0 & +EPN0_PID=$! + +EPN1="epnReceiver" +EPN1+=" --id EPN2" +EPN1+=" --interactive 0" +EPN1+=" --heartbeat-interval 5000" +EPN1+=" --num-flps 2" +EPN1+=" --test-mode 1" +EPN1+=" --data-in-address tcp://127.0.0.1:7561" +EPN1+=" --data-out-address tcp://*:7591 --data-out-socket-type pub" # non-default socket type for test mode (because no receiver - do pub), default is push. +EPN1+=" --hb-out-address tcp://127.0.0.1:7580" +EPN1+=" --hb-out-address tcp://127.0.0.1:7581" +EPN1+=" --ack-out-address tcp://127.0.0.1:7990" +@CMAKE_BINARY_DIR@/bin/$EPN1 & +EPN1_PID=$! + +# give them some time to initialize before starting flpSyncSampler + +sleep 2 + +# start flpSyncSampler in non-interactive mode + +SAMPLER="flpSyncSampler" +SAMPLER+=" --id 101" +SAMPLER+=" --event-rate 100" +SAMPLER+=" --max-events 100" +SAMPLER+=" --interactive 0" +SAMPLER+=" --ack-in-address tcp://*:7990" +SAMPLER+=" --data-out-address tcp://*:7550" +@CMAKE_BINARY_DIR@/bin/$SAMPLER & +SAMPLER_PID=$! + +# wait for the flpSyncSampler process to finish + +wait $SAMPLER_PID + +# stop the flpSenders and epnReceivers + +kill -SIGINT $FLP0_PID +kill -SIGINT $FLP1_PID +kill -SIGINT $EPN0_PID +kill -SIGINT $EPN1_PID + +# wait for everything to finish + +wait $FLP0_PID +wait $FLP1_PID +wait $EPN0_PID +wait $EPN1_PID