diff --git a/.gitignore b/.gitignore index b8bd0267bdf1a..2e59955291068 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,6 @@ *.exe *.out *.app + +# build directory +build/ \ No newline at end of file diff --git a/devices/CMakeLists.txt b/devices/CMakeLists.txt index c57c4594428d9..a9d0c3fcf142f 100644 --- a/devices/CMakeLists.txt +++ b/devices/CMakeLists.txt @@ -1,3 +1,4 @@ add_subdirectory (flp2epn) add_subdirectory (flp2epn-dynamic) +add_subdirectory (flp2epn-distributed) add_subdirectory (alicehlt) diff --git a/devices/flp2epn-distributed/CMakeLists.txt b/devices/flp2epn-distributed/CMakeLists.txt new file mode 100644 index 0000000000000..e650110282c8c --- /dev/null +++ b/devices/flp2epn-distributed/CMakeLists.txt @@ -0,0 +1,59 @@ +set(INCLUDE_DIRECTORIES + ${BASE_INCLUDE_DIRECTORIES} + ${Boost_INCLUDE_DIR} + ${CMAKE_SOURCE_DIR}/devices/flp2epn-distributed +) + +include_directories(${INCLUDE_DIRECTORIES}) + +configure_file( + ${CMAKE_SOURCE_DIR}/devices/flp2epn-distributed/run/startFLP2EPN-distributed.sh.in ${CMAKE_BINARY_DIR}/bin/startFLP2EPN-distributed.sh +) + +set(LINK_DIRECTORIES + ${Boost_LIBRARY_DIRS} + ${FAIRROOT_LIBRARY_DIR} +) + +link_directories(${LINK_DIRECTORIES}) + +set(SRCS + FLPex.cxx + EPNex.cxx + FLPexSampler.cxx +) + +set(DEPENDENCIES + ${DEPENDENCIES} + ${CMAKE_THREAD_LIBS_INIT} + boost_date_time boost_thread boost_timer boost_system boost_program_options FairMQ +) + +set(LIBRARY_NAME FLP2EPNex_distributed) + +GENERATE_LIBRARY() + +Set(Exe_Names + ${Exe_Names} + testFLP_distributed + testEPN_distributed + testFLPSampler +) + +set(Exe_Source + run/runFLP_distributed.cxx + run/runEPN_distributed.cxx + run/runFLPSampler.cxx +) + +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) + +ForEach(_file RANGE 0 ${_length}) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + set(EXE_NAME ${_name}) + set(SRCS ${_src}) + set(DEPENDENCIES FLP2EPNex_distributed) + GENERATE_EXECUTABLE() +EndForEach(_file RANGE 0 ${_length}) diff --git a/devices/flp2epn-distributed/EPNex.cxx b/devices/flp2epn-distributed/EPNex.cxx new file mode 100644 index 0000000000000..f806bbce35202 --- /dev/null +++ b/devices/flp2epn-distributed/EPNex.cxx @@ -0,0 +1,130 @@ +/** + * EPNex.cxx + * + * @since 2013-01-09 + * @author D. Klein, A. Rybalchenko, M.Al-Turany, C. Kouzinopoulos + */ + +#include +#include +#include + +#include "EPNex.h" +#include "FairMQLogger.h" + +using namespace std; + +using namespace AliceO2::Devices; + +EPNex::EPNex() : + fHeartbeatIntervalInMs(5000) +{ +} + +EPNex::~EPNex() +{ +} + +void EPNex::Run() +{ + LOG(INFO) << ">>>>>>> Run <<<<<<<"; + + // boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + boost::thread heartbeatSender(boost::bind(&EPNex::sendHeartbeats, this)); + + size_t idPartSize = 0; + size_t dataPartSize = 0; + + while (fState == RUNNING) { + // Receive payload + FairMQMessage* idPart = fTransportFactory->CreateMessage(); + + idPartSize = fPayloadInputs->at(0)->Receive(idPart); + + if (idPartSize > 0) { + unsigned long* id = reinterpret_cast(idPart->GetData()); + LOG(INFO) << "Received Event #" << *id; + + FairMQMessage* dataPart = fTransportFactory->CreateMessage(); + dataPartSize = fPayloadInputs->at(0)->Receive(dataPart); + + if (dataPartSize > 0) { + // ... do something with data here ... + } + delete dataPart; + } + delete idPart; + } + + // rateLogger.interrupt(); + // rateLogger.join(); + + heartbeatSender.interrupt(); + heartbeatSender.join(); + + FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); +} + +void EPNex::sendHeartbeats() +{ + while (true) { + try { + for (int i = 0; i < fNumOutputs; ++i) { + FairMQMessage* heartbeatMsg = fTransportFactory->CreateMessage(fInputAddress.at(0).size()); + memcpy(heartbeatMsg->GetData(), fInputAddress.at(0).c_str(), fInputAddress.at(0).size()); + + fPayloadOutputs->at(i)->Send(heartbeatMsg); + + delete heartbeatMsg; + } + boost::this_thread::sleep(boost::posix_time::milliseconds(fHeartbeatIntervalInMs)); + } catch (boost::thread_interrupted&) { + LOG(INFO) << "EPNex::sendHeartbeat() interrupted"; + break; + } + } // while (true) +} + +void EPNex::SetProperty(const int key, const string& value, const int slot/*= 0*/) +{ + switch (key) { + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +string EPNex::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/) +{ + switch (key) { + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + +void EPNex::SetProperty(const int key, const int value, const int slot/*= 0*/) +{ + switch (key) { + case HeartbeatIntervalInMs: + fHeartbeatIntervalInMs = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +int EPNex::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/) +{ + switch (key) { + case HeartbeatIntervalInMs: + return fHeartbeatIntervalInMs; + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} diff --git a/devices/flp2epn-distributed/EPNex.h b/devices/flp2epn-distributed/EPNex.h new file mode 100644 index 0000000000000..5d9c41d1d4596 --- /dev/null +++ b/devices/flp2epn-distributed/EPNex.h @@ -0,0 +1,43 @@ +/** + * EPNex.h + * + * @since 2013-01-09 + * @author D. Klein, A. Rybalchenko, M.Al-Turany, C. Kouzinopoulos + */ + +#ifndef ALICEO2_DEVICES_EPNEX_H_ +#define ALICEO2_DEVICES_EPNEX_H_ + +#include + +#include "FairMQDevice.h" + +namespace AliceO2 { +namespace Devices { + +class EPNex : public FairMQDevice +{ + public: + enum { + HeartbeatIntervalInMs = FairMQDevice::Last, + Last + }; + EPNex(); + virtual ~EPNex(); + + virtual void SetProperty(const int key, const std::string& value, const int slot = 0); + virtual std::string GetProperty(const int key, const std::string& default_ = "", const int slot = 0); + virtual void SetProperty(const int key, const int value, const int slot = 0); + virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0); + + protected: + virtual void Run(); + void sendHeartbeats(); + + int fHeartbeatIntervalInMs; +}; + +} // namespace Devices +} // namespace AliceO2 + +#endif diff --git a/devices/flp2epn-distributed/FLPex.cxx b/devices/flp2epn-distributed/FLPex.cxx new file mode 100644 index 0000000000000..7edbee406fe98 --- /dev/null +++ b/devices/flp2epn-distributed/FLPex.cxx @@ -0,0 +1,239 @@ +/** + * FLPex.cxx + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko, M. Al-Turany, C. Kouzinopoulos + */ + +#include + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQPoller.h" + +#include "FLPex.h" + +using namespace std; +using boost::posix_time::ptime; + +using namespace AliceO2::Devices; + +FLPex::FLPex() + : fHeartbeatTimeoutInMs(20000) + , fSendOffset(0) +{ +} + +FLPex::~FLPex() +{ +} + +void FLPex::Init() +{ + FairMQDevice::Init(); + + ptime nullTime; + + for (int i = 0; i < fNumOutputs; ++i) { + fOutputHeartbeat.push_back(nullTime); + } +} + +bool FLPex::updateIPHeartbeat(string reply) +{ + for (int i = 0; i < fNumOutputs; ++i) { + if (GetProperty(OutputAddress, "", i) == reply) { + ptime currentHeartbeat = boost::posix_time::microsec_clock::local_time(); + ptime storedHeartbeat = GetProperty(OutputHeartbeat, storedHeartbeat, i); + + if (to_simple_string(storedHeartbeat) != "not-a-date-time") { + LOG(INFO) << "EPN " << i << " (" << reply << ")" << " last seen " + << (currentHeartbeat - storedHeartbeat).total_milliseconds() << " ms ago."; + } + else { + LOG(INFO) << "IP has no heartbeat associated. Adding heartbeat: " << currentHeartbeat; + } + + SetProperty(OutputHeartbeat, currentHeartbeat, i); + + return true; + } + } + LOG(ERROR) << "IP " << reply << " unknown, not provided at execution time"; + + return false; +} + +void FLPex::Run() +{ + LOG(INFO) << ">>>>>>> Run <<<<<<<"; + + // boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + + FairMQPoller* poller = fTransportFactory->CreatePoller(*fPayloadInputs); + + unsigned long eventId = 0; + int direction = 0; + int counter = 0; + int sent = 0; + ptime currentHeartbeat; + ptime storedHeartbeat; + + while (fState == RUNNING) { + poller->Poll(100); + + // input 0 - commands + if (poller->CheckInput(0)) { + FairMQMessage* commandMsg = fTransportFactory->CreateMessage(); + + if (fPayloadInputs->at(0)->Receive(commandMsg) > 0) { + //... handle command ... + } + + delete commandMsg; + } + + // input 1 - heartbeats + if (poller->CheckInput(1)) { + FairMQMessage* heartbeatMsg = fTransportFactory->CreateMessage(); + + if (fPayloadInputs->at(1)->Receive(heartbeatMsg) > 0) { + string reply = string(static_cast(heartbeatMsg->GetData()), heartbeatMsg->GetSize()); + updateIPHeartbeat(reply); + } + + delete heartbeatMsg; + } + + // input 2 - data from Sampler + if (poller->CheckInput(2)) { + FairMQMessage* idPart = fTransportFactory->CreateMessage(); + if (fPayloadInputs->at(2)->Receive(idPart) > 0) { + FairMQMessage* dataPart = fTransportFactory->CreateMessage(); + if (fPayloadInputs->at(2)->Receive(dataPart) > 0) { + unsigned long* id = reinterpret_cast(idPart->GetData()); + eventId = *id; + LOG(INFO) << "Received Event #" << eventId; + + fIdBuffer.push(idPart); + fDataBuffer.push(dataPart); + } else { + LOG(ERROR) << "Could not receive data part."; + delete dataPart; + continue; + } + } else { + LOG(ERROR) << "Could not receive id part."; + delete idPart; + continue; + } + + if (counter == fSendOffset) { + eventId = *(reinterpret_cast(fIdBuffer.front()->GetData())); + direction = eventId % fNumOutputs; + + LOG(INFO) << "Trying to send event " << eventId << " to EPN#" << direction << "..."; + + currentHeartbeat = boost::posix_time::microsec_clock::local_time(); + storedHeartbeat = GetProperty(OutputHeartbeat, storedHeartbeat, direction); + + // if the heartbeat from the corresponding EPN is within timeout period, send the data. + if (to_simple_string(storedHeartbeat) != "not-a-date-time" || + (currentHeartbeat - storedHeartbeat).total_milliseconds() < fHeartbeatTimeoutInMs) { + fPayloadOutputs->at(direction)->Send(fIdBuffer.front(), "snd-more"); + sent = fPayloadOutputs->at(direction)->Send(fDataBuffer.front(), "no-block"); + if (sent == 0) { + LOG(ERROR) << "Could not send message with event #" << eventId << " without blocking"; + } + fIdBuffer.pop(); + fDataBuffer.pop(); + } else { // if the heartbeat is too old, receive the data and discard it. + LOG(WARN) << "Heartbeat too old for, discarding message."; + fIdBuffer.pop(); + fDataBuffer.pop(); + } + } else if (counter < fSendOffset) { + LOG(INFO) << "Buffering event..."; + ++counter; + } else { + LOG(ERROR) << "Counter larger than offset, something went wrong..."; + } + } // if (poller->CheckInput(2)) + } // while (fState == RUNNING) + + // rateLogger.interrupt(); + // rateLogger.join(); + + FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); +} + +void FLPex::SetProperty(const int key, const string& value, const int slot/*= 0*/) +{ + switch (key) { + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +string FLPex::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/) +{ + switch (key) { + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + +void FLPex::SetProperty(const int key, const int value, const int slot/*= 0*/) +{ + switch (key) { + case HeartbeatTimeoutInMs: + fHeartbeatTimeoutInMs = value; + break; + case SendOffset: + fSendOffset = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +int FLPex::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/) +{ + switch (key) { + case HeartbeatTimeoutInMs: + return fHeartbeatTimeoutInMs; + case SendOffset: + return fSendOffset; + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + +// Method for setting properties represented as a heartbeat. +void FLPex::SetProperty(const int key, const ptime value, const int slot /*= 0*/) +{ + switch (key) { + case OutputHeartbeat: + fOutputHeartbeat.erase(fOutputHeartbeat.begin() + slot); + fOutputHeartbeat.insert(fOutputHeartbeat.begin() + slot, value); + break; + } +} + +// Method for getting properties represented as a heartbeat. +ptime FLPex::GetProperty(const int key, const ptime default_, const int slot /*= 0*/) +{ + switch (key) { + case OutputHeartbeat: + return fOutputHeartbeat.at(slot); + } +} diff --git a/devices/flp2epn-distributed/FLPex.h b/devices/flp2epn-distributed/FLPex.h new file mode 100644 index 0000000000000..20e7c12333340 --- /dev/null +++ b/devices/flp2epn-distributed/FLPex.h @@ -0,0 +1,59 @@ +/** + * FLPex.h + * + * @since 2014-02-24 + * @author D. Klein, A. Rybalchenko, M. Al-Turany, C. Kouzinopoulos + */ + +#ifndef ALICEO2_DEVICES_FLPEX_H_ +#define ALICEO2_DEVICES_FLPEX_H_ + +#include +#include + +#include + +#include "FairMQDevice.h" + +namespace AliceO2 { +namespace Devices { + +class FLPex : public FairMQDevice +{ + public: + enum { + OutputHeartbeat = FairMQDevice::Last, + HeartbeatTimeoutInMs, + NumFLPs, + SendOffset, + Last + }; + + FLPex(); + virtual ~FLPex(); + + virtual void SetProperty(const int key, const std::string& value, const int slot = 0); + virtual std::string GetProperty(const int key, const std::string& default_ = "", const int slot = 0); + virtual void SetProperty(const int key, const int value, const int slot = 0); + virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0); + virtual void SetProperty(const int key, const boost::posix_time::ptime value, const int slot = 0); + virtual boost::posix_time::ptime GetProperty(const int key, const boost::posix_time::ptime value, const int slot = 0); + + protected: + virtual void Init(); + virtual void Run(); + + private: + bool updateIPHeartbeat(std::string str); + + int fHeartbeatTimeoutInMs; + int fSendOffset; + std::queue fIdBuffer; + std::queue fDataBuffer; + vector fOutputHeartbeat; +}; + +} // namespace Devices +} // namespace AliceO2 + +#endif diff --git a/devices/flp2epn-distributed/FLPexSampler.cxx b/devices/flp2epn-distributed/FLPexSampler.cxx new file mode 100644 index 0000000000000..e690883e778ca --- /dev/null +++ b/devices/flp2epn-distributed/FLPexSampler.cxx @@ -0,0 +1,150 @@ +/** + * FLPexSampler.cpp + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#include + +#include +#include + +#include "FLPexSampler.h" +#include "FairMQLogger.h" + +using namespace std; + +using namespace AliceO2::Devices; + +FLPexSampler::FLPexSampler() + : fEventSize(10000) + , fEventRate(1) + , fEventCounter(0) +{ +} + +FLPexSampler::~FLPexSampler() +{ +} + +void FLPexSampler::Run() +{ + LOG(INFO) << ">>>>>>> Run <<<<<<<"; + boost::this_thread::sleep(boost::posix_time::milliseconds(5000)); + + // boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + boost::thread resetEventCounter(boost::bind(&FLPexSampler::ResetEventCounter, this)); + + int sent = 0; + unsigned long eventId = 0; + + void* buffer = operator new[](fEventSize); + FairMQMessage* baseMsg = fTransportFactory->CreateMessage(buffer, fEventSize); + + while (fState == RUNNING) { + FairMQMessage* idPart = fTransportFactory->CreateMessage(sizeof(unsigned long)); + memcpy(idPart->GetData(), &eventId, sizeof(unsigned long)); + + fPayloadOutputs->at(0)->Send(idPart, "snd-more"); + ++eventId; + + FairMQMessage* dataPart = fTransportFactory->CreateMessage(); + dataPart->Copy(baseMsg); + + sent = fPayloadOutputs->at(0)->Send(dataPart, "no-block"); + if (sent == 0) { + LOG(ERROR) << "Could not send message with event #" << eventId << " without blocking"; + } + + LOG(INFO) << "Sent event #" << eventId; + if (eventId == ULONG_MAX) { + eventId = 0; + } + + boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + + --fEventCounter; + + while (fEventCounter == 0) { + boost::this_thread::sleep(boost::posix_time::milliseconds(1)); + } + + delete idPart; + delete dataPart; + } + + delete baseMsg; + + try { + // rateLogger.interrupt(); + // rateLogger.join(); + resetEventCounter.interrupt(); + resetEventCounter.join(); + } catch(boost::thread_resource_error& e) { + LOG(ERROR) << e.what(); + } + + FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); +} + +void FLPexSampler::ResetEventCounter() +{ + while (true) { + try { + fEventCounter = fEventRate / 100; + boost::this_thread::sleep(boost::posix_time::milliseconds(10)); + } catch (boost::thread_interrupted&) { + break; + } + } +} + +void FLPexSampler::SetProperty(const int key, const string& value, const int slot /*= 0*/) +{ + switch (key) { + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +string FLPexSampler::GetProperty(const int key, const string& default_ /*= ""*/, const int slot /*= 0*/) +{ + switch (key) { + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + +void FLPexSampler::SetProperty(const int key, const int value, const int slot /*= 0*/) +{ + switch (key) { + case EventSize: + fEventSize = value; + break; + case EventRate: + fEventRate = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +int FLPexSampler::GetProperty(const int key, const int default_ /*= 0*/, const int slot /*= 0*/) +{ + switch (key) { + case EventSize: + return fEventSize; + case EventRate: + return fEventRate; + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} diff --git a/devices/flp2epn-distributed/FLPexSampler.h b/devices/flp2epn-distributed/FLPexSampler.h new file mode 100644 index 0000000000000..a6189ae7ab55c --- /dev/null +++ b/devices/flp2epn-distributed/FLPexSampler.h @@ -0,0 +1,47 @@ +/** + * FLPexSampler.h + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#ifndef ALICEO2_DEVICES_FLPEXSAMPLER_H_ +#define ALICEO2_DEVICES_FLPEXSAMPLER_H_ + +#include + +#include "FairMQDevice.h" + +namespace AliceO2 { +namespace Devices { + +class FLPexSampler : public FairMQDevice +{ + public: + enum { + EventRate = FairMQDevice::Last, + EventSize, + Last + }; + + FLPexSampler(); + virtual ~FLPexSampler(); + + void ResetEventCounter(); + virtual void SetProperty(const int key, const std::string& value, const int slot = 0); + virtual std::string GetProperty(const int key, const std::string& default_ = "", const int slot = 0); + virtual void SetProperty(const int key, const int value, const int slot = 0); + virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0); + + protected: + virtual void Run(); + + int fEventSize; + int fEventRate; + int fEventCounter; +}; + +} // namespace Devices +} // namespace AliceO2 + +#endif diff --git a/devices/flp2epn-distributed/FrameBuilder.cxx b/devices/flp2epn-distributed/FrameBuilder.cxx new file mode 100644 index 0000000000000..5dd1520f5d71e --- /dev/null +++ b/devices/flp2epn-distributed/FrameBuilder.cxx @@ -0,0 +1,69 @@ +/** + * FrameBuilder.cxx + * + * @since 2014-10-21 + * @author D. Klein, A. Rybalchenko, M. Al-Turany + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQPoller.h" + +#include "FrameBuilder.h" + +using namespace AliceO2::Devices; + +FrameBuilder::FrameBuilder() +{ +} + +void FrameBuilder::Run() +{ + LOG(INFO) << ">>>>>>> Run <<<<<<<"; + + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + + FairMQPoller* poller = fTransportFactory->CreatePoller(*fPayloadInputs); + + int received = 0; + int noOfMsgParts = fNumInputs - 1; + + while (fState == RUNNING) { + FairMQMessage* msg = fTransportFactory->CreateMessage(); + + poller->Poll(100); + + for (int i = 0; i < fNumInputs; ++i) { + if (poller->CheckInput(i)) { + received = fPayloadInputs->at(i)->Receive(msg); + if (received > 0) { + if (i < noOfMsgParts) { + fPayloadOutputs->at(0)->Send(msg, "snd-more"); + } else { + fPayloadOutputs->at(0)->Send(msg); + } + } + } + } + + delete msg; + } + + delete poller; + + rateLogger.interrupt(); + rateLogger.join(); + + FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); +} + +FrameBuilder::~FrameBuilder() +{ +} diff --git a/devices/flp2epn-distributed/FrameBuilder.h b/devices/flp2epn-distributed/FrameBuilder.h new file mode 100644 index 0000000000000..111d48b5a5f76 --- /dev/null +++ b/devices/flp2epn-distributed/FrameBuilder.h @@ -0,0 +1,29 @@ +/** + * FrameBuilder.h + * + * @since 2014-10-21 + * @author D. Klein, A. Rybalchenko, M. Al-Turany + */ + +#ifndef ALICEO2_DEVICES_FRAMEBUILDER_H_ +#define ALICEO2_DEVICES_FRAMEBUILDER_H_ + +#include "FairMQDevice.h" + +namespace AliceO2 { +namespace Devices { + +class FrameBuilder : public FairMQDevice +{ + public: + FrameBuilder(); + virtual ~FrameBuilder(); + + protected: + virtual void Run(); +}; + +} // namespace Devices +} // namespace AliceO2 + +#endif diff --git a/devices/flp2epn-distributed/run/runEPN_distributed.cxx b/devices/flp2epn-distributed/run/runEPN_distributed.cxx new file mode 100644 index 0000000000000..647400bf450d2 --- /dev/null +++ b/devices/flp2epn-distributed/run/runEPN_distributed.cxx @@ -0,0 +1,211 @@ +/** + * runEPN_distributed.cxx + * + * @since 2013-01-21 + * @author D. Klein, A. Rybalchenko, M. Al-Turany, C. Kouzinopoulos + */ + +#include +#include + +#include "boost/program_options.hpp" + +#include "FairMQLogger.h" +#include "EPNex.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +using namespace std; + +using namespace AliceO2::Devices; + +EPNex epn; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + epn.ChangeState(EPNex::STOP); + epn.ChangeState(EPNex::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +typedef struct DeviceOptions +{ + string id; + int ioThreads; + int numOutputs; + int heartbeatIntervalInMs; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; + vector outputSocketType; + vector outputBufSize; + vector outputMethod; + vector outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("num-outputs", bpo::value()->required(), "Number of EPN output sockets") + ("heartbeat-interval", bpo::value()->default_value(5000), "Heartbeat interval in milliseconds") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value< vector >()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value< vector >()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value< vector >()->required(), "Output method: bind/connect") + ("output-address", bpo::value< vector >()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if (vm.count("help")) { + LOG(INFO) << "EPN" << endl << desc; + return false; + } + + bpo::notify(vm); + + if (vm.count("id")) { + _options->id = vm["id"].as(); + } + + if (vm.count("io-threads")) { + _options->ioThreads = vm["io-threads"].as(); + } + + if (vm.count("num-outputs")) { + _options->numOutputs = vm["num-outputs"].as(); + } + + if (vm.count("heartbeat-interval")) { + _options->heartbeatIntervalInMs = vm["heartbeat-interval"].as(); + } + + if (vm.count("input-socket-type")) { + _options->inputSocketType = vm["input-socket-type"].as(); + } + + if (vm.count("input-buff-size")) { + _options->inputBufSize = vm["input-buff-size"].as(); + } + + if (vm.count("input-method")) { + _options->inputMethod = vm["input-method"].as(); + } + + if (vm.count("input-address")) { + _options->inputAddress = vm["input-address"].as(); + } + + if (vm.count("output-socket-type")) { + _options->outputSocketType = vm["output-socket-type"].as>(); + } + + if (vm.count("output-buff-size")) { + _options->outputBufSize = vm["output-buff-size"].as>(); + } + + if (vm.count("output-method")) { + _options->outputMethod = vm["output-method"].as>(); + } + + if (vm.count("output-address")) { + _options->outputAddress = vm["output-address"].as>(); + } + + return true; +} + +int main(int argc, char** argv) +{ + s_catch_signals(); + + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + epn.SetTransport(transportFactory); + + epn.SetProperty(EPNex::Id, options.id); + epn.SetProperty(EPNex::NumIoThreads, options.ioThreads); + + epn.SetProperty(EPNex::NumInputs, 1); + epn.SetProperty(EPNex::NumOutputs, options.numOutputs); + epn.SetProperty(EPNex::HeartbeatIntervalInMs, options.heartbeatIntervalInMs); + + epn.ChangeState(EPNex::INIT); + + epn.SetProperty(EPNex::InputSocketType, options.inputSocketType); + epn.SetProperty(EPNex::InputSndBufSize, options.inputBufSize); + epn.SetProperty(EPNex::InputMethod, options.inputMethod); + epn.SetProperty(EPNex::InputAddress, options.inputAddress); + + for (int i = 0; i < options.numOutputs; ++i) + { + epn.SetProperty(EPNex::OutputSocketType, options.outputSocketType.at(i), i); + epn.SetProperty(EPNex::OutputRcvBufSize, options.outputBufSize.at(i), i); + epn.SetProperty(EPNex::OutputMethod, options.outputMethod.at(i), i); + epn.SetProperty(EPNex::OutputAddress, options.outputAddress.at(i), i); + } + + epn.ChangeState(EPNex::SETOUTPUT); + epn.ChangeState(EPNex::SETINPUT); + epn.ChangeState(EPNex::RUN); + + // wait until the running thread has finished processing. + boost::unique_lock lock(epn.fRunningMutex); + while (!epn.fRunningFinished) + { + epn.fRunningCondition.wait(lock); + } + + epn.ChangeState(EPNex::STOP); + epn.ChangeState(EPNex::END); + + return 0; +} diff --git a/devices/flp2epn-distributed/run/runFLPSampler.cxx b/devices/flp2epn-distributed/run/runFLPSampler.cxx new file mode 100644 index 0000000000000..60a70cfc66ebf --- /dev/null +++ b/devices/flp2epn-distributed/run/runFLPSampler.cxx @@ -0,0 +1,180 @@ +/** + * runFLPSampler.cxx + * + * @since 2013-01-21 + * @author D. Klein, A. Rybalchenko, M. Al-Turany, C. Kouzinopoulos + */ + +#include +#include + +#include "boost/program_options.hpp" + +#include "FairMQLogger.h" +#include "FLPexSampler.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +using namespace std; + +using namespace AliceO2::Devices; + +FLPexSampler sampler; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + sampler.ChangeState(FLPexSampler::STOP); + sampler.ChangeState(FLPexSampler::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +typedef struct DeviceOptions +{ + string id; + int eventSize; + int eventRate; + int ioThreads; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("event-size", bpo::value()->default_value(1000), "Event size in bytes") + ("event-rate", bpo::value()->default_value(0), "Event rate limit in maximum number of events per second") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if (vm.count("help")) { + LOG(INFO) << "Test FLP Sampler" << endl << desc; + return false; + } + + bpo::notify(vm); + + if (vm.count("id")) { + _options->id = vm["id"].as(); + } + + if (vm.count("event-size")) { + _options->eventSize = vm["event-size"].as(); + } + + if (vm.count("event-rate")) { + _options->eventRate = vm["event-rate"].as(); + } + + if (vm.count("io-threads")) { + _options->ioThreads = vm["io-threads"].as(); + } + + if (vm.count("output-socket-type")) { + _options->outputSocketType = vm["output-socket-type"].as(); + } + + if (vm.count("output-buff-size")) { + _options->outputBufSize = vm["output-buff-size"].as(); + } + + if (vm.count("output-method")) { + _options->outputMethod = vm["output-method"].as(); + } + + if (vm.count("output-address")) { + _options->outputAddress = vm["output-address"].as(); + } + + return true; +} + +int main(int argc, char** argv) +{ + s_catch_signals(); + + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + sampler.SetTransport(transportFactory); + + sampler.SetProperty(FLPexSampler::Id, options.id); + sampler.SetProperty(FLPexSampler::NumIoThreads, options.ioThreads); + sampler.SetProperty(FLPexSampler::EventSize, options.eventSize); + sampler.SetProperty(FLPexSampler::EventRate, options.eventRate); + + sampler.SetProperty(FLPexSampler::NumInputs, 0); + sampler.SetProperty(FLPexSampler::NumOutputs, 1); + + sampler.ChangeState(FLPexSampler::INIT); + + sampler.SetProperty(FLPexSampler::OutputSocketType, options.outputSocketType); + sampler.SetProperty(FLPexSampler::OutputRcvBufSize, options.outputBufSize); + sampler.SetProperty(FLPexSampler::OutputMethod, options.outputMethod); + sampler.SetProperty(FLPexSampler::OutputAddress, options.outputAddress); + + sampler.ChangeState(FLPexSampler::SETOUTPUT); + sampler.ChangeState(FLPexSampler::SETINPUT); + sampler.ChangeState(FLPexSampler::RUN); + + // wait until the running thread has finished processing. + boost::unique_lock lock(sampler.fRunningMutex); + while (!sampler.fRunningFinished) + { + sampler.fRunningCondition.wait(lock); + } + + sampler.ChangeState(FLPexSampler::STOP); + sampler.ChangeState(FLPexSampler::END); + + return 0; +} diff --git a/devices/flp2epn-distributed/run/runFLP_distributed.cxx b/devices/flp2epn-distributed/run/runFLP_distributed.cxx new file mode 100644 index 0000000000000..e701216d8811d --- /dev/null +++ b/devices/flp2epn-distributed/run/runFLP_distributed.cxx @@ -0,0 +1,224 @@ +/** + * runFLP_distributed.cxx + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko, M. Al-Turany, C. Kouzinopoulos + */ + +#include +#include + +#include "boost/program_options.hpp" + +#include "FairMQLogger.h" +#include "FLPex.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +using namespace std; + +using namespace AliceO2::Devices; + +FLPex flp; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + flp.ChangeState(FLPex::STOP); + flp.ChangeState(FLPex::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +typedef struct DeviceOptions +{ + string id; + int ioThreads; + int numInputs; + int numOutputs; + int heartbeatTimeoutInMs; + int sendOffset; + vector inputSocketType; + vector inputBufSize; + vector inputMethod; + vector inputAddress; + vector outputSocketType; + vector outputBufSize; + vector outputMethod; + vector outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("num-inputs", bpo::value()->required(), "Number of FLP input sockets") + ("num-outputs", bpo::value()->required(), "Number of FLP output sockets") + ("heartbeat-timeout", bpo::value()->default_value(20000), "Heartbeat timeout in milliseconds") + ("send-offset", bpo::value()->default_value(0), "Offset for staggered sending") + ("input-socket-type", bpo::value< vector >()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value< vector >()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value< vector >()->required(), "Input method: bind/connect") + ("input-address", bpo::value< vector >()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value< vector >()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value< vector >()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value< vector >()->required(), "Output method: bind/connect") + ("output-address", bpo::value< vector >()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if (vm.count("help")) { + LOG(INFO) << "FLP" << endl << desc; + return false; + } + + bpo::notify(vm); + + if (vm.count("id")) { + _options->id = vm["id"].as(); + } + + if (vm.count("io-threads")) { + _options->ioThreads = vm["io-threads"].as(); + } + + if (vm.count("num-inputs")) { + _options->numInputs = vm["num-inputs"].as(); + } + + if (vm.count("num-outputs")) { + _options->numOutputs = vm["num-outputs"].as(); + } + + if (vm.count("heartbeat-timeout")) { + _options->heartbeatTimeoutInMs = vm["heartbeat-timeout"].as(); + } + + if (vm.count("send-offset")) { + _options->sendOffset = vm["send-offset"].as(); + } + + if (vm.count("input-socket-type")) { + _options->inputSocketType = vm["input-socket-type"].as>(); + } + + if (vm.count("input-buff-size")) { + _options->inputBufSize = vm["input-buff-size"].as>(); + } + + if (vm.count("input-method")) { + _options->inputMethod = vm["input-method"].as>(); + } + + if (vm.count("input-address")) { + _options->inputAddress = vm["input-address"].as>(); + } + + if (vm.count("output-socket-type")) { + _options->outputSocketType = vm["output-socket-type"].as>(); + } + + if (vm.count("output-buff-size")) { + _options->outputBufSize = vm["output-buff-size"].as>(); + } + + if (vm.count("output-method")) { + _options->outputMethod = vm["output-method"].as>(); + } + + if (vm.count("output-address")) { + _options->outputAddress = vm["output-address"].as>(); + } + + return true; +} + +int main(int argc, char** argv) +{ + s_catch_signals(); + + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + flp.SetTransport(transportFactory); + + flp.SetProperty(FLPex::Id, options.id); + flp.SetProperty(FLPex::NumIoThreads, options.ioThreads); + + flp.SetProperty(FLPex::NumInputs, options.numInputs); + flp.SetProperty(FLPex::NumOutputs, options.numOutputs); + flp.SetProperty(FLPex::HeartbeatTimeoutInMs, options.heartbeatTimeoutInMs); + flp.SetProperty(FLPex::SendOffset, options.sendOffset); + + flp.ChangeState(FLPex::INIT); + + for (int i = 0; i < options.numInputs; ++i) { + flp.SetProperty(FLPex::InputSocketType, options.inputSocketType.at(i), i); + flp.SetProperty(FLPex::InputRcvBufSize, options.inputBufSize.at(i), i); + flp.SetProperty(FLPex::InputMethod, options.inputMethod.at(i), i); + flp.SetProperty(FLPex::InputAddress, options.inputAddress.at(i), i); + } + + for (int i = 0; i < options.numOutputs; ++i) { + flp.SetProperty(FLPex::OutputSocketType, options.outputSocketType.at(i), i); + flp.SetProperty(FLPex::OutputRcvBufSize, options.outputBufSize.at(i), i); + flp.SetProperty(FLPex::OutputMethod, options.outputMethod.at(i), i); + flp.SetProperty(FLPex::OutputAddress, options.outputAddress.at(i), i); + } + + flp.ChangeState(FLPex::SETOUTPUT); + flp.ChangeState(FLPex::SETINPUT); + flp.ChangeState(FLPex::RUN); + + // wait until the running thread has finished processing. + boost::unique_lock lock(flp.fRunningMutex); + while (!flp.fRunningFinished) { + flp.fRunningCondition.wait(lock); + } + + flp.ChangeState(FLPex::STOP); + flp.ChangeState(FLPex::END); + + return 0; +} diff --git a/devices/flp2epn-distributed/run/startFLP2EPN-distributed.sh.in b/devices/flp2epn-distributed/run/startFLP2EPN-distributed.sh.in new file mode 100755 index 0000000000000..8d9aeaf91384d --- /dev/null +++ b/devices/flp2epn-distributed/run/startFLP2EPN-distributed.sh.in @@ -0,0 +1,83 @@ +#!/bin/bash + +buffSize="10" # zeromq high-water mark is in messages +#buffSize="50000000" # nanomsg buffer size is in bytes + +SAMPLER="testFLPSampler" +SAMPLER+=" --id 101" +SAMPLER+=" --event-size 1000000" +SAMPLER+=" --event-rate 100" +SAMPLER+=" --output-socket-type pub --output-buff-size $buffSize --output-method bind --output-address tcp://*:5550" +xterm -geometry 80x25+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & + +FLP0="testFLP_distributed" +FLP0+=" --id FLP0" +FLP0+=" --num-inputs 3" +FLP0+=" --num-outputs 3" +FLP0+=" --heartbeat-timeout 20000" +FLP0+=" --send-offset 0" +FLP0+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://*:5600" # command +FLP0+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5580" # heartbeat +FLP0+=" --input-socket-type sub --input-buff-size $buffSize --input-method connect --input-address tcp://127.0.0.1:5550" # data +FLP0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5560" +FLP0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5561" +FLP0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5562" +xterm -geometry 80x25+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$FLP0 & + +FLP1="testFLP_distributed" +FLP1+=" --id FLP1" +FLP1+=" --num-inputs 3" +FLP1+=" --num-outputs 3" +FLP1+=" --heartbeat-timeout 20000" +FLP1+=" --send-offset 1" +FLP1+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://*:5601" # command +FLP1+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5581" # heartbeat +FLP1+=" --input-socket-type sub --input-buff-size $buffSize --input-method connect --input-address tcp://127.0.0.1:5550" # data +FLP1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5560" +FLP1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5561" +FLP1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5562" +xterm -geometry 80x25+500+350 -hold -e @CMAKE_BINARY_DIR@/bin/$FLP1 & + +FLP2="testFLP_distributed" +FLP2+=" --id FLP2" +FLP2+=" --num-inputs 3" +FLP2+=" --num-outputs 3" +FLP2+=" --heartbeat-timeout 20000" +FLP2+=" --send-offset 2" +FLP2+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://*:5602" # command +FLP2+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5582" # heartbeat +FLP2+=" --input-socket-type sub --input-buff-size $buffSize --input-method connect --input-address tcp://127.0.0.1:5550" # data +FLP2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5560" +FLP2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5561" +FLP2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5562" +xterm -geometry 80x25+500+700 -hold -e @CMAKE_BINARY_DIR@/bin/$FLP2 & + +EPN0="testEPN_distributed" +EPN0+=" --id EPN0" +EPN0+=" --num-outputs 3" +EPN0+=" --heartbeat-interval 5000" +EPN0+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5560" # data +EPN0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5580" +EPN0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5581" +EPN0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5582" +xterm -geometry 80x25+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/$EPN0 & + +EPN1="testEPN_distributed" +EPN1+=" --id EPN1" +EPN1+=" --num-outputs 3" +EPN1+=" --heartbeat-interval 5000" +EPN1+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5561" # data +EPN1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5580" +EPN1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5581" +EPN1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5582" +xterm -geometry 80x25+1000+350 -hold -e @CMAKE_BINARY_DIR@/bin/$EPN1 & + +EPN2="testEPN_distributed" +EPN2+=" --id EPN2" +EPN2+=" --num-outputs 3" +EPN2+=" --heartbeat-interval 5000" +EPN2+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5562" # data +EPN2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5580" +EPN2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5581" +EPN2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5582" +xterm -geometry 80x25+1000+700 -hold -e @CMAKE_BINARY_DIR@/bin/$EPN2 & diff --git a/devices/flp2epn-dynamic/CMakeLists.txt b/devices/flp2epn-dynamic/CMakeLists.txt index 73adfeeacb6fe..5a62c7c67e007 100644 --- a/devices/flp2epn-dynamic/CMakeLists.txt +++ b/devices/flp2epn-dynamic/CMakeLists.txt @@ -23,7 +23,7 @@ set(SRCS set(DEPENDENCIES ${DEPENDENCIES} ${CMAKE_THREAD_LIBS_INIT} - boost_date_time boost_thread boost_timer boost_system FairMQ + boost_date_time boost_thread boost_timer boost_system boost_program_options FairMQ ) set(LIBRARY_NAME FLP2EPNex_dynamic) diff --git a/devices/flp2epn-dynamic/O2EPNex.cxx b/devices/flp2epn-dynamic/O2EPNex.cxx index 6dcdc147c1c90..5bc32205f9ac6 100644 --- a/devices/flp2epn-dynamic/O2EPNex.cxx +++ b/devices/flp2epn-dynamic/O2EPNex.cxx @@ -16,44 +16,48 @@ O2EPNex::O2EPNex() : { } +O2EPNex::~O2EPNex() +{ +} + void O2EPNex::Run() { LOG(INFO) << ">>>>>>> Run <<<<<<<"; boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - + boost::posix_time::ptime referenceTime = boost::posix_time::microsec_clock::local_time(); - - //Set the time difference to fHeartbeatIntervalInMs to immediately send a heartbeat to the EPNs + + // Set the time difference to fHeartbeatIntervalInMs to immediately send a heartbeat to the EPNs int timeDif = fHeartbeatIntervalInMs; - + while (fState == RUNNING) { if (timeDif >= fHeartbeatIntervalInMs) { referenceTime = boost::posix_time::microsec_clock::local_time(); - for (int iOutput = 0; iOutput < fNumOutputs; iOutput++) { + for (int i = 0; i < fNumOutputs; i++) { FairMQMessage* heartbeatMsg = fTransportFactory->CreateMessage (strlen (fInputAddress.at(0).c_str())); memcpy(heartbeatMsg->GetData(), fInputAddress.at(0).c_str(), strlen (fInputAddress.at(0).c_str())); - - fPayloadOutputs->at(iOutput)->Send(heartbeatMsg); - + + fPayloadOutputs->at(i)->Send(heartbeatMsg); + delete heartbeatMsg; } } - - //Update the time difference + + // Update the time difference timeDif = (boost::posix_time::microsec_clock::local_time() - referenceTime).total_milliseconds(); - //Receive payload + // Receive payload FairMQMessage* payloadMsg = fTransportFactory->CreateMessage(); size_t payloadSize = fPayloadInputs->at(0)->Receive(payloadMsg, "no-block"); - + if ( payloadSize > 0 ) { int inputSize = payloadMsg->GetSize(); int numInput = inputSize / sizeof(Content); Content* input = reinterpret_cast(payloadMsg->GetData()); - + // for (int i = 0; i < numInput; ++i) { // LOG(INFO) << (&input[i])->x << " " << (&input[i])->y << " " << (&input[i])->z << " " << (&input[i])->a << " " << (&input[i])->b; // } @@ -73,6 +77,41 @@ void O2EPNex::Run() fRunningCondition.notify_one(); } -O2EPNex::~O2EPNex() +void O2EPNex::SetProperty(const int key, const string& value, const int slot/*= 0*/) { + switch (key) { + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +string O2EPNex::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/) +{ + switch (key) { + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + +void O2EPNex::SetProperty(const int key, const int value, const int slot/*= 0*/) +{ + switch (key) { + case HeartbeatIntervalInMs: + fHeartbeatIntervalInMs = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +int O2EPNex::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/) +{ + switch (key) { + case HeartbeatIntervalInMs: + return fHeartbeatIntervalInMs; + default: + return FairMQDevice::GetProperty(key, default_, slot); + } } diff --git a/devices/flp2epn-dynamic/O2EPNex.h b/devices/flp2epn-dynamic/O2EPNex.h index df86d1fb78219..09f8d21b5bac9 100644 --- a/devices/flp2epn-dynamic/O2EPNex.h +++ b/devices/flp2epn-dynamic/O2EPNex.h @@ -8,9 +8,12 @@ #ifndef O2EPNEX_H_ #define O2EPNEX_H_ +#include + #include "FairMQDevice.h" struct Content { + int id; double a; double b; int x; @@ -22,14 +25,20 @@ class O2EPNex: public FairMQDevice { public: enum { - HeartbeatIntervalInMs + HeartbeatIntervalInMs = FairMQDevice::Last, + Last }; O2EPNex(); virtual ~O2EPNex(); - - int fHeartbeatIntervalInMs; + + virtual void SetProperty(const int key, const string& value, const int slot = 0); + virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0); + virtual void SetProperty(const int key, const int value, const int slot = 0); + virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0); protected: + int fHeartbeatIntervalInMs; + virtual void Run(); }; diff --git a/devices/flp2epn-dynamic/O2FLPex.cxx b/devices/flp2epn-dynamic/O2FLPex.cxx index 92e4245720102..9509f6cd25cbf 100644 --- a/devices/flp2epn-dynamic/O2FLPex.cxx +++ b/devices/flp2epn-dynamic/O2FLPex.cxx @@ -31,9 +31,9 @@ O2FLPex::~O2FLPex() void O2FLPex::Init() { FairMQDevice::Init(); - + boost::posix_time::ptime nullTime; - + for (int i = 0; i < fNumOutputs; ++i) { fOutputHeartbeat.push_back(nullTime); } @@ -41,21 +41,21 @@ void O2FLPex::Init() bool O2FLPex::updateIPHeartbeat (string str) { - for (int iOutput = 0; iOutput < fNumOutputs; iOutput++) { - if ( GetProperty (OutputAddress, "", iOutput) == str ) { + for (int i = 0; i < fNumOutputs; i++) { + if ( GetProperty (OutputAddress, "", i) == str ) { boost::posix_time::ptime currentHeartbeat = boost::posix_time::microsec_clock::local_time(); - boost::posix_time::ptime storedHeartbeat = GetProperty (OutputHeartbeat, storedHeartbeat, iOutput); - - //if ( to_simple_string (storedHeartbeat) != "not-a-date-time" ) { - // LOG(INFO) << "EPN " << iOutput << " last seen " - // << (currentHeartbeat - storedHeartbeat).total_milliseconds() - // << " ms ago. Updating heartbeat..."; - //} - //else { - // LOG(INFO) << "IP has no heartbeat associated. Adding heartbeat: " << currentHeartbeat; - //} - - SetProperty (OutputHeartbeat, currentHeartbeat, iOutput); + boost::posix_time::ptime storedHeartbeat = GetProperty (OutputHeartbeat, storedHeartbeat, i); + + if ( to_simple_string (storedHeartbeat) != "not-a-date-time" ) { + LOG(INFO) << "EPN " << i << " (" << str << ")" << " last seen " + << (currentHeartbeat - storedHeartbeat).total_milliseconds() + << " ms ago. Updating heartbeat..."; + } + else { + LOG(INFO) << "IP has no heartbeat associated. Adding heartbeat: " << currentHeartbeat; + } + + SetProperty (OutputHeartbeat, currentHeartbeat, i); return true; } @@ -74,10 +74,10 @@ void O2FLPex::Run() srand(time(NULL)); stringstream ss(fId); - + int Flp_id; ss >> Flp_id; - + Content* payload = new Content[fEventSize]; for (int i = 0; i < fEventSize; ++i) { (&payload[i])->id = Flp_id; @@ -86,47 +86,46 @@ void O2FLPex::Run() (&payload[i])->z = rand() % 100 + 1; (&payload[i])->a = (rand() % 100 + 1) / (rand() % 100 + 1); (&payload[i])->b = (rand() % 100 + 1) / (rand() % 100 + 1); - //LOG(INFO) << (&payload[i])->id << " " << (&payload[i])->x << " " << (&payload[i])->y << " " << (&payload[i])->z << " " << (&payload[i])->a << " " << (&payload[i])->b; + // LOG(INFO) << (&payload[i])->id << " " << (&payload[i])->x << " " << (&payload[i])->y << " " << (&payload[i])->z << " " << (&payload[i])->a << " " << (&payload[i])->b; } delete[] payload; while ( fState == RUNNING ) { - //Receive heartbeat + // Receive heartbeat FairMQMessage* heartbeatMsg = fTransportFactory->CreateMessage(); size_t heartbeatSize = fPayloadInputs->at(0)->Receive(heartbeatMsg, "no-block"); - + if ( heartbeatSize > 0 ) { std::string rpl = std::string (static_cast(heartbeatMsg->GetData()), heartbeatMsg->GetSize()); updateIPHeartbeat (rpl); } - + delete heartbeatMsg; - - //Send payload - for (int iOutput = 0; iOutput < fNumOutputs; iOutput++) { + + // Send payload + for (int i = 0; i < fNumOutputs; i++) { boost::posix_time::ptime currentHeartbeat = boost::posix_time::microsec_clock::local_time(); - boost::posix_time::ptime storedHeartbeat = GetProperty (OutputHeartbeat, storedHeartbeat, iOutput); + boost::posix_time::ptime storedHeartbeat = GetProperty (OutputHeartbeat, storedHeartbeat, i); if ( to_simple_string (storedHeartbeat) == "not-a-date-time" || (currentHeartbeat - storedHeartbeat).total_milliseconds() > fHeartbeatTimeoutInMs) { - //LOG(INFO) << "EPN " << iOutput << " has not send a heartbeat, or heartbeat too old"; + // LOG(INFO) << "EPN " << i << " has not send a heartbeat, or heartbeat too old"; continue; } - //LOG(INFO) << "Pubishing payload to EPN " << iOutput; + // LOG(INFO) << "Pubishing payload to EPN " << i; FairMQMessage* payloadMsg = fTransportFactory->CreateMessage(fEventSize * sizeof(Content)); memcpy(payloadMsg->GetData(), payload, fEventSize * sizeof(Content)); - fPayloadOutputs->at(iOutput)->Send(payloadMsg); + fPayloadOutputs->at(i)->Send(payloadMsg); delete payloadMsg; } } - - rateLogger.interrupt(); + rateLogger.interrupt(); rateLogger.join(); FairMQDevice::Shutdown(); @@ -137,81 +136,47 @@ void O2FLPex::Run() fRunningCondition.notify_one(); } -void O2FLPex::Log(int intervalInMs) -{ - timestamp_t t0; - timestamp_t t1; - unsigned long bytes = fPayloadOutputs->at(0)->GetBytesTx(); - unsigned long messages = fPayloadOutputs->at(0)->GetMessagesTx(); - unsigned long bytesNew = 0; - unsigned long messagesNew = 0; - double megabytesPerSecond = 0; - double messagesPerSecond = 0; - - t0 = get_timestamp(); - - while (true) { - boost::this_thread::sleep(boost::posix_time::milliseconds(intervalInMs)); - - t1 = get_timestamp(); - - bytesNew = fPayloadOutputs->at(0)->GetBytesTx(); - messagesNew = fPayloadOutputs->at(0)->GetMessagesTx(); - - timestamp_t timeSinceLastLog_ms = (t1 - t0) / 1000.0L; - - megabytesPerSecond = ((double) (bytesNew - bytes) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.; - messagesPerSecond = (double) (messagesNew - messages) / (double) timeSinceLastLog_ms * 1000.; - - LOG(DEBUG) << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s"; - - bytes = bytesNew; - messages = messagesNew; - t0 = t1; - } -} - void O2FLPex::SetProperty(const int key, const string& value, const int slot/*= 0*/) { switch (key) { - default: - FairMQDevice::SetProperty(key, value, slot); - break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; } } string O2FLPex::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/) { switch (key) { - default: - return FairMQDevice::GetProperty(key, default_, slot); + default: + return FairMQDevice::GetProperty(key, default_, slot); } } void O2FLPex::SetProperty(const int key, const int value, const int slot/*= 0*/) { switch (key) { - case EventSize: - fEventSize = value; - break; - case HeartbeatTimeoutInMs: - fHeartbeatTimeoutInMs = value; - break; - default: - FairMQDevice::SetProperty(key, value, slot); - break; + case EventSize: + fEventSize = value; + break; + case HeartbeatTimeoutInMs: + fHeartbeatTimeoutInMs = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; } } int O2FLPex::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/) { switch (key) { - case EventSize: - return fEventSize; - case HeartbeatTimeoutInMs: - return fHeartbeatTimeoutInMs; - default: - return FairMQDevice::GetProperty(key, default_, slot); + case EventSize: + return fEventSize; + case HeartbeatTimeoutInMs: + return fHeartbeatTimeoutInMs; + default: + return FairMQDevice::GetProperty(key, default_, slot); } } diff --git a/devices/flp2epn-dynamic/O2FLPex.h b/devices/flp2epn-dynamic/O2FLPex.h index ef01ee5677c43..b99f0800f670e 100644 --- a/devices/flp2epn-dynamic/O2FLPex.h +++ b/devices/flp2epn-dynamic/O2FLPex.h @@ -27,13 +27,12 @@ class O2FLPex: public FairMQDevice enum { InputFile = FairMQDevice::Last, EventSize, - Last, OutputHeartbeat, - HeartbeatTimeoutInMs + HeartbeatTimeoutInMs, + Last }; O2FLPex(); virtual ~O2FLPex(); - void Log(int intervalInMs); virtual void SetProperty(const int key, const string& value, const int slot = 0); virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0); @@ -44,12 +43,11 @@ class O2FLPex: public FairMQDevice protected: int fEventSize; + int fHeartbeatTimeoutInMs; virtual void Init(); virtual void Run(); - - int fHeartbeatTimeoutInMs; - + private: vector fOutputHeartbeat; bool updateIPHeartbeat (string str); diff --git a/devices/flp2epn-dynamic/run/runEPN_dynamic.cxx b/devices/flp2epn-dynamic/run/runEPN_dynamic.cxx index 237aabf534399..8011f28fb5388 100644 --- a/devices/flp2epn-dynamic/run/runEPN_dynamic.cxx +++ b/devices/flp2epn-dynamic/run/runEPN_dynamic.cxx @@ -1,134 +1,198 @@ /** - * runEPN.cxx + * runEPN_dynamic.cxx * * @since 2013-01-21 - * @author D. Klein, A. Rybalchenko, M.Al-Turany + * @author D. Klein, A. Rybalchenko, M. Al-Turany, C. Kouzinopoulos */ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "O2EPNex.h" #ifdef NANOMSG - #include "FairMQTransportFactoryNN.h" + #include "FairMQTransportFactoryNN.h" #else - #include "FairMQTransportFactoryZMQ.h" + #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; O2EPNex epn; static void s_signal_handler (int signal) { - cout << endl << "Caught signal " << signal << endl; + cout << endl << "Caught signal " << signal << endl; - epn.ChangeState(O2EPNex::STOP); - epn.ChangeState(O2EPNex::END); + epn.ChangeState(O2EPNex::STOP); + epn.ChangeState(O2EPNex::END); - cout << "Shutdown complete. Bye!" << endl; - exit(1); + cout << "Shutdown complete. Bye!" << endl; + exit(1); } static void s_catch_signals (void) { - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if ( argc < 13 ) { - cout << "Usage: testEPN \tID numIoTreads numOutputs heartbeatIntervalInMs\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\t...\n" - << endl; - return 1; - } + string id; + int ioThreads; + int numOutputs; + int heartbeatIntervalInMs; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; + vector outputSocketType; + vector outputBufSize; + vector outputMethod; + vector outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("num-outputs", bpo::value()->required(), "Number of EPN output sockets") + ("heartbeat-interval", bpo::value()->default_value(5000), "Heartbeat interval in milliseconds") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value< vector >()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value< vector >()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value< vector >()->required(), "Output method: bind/connect") + ("output-address", bpo::value< vector >()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) + { + LOG(INFO) << "EPN" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("num-outputs") ) + _options->numOutputs = vm["num-outputs"].as(); + + if ( vm.count("heartbeat-interval") ) + _options->heartbeatIntervalInMs = vm["heartbeat-interval"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); - s_catch_signals(); + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as< vector >(); - LOG(INFO) << "PID: " << getpid(); + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as< vector >(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as< vector >(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as< vector >(); + + return true; +} + +int main(int argc, char** argv) +{ + s_catch_signals(); + + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); #else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); #endif - epn.SetTransport(transportFactory); - - int i = 1; - - epn.SetProperty(O2EPNex::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - epn.SetProperty(O2EPNex::NumIoThreads, numIoThreads); - ++i; - - epn.SetProperty(O2EPNex::NumInputs, 1); - - int numOutputs; - stringstream(argv[i]) >> numOutputs; - epn.SetProperty(O2EPNex::NumOutputs, numOutputs); - ++i; - - int heartbeatIntervalInMs; - stringstream(argv[i]) >> heartbeatIntervalInMs; - epn.fHeartbeatIntervalInMs = heartbeatIntervalInMs; - ++i; - - epn.ChangeState(O2EPNex::INIT); - - epn.SetProperty(O2EPNex::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - epn.SetProperty(O2EPNex::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - epn.SetProperty(O2EPNex::InputMethod, argv[i], 0); - ++i; - epn.SetProperty(O2EPNex::InputAddress, argv[i], 0); - ++i; - - for (int iOutput = 0; iOutput < numOutputs; iOutput++ ) { - epn.SetProperty(O2EPNex::OutputSocketType, argv[i], iOutput); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - epn.SetProperty(O2EPNex::OutputSndBufSize, outputSndBufSize, iOutput); - ++i; - epn.SetProperty(O2EPNex::OutputMethod, argv[i], iOutput); - ++i; - epn.SetProperty(O2EPNex::OutputAddress, argv[i], iOutput); - ++i; - } - - epn.ChangeState(O2EPNex::SETOUTPUT); - epn.ChangeState(O2EPNex::SETINPUT); - epn.ChangeState(O2EPNex::RUN); - - // wait until the running thread has finished processing. - boost::unique_lock lock(epn.fRunningMutex); - while (!epn.fRunningFinished) - { + epn.SetTransport(transportFactory); + + epn.SetProperty(O2EPNex::Id, options.id); + epn.SetProperty(O2EPNex::NumIoThreads, options.ioThreads); + + epn.SetProperty(O2EPNex::NumInputs, 1); + epn.SetProperty(O2EPNex::NumOutputs, options.numOutputs); + epn.SetProperty(O2EPNex::HeartbeatIntervalInMs, options.heartbeatIntervalInMs); + + epn.ChangeState(O2EPNex::INIT); + + epn.SetProperty(O2EPNex::InputSocketType, options.inputSocketType); + epn.SetProperty(O2EPNex::InputSndBufSize, options.inputBufSize); + epn.SetProperty(O2EPNex::InputMethod, options.inputMethod); + epn.SetProperty(O2EPNex::InputAddress, options.inputAddress); + + for (int i = 0; i < options.numOutputs; ++i) + { + epn.SetProperty(O2EPNex::OutputSocketType, options.outputSocketType.at(i), i); + epn.SetProperty(O2EPNex::OutputRcvBufSize, options.outputBufSize.at(i), i); + epn.SetProperty(O2EPNex::OutputMethod, options.outputMethod.at(i), i); + epn.SetProperty(O2EPNex::OutputAddress, options.outputAddress.at(i), i); + } + + epn.ChangeState(O2EPNex::SETOUTPUT); + epn.ChangeState(O2EPNex::SETINPUT); + epn.ChangeState(O2EPNex::RUN); + + // wait until the running thread has finished processing. + boost::unique_lock lock(epn.fRunningMutex); + while (!epn.fRunningFinished) + { epn.fRunningCondition.wait(lock); - } + } - epn.ChangeState(O2EPNex::STOP); - epn.ChangeState(O2EPNex::END); + epn.ChangeState(O2EPNex::STOP); + epn.ChangeState(O2EPNex::END); - return 0; + return 0; } diff --git a/devices/flp2epn-dynamic/run/runFLP_dynamic.cxx b/devices/flp2epn-dynamic/run/runFLP_dynamic.cxx index 80366714b0d35..9013a0cde4ab1 100644 --- a/devices/flp2epn-dynamic/run/runFLP_dynamic.cxx +++ b/devices/flp2epn-dynamic/run/runFLP_dynamic.cxx @@ -1,142 +1,204 @@ /** - * runBenchmarkflp.cxx + * runFLP_dynamic.cxx * * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko + * @author D. Klein, A. Rybalchenko, M. Al-Turany, C. Kouzinopoulos */ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "O2FLPex.h" #ifdef NANOMSG - #include "FairMQTransportFactoryNN.h" + #include "FairMQTransportFactoryNN.h" #else - #include "FairMQTransportFactoryZMQ.h" + #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - +using namespace std; O2FLPex flp; static void s_signal_handler (int signal) { - cout << endl << "Caught signal " << signal << endl; + cout << endl << "Caught signal " << signal << endl; - flp.ChangeState(O2FLPex::STOP); - flp.ChangeState(O2FLPex::END); + flp.ChangeState(O2FLPex::STOP); + flp.ChangeState(O2FLPex::END); - cout << "Shutdown complete. Bye!" << endl; - exit(1); + cout << "Shutdown complete. Bye!" << endl; + exit(1); } static void s_catch_signals (void) { - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - LOG(INFO) << "-> " << argc; - - if ( argc < 14 ) { - cout << "Usage: testFLP ID eventSize numIoTreads numOutputs heartbeatTimeoutInMs\n" - << "\t\tinputSocketType inputSndBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << "\t\t...\n" - << endl; - return 1; - } + string id; + int eventSize; + int ioThreads; + int numOutputs; + int heartbeatTimeoutInMs; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; + vector outputSocketType; + vector outputBufSize; + vector outputMethod; + vector outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("event-size", bpo::value()->default_value(1000), "Event size in bytes") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("num-outputs", bpo::value()->required(), "Number of FLP output sockets") + ("heartbeat-timeout", bpo::value()->default_value(20000), "Heartbeat timeout in milliseconds") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value< vector >()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value< vector >()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value< vector >()->required(), "Output method: bind/connect") + ("output-address", bpo::value< vector >()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) + { + LOG(INFO) << "FLP" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("event-size") ) + _options->eventSize = vm["event-size"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("num-outputs") ) + _options->numOutputs = vm["num-outputs"].as(); + + if ( vm.count("heartbeat-timeout") ) + _options->heartbeatTimeoutInMs = vm["heartbeat-timeout"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); - s_catch_signals(); + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as< vector >(); - LOG(INFO) << "PID: " << getpid(); + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as< vector >(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as< vector >(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as< vector >(); + + return true; +} + +int main(int argc, char** argv) +{ + s_catch_signals(); + + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); #else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); #endif - flp.SetTransport(transportFactory); - - int i = 1; - - flp.SetProperty(O2FLPex::Id, argv[i]); - ++i; - - int eventSize; - stringstream(argv[i]) >> eventSize; - flp.SetProperty(O2FLPex::EventSize, eventSize); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - flp.SetProperty(O2FLPex::NumIoThreads, numIoThreads); - ++i; - - flp.SetProperty(O2FLPex::NumInputs, 1); - - int numOutputs; - stringstream(argv[i]) >> numOutputs; - flp.SetProperty(O2FLPex::NumOutputs, numOutputs); - ++i; - - int heartbeatTimeoutInMs; - stringstream(argv[i]) >> heartbeatTimeoutInMs; - flp.SetProperty(O2FLPex::HeartbeatTimeoutInMs, heartbeatTimeoutInMs); - ++i; - - flp.ChangeState(O2FLPex::INIT); - - flp.SetProperty(O2FLPex::InputSocketType, argv[i], 0); - ++i; - int inputSndBufSize; - stringstream(argv[i]) >> inputSndBufSize; - flp.SetProperty(O2FLPex::InputSndBufSize, inputSndBufSize, 0); - ++i; - flp.SetProperty(O2FLPex::InputMethod, argv[i], 0); - ++i; - flp.SetProperty(O2FLPex::InputAddress, argv[i], 0); - ++i; - - for (int iOutput = 0; iOutput < numOutputs; iOutput++ ) { - flp.SetProperty(O2FLPex::OutputSocketType, argv[i], iOutput); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - flp.SetProperty(O2FLPex::OutputSndBufSize, outputSndBufSize, iOutput); - ++i; - flp.SetProperty(O2FLPex::OutputMethod, argv[i], iOutput); - ++i; - flp.SetProperty(O2FLPex::OutputAddress, argv[i], iOutput); - ++i; - } - - flp.ChangeState(O2FLPex::SETOUTPUT); - flp.ChangeState(O2FLPex::SETINPUT); - flp.ChangeState(O2FLPex::RUN); - - // wait until the running thread has finished processing. - boost::unique_lock lock(flp.fRunningMutex); - while (!flp.fRunningFinished) - { - flp.fRunningCondition.wait(lock); - } - - flp.ChangeState(O2FLPex::STOP); - flp.ChangeState(O2FLPex::END); - - return 0; + flp.SetTransport(transportFactory); + + flp.SetProperty(O2FLPex::Id, options.id); + flp.SetProperty(O2FLPex::NumIoThreads, options.ioThreads); + flp.SetProperty(O2FLPex::EventSize, options.eventSize); + + flp.SetProperty(O2FLPex::NumInputs, 1); + flp.SetProperty(O2FLPex::NumOutputs, options.numOutputs); + flp.SetProperty(O2FLPex::HeartbeatTimeoutInMs, options.heartbeatTimeoutInMs); + + flp.ChangeState(O2FLPex::INIT); + + flp.SetProperty(O2FLPex::InputSocketType, options.inputSocketType); + flp.SetProperty(O2FLPex::InputSndBufSize, options.inputBufSize); + flp.SetProperty(O2FLPex::InputMethod, options.inputMethod); + flp.SetProperty(O2FLPex::InputAddress, options.inputAddress); + + for (int i = 0; i < options.numOutputs; ++i) + { + flp.SetProperty(O2FLPex::OutputSocketType, options.outputSocketType.at(i), i); + flp.SetProperty(O2FLPex::OutputRcvBufSize, options.outputBufSize.at(i), i); + flp.SetProperty(O2FLPex::OutputMethod, options.outputMethod.at(i), i); + flp.SetProperty(O2FLPex::OutputAddress, options.outputAddress.at(i), i); + } + + flp.ChangeState(O2FLPex::SETOUTPUT); + flp.ChangeState(O2FLPex::SETINPUT); + flp.ChangeState(O2FLPex::RUN); + + // wait until the running thread has finished processing. + boost::unique_lock lock(flp.fRunningMutex); + while (!flp.fRunningFinished) + { + flp.fRunningCondition.wait(lock); + } + + flp.ChangeState(O2FLPex::STOP); + flp.ChangeState(O2FLPex::END); + + return 0; } diff --git a/devices/flp2epn-dynamic/run/startFLP2EPN-dynamic.sh.in b/devices/flp2epn-dynamic/run/startFLP2EPN-dynamic.sh.in index 51eae230c5d69..67a339f89862c 100755 --- a/devices/flp2epn-dynamic/run/startFLP2EPN-dynamic.sh.in +++ b/devices/flp2epn-dynamic/run/startFLP2EPN-dynamic.sh.in @@ -1,142 +1,67 @@ #!/bin/bash -if(@NANOMSG_FOUND@); then - buffSize="50000000" # nanomsg buffer size is in bytes -else - buffSize="1000" # zeromq high-water mark is in messages -fi +buffSize="1000" # zeromq high-water mark is in messages +#buffSize="50000000" # nanomsg buffer size is in bytes -ID="101" -eventSize="1000" -numIoThreads="1" -numOutputs="3" -heartbeatTimeoutInMs="20000" -inputSocketType="sub" -inputBufSize=$buffSize -inputMethod="bind" -inputAddress="tcp://127.0.0.1:5580" -outputSocketType1="pub" -outputBufSize1=$buffSize -outputMethod1="connect" -outputAddress1="tcp://127.0.0.1:5560" -outputSocketType2="pub" -outputBufSize2=$buffSize -outputMethod2="connect" -outputAddress2="tcp://127.0.0.1:5561" -outputSocketType3="pub" -outputBufSize3=$buffSize -outputMethod3="connect" -outputAddress3="tcp://127.0.0.1:5562" -xterm -e @CMAKE_BINARY_DIR@/bin/testFLP_dynamic $ID $eventSize $numIoThreads $numOutputs $heartbeatTimeoutInMs $inputSocketType $inputBufSize $inputMethod $inputAddress $outputSocketType1 $outputBufSize1 $outputMethod1 $outputAddress1 $outputSocketType2 $outputBufSize2 $outputMethod2 $outputAddress2 $outputSocketType3 $outputBufSize3 $outputMethod3 $outputAddress3 & +FLP0="testFLP_dynamic" +FLP0+=" --id 0" +FLP0+=" --event-size 1000" +FLP0+=" --num-outputs 3" +FLP0+=" --heartbeat-timeout 20000" +FLP0+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5580" +FLP0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5560" +FLP0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5561" +FLP0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5562" +xterm -e @CMAKE_BINARY_DIR@/bin/$FLP0 & -ID="102" -eventSize="1000" -numIoThreads="1" -numOutputs="3" -heartbeatTimeoutInMs="20000" -inputSocketType="sub" -inputBufSize=$buffSize -inputMethod="bind" -inputAddress="tcp://127.0.0.1:5581" -outputSocketType1="pub" -outputBufSize1=$buffSize -outputMethod1="connect" -outputAddress1="tcp://127.0.0.1:5560" -outputSocketType2="pub" -outputBufSize2=$buffSize -outputMethod2="connect" -outputAddress2="tcp://127.0.0.1:5561" -outputSocketType3="pub" -outputBufSize3=$buffSize -outputMethod3="connect" -outputAddress3="tcp://127.0.0.1:5562" -xterm -e @CMAKE_BINARY_DIR@/bin/testFLP_dynamic $ID $eventSize $numIoThreads $numOutputs $heartbeatTimeoutInMs $inputSocketType $inputBufSize $inputMethod $inputAddress $outputSocketType1 $outputBufSize1 $outputMethod1 $outputAddress1 $outputSocketType2 $outputBufSize2 $outputMethod2 $outputAddress2 $outputSocketType3 $outputBufSize3 $outputMethod3 $outputAddress3 & +FLP1="testFLP_dynamic" +FLP1+=" --id 1" +FLP1+=" --event-size 1000" +FLP1+=" --num-outputs 3" +FLP1+=" --heartbeat-timeout 20000" +FLP1+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5581" +FLP1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5560" +FLP1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5561" +FLP1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5562" +xterm -e @CMAKE_BINARY_DIR@/bin/$FLP1 & -ID="103" -eventSize="1000" -numIoThreads="1" -numOutputs="3" -heartbeatTimeoutInMs="20000" -inputSocketType="sub" -inputBufSize=$buffSize -inputMethod="bind" -inputAddress="tcp://127.0.0.1:5582" -outputSocketType1="pub" -outputBufSize1=$buffSize -outputMethod1="connect" -outputAddress1="tcp://127.0.0.1:5560" -outputSocketType2="pub" -outputBufSize2=$buffSize -outputMethod2="connect" -outputAddress2="tcp://127.0.0.1:5561" -outputSocketType3="pub" -outputBufSize3=$buffSize -outputMethod3="connect" -outputAddress3="tcp://127.0.0.1:5562" -xterm -e @CMAKE_BINARY_DIR@/bin/testFLP_dynamic $ID $eventSize $numIoThreads $numOutputs $heartbeatTimeoutInMs $inputSocketType $inputBufSize $inputMethod $inputAddress $outputSocketType1 $outputBufSize1 $outputMethod1 $outputAddress1 $outputSocketType2 $outputBufSize2 $outputMethod2 $outputAddress2 $outputSocketType3 $outputBufSize3 $outputMethod3 $outputAddress3 & +FLP2="testFLP_dynamic" +FLP2+=" --id 2" +FLP2+=" --event-size 1000" +FLP2+=" --num-outputs 3" +FLP2+=" --heartbeat-timeout 20000" +FLP2+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5582" +FLP2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5560" +FLP2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5561" +FLP2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5562" +xterm -e @CMAKE_BINARY_DIR@/bin/$FLP2 & -ID="201" -numIoThreads="1" -numOutputs="3" -heartbeatIntervalInMs="5000" -inputSocketType="sub" -inputRcvBufSize=$buffSize -inputMethod="bind" -inputAddress="tcp://127.0.0.1:5560" -outputSocketType1="pub" -outputBufSize1=$buffSize -outputMethod1="connect" -outputAddress1="tcp://127.0.0.1:5580" -outputSocketType2="pub" -outputBufSize2=$buffSize -outputMethod2="connect" -outputAddress2="tcp://127.0.0.1:5581" -outputSocketType3="pub" -outputBufSize3=$buffSize -outputMethod3="connect" -outputAddress3="tcp://127.0.0.1:5582" -xterm -e @CMAKE_BINARY_DIR@/bin/testEPN_dynamic $ID $numIoThreads $numOutputs $heartbeatIntervalInMs $inputSocketType $inputRcvBufSize $inputMethod $inputAddress $outputSocketType1 $outputBufSize1 $outputMethod1 $outputAddress1 $outputSocketType2 $outputBufSize2 $outputMethod2 $outputAddress2 $outputSocketType3 $outputBufSize3 $outputMethod3 $outputAddress3 & +EPN0="testEPN_dynamic" +EPN0+=" --id EPN0" +EPN0+=" --num-outputs 3" +EPN0+=" --heartbeat-interval 5000" +EPN0+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5560" +EPN0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5580" +EPN0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5581" +EPN0+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5582" +xterm -e @CMAKE_BINARY_DIR@/bin/$EPN0 & -ID="202" -numIoThreads="1" -numOutputs="3" -heartbeatIntervalInMs="5000" -inputSocketType="sub" -inputRcvBufSize=$buffSize -inputMethod="bind" -inputAddress="tcp://127.0.0.1:5561" -outputSocketType1="pub" -outputBufSize1=$buffSize -outputMethod1="connect" -outputAddress1="tcp://127.0.0.1:5580" -outputSocketType2="pub" -outputBufSize2=$buffSize -outputMethod2="connect" -outputAddress2="tcp://127.0.0.1:5581" -outputSocketType3="pub" -outputBufSize3=$buffSize -outputMethod3="connect" -outputAddress3="tcp://127.0.0.1:5582" -xterm -e @CMAKE_BINARY_DIR@/bin/testEPN_dynamic $ID $numIoThreads $numOutputs $heartbeatIntervalInMs $inputSocketType $inputRcvBufSize $inputMethod $inputAddress $outputSocketType1 $outputBufSize1 $outputMethod1 $outputAddress1 $outputSocketType2 $outputBufSize2 $outputMethod2 $outputAddress2 $outputSocketType3 $outputBufSize3 $outputMethod3 $outputAddress3 & +EPN1="testEPN_dynamic" +EPN1+=" --id EPN1" +EPN1+=" --num-outputs 3" +EPN1+=" --heartbeat-interval 5000" +EPN1+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5561" +EPN1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5580" +EPN1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5581" +EPN1+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5582" +xterm -e @CMAKE_BINARY_DIR@/bin/$EPN1 & -ID="203" -numIoThreads="1" -numOutputs="3" -heartbeatIntervalInMs="5000" -inputSocketType="sub" -inputRcvBufSize=$buffSize -inputMethod="bind" -inputAddress="tcp://127.0.0.1:5562" -outputSocketType1="pub" -outputBufSize1=$buffSize -outputMethod1="connect" -outputAddress1="tcp://127.0.0.1:5580" -outputSocketType2="pub" -outputBufSize2=$buffSize -outputMethod2="connect" -outputAddress2="tcp://127.0.0.1:5581" -outputSocketType3="pub" -outputBufSize3=$buffSize -outputMethod3="connect" -outputAddress3="tcp://127.0.0.1:5582" -xterm -e @CMAKE_BINARY_DIR@/bin/testEPN_dynamic $ID $numIoThreads $numOutputs $heartbeatIntervalInMs $inputSocketType $inputRcvBufSize $inputMethod $inputAddress $outputSocketType1 $outputBufSize1 $outputMethod1 $outputAddress1 $outputSocketType2 $outputBufSize2 $outputMethod2 $outputAddress2 $outputSocketType3 $outputBufSize3 $outputMethod3 $outputAddress3 & +EPN2="testEPN_dynamic" +EPN2+=" --id EPN2" +EPN2+=" --num-outputs 3" +EPN2+=" --heartbeat-interval 5000" +EPN2+=" --input-socket-type sub --input-buff-size $buffSize --input-method bind --input-address tcp://127.0.0.1:5562" +EPN2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5580" +EPN2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5581" +EPN2+=" --output-socket-type pub --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5582" +xterm -e @CMAKE_BINARY_DIR@/bin/$EPN2 & diff --git a/devices/flp2epn/CMakeLists.txt b/devices/flp2epn/CMakeLists.txt index c944d49d7e235..63d7d5ebe69f9 100644 --- a/devices/flp2epn/CMakeLists.txt +++ b/devices/flp2epn/CMakeLists.txt @@ -27,7 +27,7 @@ set(SRCS set(DEPENDENCIES ${DEPENDENCIES} ${CMAKE_THREAD_LIBS_INIT} - boost_thread boost_timer boost_system FairMQ + boost_thread boost_timer boost_system boost_program_options FairMQ ) set(LIBRARY_NAME FLP2EPNex) diff --git a/devices/flp2epn/O2Proxy.cxx b/devices/flp2epn/O2Proxy.cxx index edee9f27f7a52..e015444c795a7 100644 --- a/devices/flp2epn/O2Proxy.cxx +++ b/devices/flp2epn/O2Proxy.cxx @@ -28,25 +28,25 @@ void O2Proxy::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); while ( fState == RUNNING ) { - // int i=0; - int64_t more=0; - size_t more_size = sizeof more; - do { - /* Create an empty ØMQ message to hold the message part */ - FairMQMessage* msgpart = fTransportFactory->CreateMessage(); - /* Block until a message is available to be received from socket */ - fPayloadInputs->at(0)->Receive(msgpart); - /* Determine if more message parts are to follow */ - fPayloadInputs->at(0)->GetOption("rcv-more", &more, &more_size); - // LOG(INFO) << "------ Get Msg Part "<< " more = " << more << " counter " << i++ ; - if(more){ - fPayloadOutputs->at(0)->Send(msgpart, "snd-more"); - }else{ - fPayloadOutputs->at(0)->Send(msgpart); - } - delete msgpart; - } while (more); - // i=0; + // int i = 0; + int64_t more = 0; + size_t more_size = sizeof more; + do { + /* Create an empty ØMQ message to hold the message part */ + FairMQMessage* msgpart = fTransportFactory->CreateMessage(); + /* Block until a message is available to be received from socket */ + fPayloadInputs->at(0)->Receive(msgpart); + /* Determine if more message parts are to follow */ + fPayloadInputs->at(0)->GetOption("rcv-more", &more, &more_size); + // LOG(INFO) << "------ Get Msg Part "<< " more = " << more << " counter " << i++ ; + if(more){ + fPayloadOutputs->at(0)->Send(msgpart, "snd-more"); + }else{ + fPayloadOutputs->at(0)->Send(msgpart); + } + delete msgpart; + } while (more); + // i = 0; } rateLogger.interrupt(); diff --git a/devices/flp2epn/run/runEPN.cxx b/devices/flp2epn/run/runEPN.cxx index 312e553ab8f82..6ef145960da16 100644 --- a/devices/flp2epn/run/runEPN.cxx +++ b/devices/flp2epn/run/runEPN.cxx @@ -8,107 +8,152 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "O2EPNex.h" #ifdef NANOMSG - #include "FairMQTransportFactoryNN.h" + #include "FairMQTransportFactoryNN.h" #else - #include "FairMQTransportFactoryZMQ.h" + #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - +using namespace std; O2EPNex epn; static void s_signal_handler (int signal) { - cout << endl << "Caught signal " << signal << endl; + cout << endl << "Caught signal " << signal << endl; - epn.ChangeState(O2EPNex::STOP); - epn.ChangeState(O2EPNex::END); + epn.ChangeState(O2EPNex::STOP); + epn.ChangeState(O2EPNex::END); - cout << "Shutdown complete. Bye!" << endl; - exit(1); + cout << "Shutdown complete. Bye!" << endl; + exit(1); } static void s_catch_signals (void) { - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if ( argc != 7 ) { - cout << "Usage: testEPN \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << endl; - return 1; - } + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); - s_catch_signals(); + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); - LOG(INFO) << "PID: " << getpid(); + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif + if ( vm.count("help") ) + { + LOG(INFO) << "EPN" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); - epn.SetTransport(transportFactory); + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); - int i = 1; + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); - epn.SetProperty(O2EPNex::Id, argv[i]); - ++i; + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - epn.SetProperty(O2EPNex::NumIoThreads, numIoThreads); - ++i; + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); - epn.SetProperty(O2EPNex::NumInputs, 1); - epn.SetProperty(O2EPNex::NumOutputs, 0); + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ + s_catch_signals(); + + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + epn.SetTransport(transportFactory); - epn.ChangeState(O2EPNex::INIT); + epn.SetProperty(O2EPNex::Id, options.id); + epn.SetProperty(O2EPNex::NumIoThreads, options.ioThreads); + epn.SetProperty(O2EPNex::NumInputs, 1); + epn.SetProperty(O2EPNex::NumOutputs, 0); - epn.SetProperty(O2EPNex::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - epn.SetProperty(O2EPNex::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - epn.SetProperty(O2EPNex::InputMethod, argv[i], 0); - ++i; - epn.SetProperty(O2EPNex::InputAddress, argv[i], 0); - ++i; + epn.ChangeState(O2EPNex::INIT); + epn.SetProperty(O2EPNex::InputSocketType, options.inputSocketType); + epn.SetProperty(O2EPNex::InputSndBufSize, options.inputBufSize); + epn.SetProperty(O2EPNex::InputMethod, options.inputMethod); + epn.SetProperty(O2EPNex::InputAddress, options.inputAddress); - epn.ChangeState(O2EPNex::SETOUTPUT); - epn.ChangeState(O2EPNex::SETINPUT); - epn.ChangeState(O2EPNex::RUN); + epn.ChangeState(O2EPNex::SETOUTPUT); + epn.ChangeState(O2EPNex::SETINPUT); + epn.ChangeState(O2EPNex::RUN); - // wait until the running thread has finished processing. - boost::unique_lock lock(epn.fRunningMutex); - while (!epn.fRunningFinished) - { + // wait until the running thread has finished processing. + boost::unique_lock lock(epn.fRunningMutex); + while (!epn.fRunningFinished) + { epn.fRunningCondition.wait(lock); - } + } - epn.ChangeState(O2EPNex::STOP); - epn.ChangeState(O2EPNex::END); + epn.ChangeState(O2EPNex::STOP); + epn.ChangeState(O2EPNex::END); - return 0; + return 0; } diff --git a/devices/flp2epn/run/runEPN_M.cxx b/devices/flp2epn/run/runEPN_M.cxx index 523a1fce922af..374e68c024485 100644 --- a/devices/flp2epn/run/runEPN_M.cxx +++ b/devices/flp2epn/run/runEPN_M.cxx @@ -8,108 +8,153 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "O2EpnMerger.h" #ifdef NANOMSG - #include "FairMQTransportFactoryNN.h" + #include "FairMQTransportFactoryNN.h" #else - #include "FairMQTransportFactoryZMQ.h" + #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - +using namespace std; O2EpnMerger epn; static void s_signal_handler (int signal) { - cout << endl << "Caught signal " << signal << endl; + cout << endl << "Caught signal " << signal << endl; - epn.ChangeState(O2EpnMerger::STOP); - epn.ChangeState(O2EpnMerger::END); + epn.ChangeState(O2EpnMerger::STOP); + epn.ChangeState(O2EpnMerger::END); - cout << "Shutdown complete. Bye!" << endl; - exit(1); + cout << "Shutdown complete. Bye!" << endl; + exit(1); } static void s_catch_signals (void) { - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if ( argc != 7 ) { - cout << "Usage: testEPN \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << endl; - return 1; - } + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); - s_catch_signals(); + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); - LOG(INFO) << "PID: " << getpid(); + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif + if ( vm.count("help") ) + { + LOG(INFO) << "EPN Merger" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); - epn.SetTransport(transportFactory); + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); - int i = 1; + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); - epn.SetProperty(O2EpnMerger::Id, argv[i]); - ++i; + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - epn.SetProperty(O2EpnMerger::NumIoThreads, numIoThreads); - ++i; + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); - epn.SetProperty(O2EpnMerger::NumInputs, 1); - epn.SetProperty(O2EpnMerger::NumOutputs, 0); + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ + s_catch_signals(); + + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + epn.SetTransport(transportFactory); - epn.ChangeState(O2EpnMerger::INIT); + epn.SetProperty(O2EpnMerger::Id, options.id); + epn.SetProperty(O2EpnMerger::NumIoThreads, options.ioThreads); + epn.SetProperty(O2EpnMerger::NumInputs, 1); + epn.SetProperty(O2EpnMerger::NumOutputs, 0); - epn.SetProperty(O2EpnMerger::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - epn.SetProperty(O2EpnMerger::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - epn.SetProperty(O2EpnMerger::InputMethod, argv[i], 0); - ++i; - epn.SetProperty(O2EpnMerger::InputAddress, argv[i], 0); - ++i; + epn.ChangeState(O2EpnMerger::INIT); + epn.SetProperty(O2EpnMerger::InputSocketType, options.inputSocketType); + epn.SetProperty(O2EpnMerger::InputSndBufSize, options.inputBufSize); + epn.SetProperty(O2EpnMerger::InputMethod, options.inputMethod); + epn.SetProperty(O2EpnMerger::InputAddress, options.inputAddress); - epn.ChangeState(O2EpnMerger::SETOUTPUT); - epn.ChangeState(O2EpnMerger::SETINPUT); - epn.ChangeState(O2EpnMerger::RUN); + epn.ChangeState(O2EpnMerger::SETOUTPUT); + epn.ChangeState(O2EpnMerger::SETINPUT); + epn.ChangeState(O2EpnMerger::RUN); - // wait until the running thread has finished processing. - boost::unique_lock lock(epn.fRunningMutex); - while (!epn.fRunningFinished) - { + // wait until the running thread has finished processing. + boost::unique_lock lock(epn.fRunningMutex); + while (!epn.fRunningFinished) + { epn.fRunningCondition.wait(lock); - } + } - epn.ChangeState(O2EpnMerger::STOP); - epn.ChangeState(O2EpnMerger::END); + epn.ChangeState(O2EpnMerger::STOP); + epn.ChangeState(O2EpnMerger::END); - return 0; + return 0; } diff --git a/devices/flp2epn/run/runFLP.cxx b/devices/flp2epn/run/runFLP.cxx index 4e5af653291c4..84e8185234636 100644 --- a/devices/flp2epn/run/runFLP.cxx +++ b/devices/flp2epn/run/runFLP.cxx @@ -8,108 +8,158 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "O2FLPex.h" #ifdef NANOMSG - #include "FairMQTransportFactoryNN.h" + #include "FairMQTransportFactoryNN.h" #else - #include "FairMQTransportFactoryZMQ.h" + #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; +using namespace std; O2FLPex flp; static void s_signal_handler (int signal) { - cout << endl << "Caught signal " << signal << endl; + cout << endl << "Caught signal " << signal << endl; - flp.ChangeState(O2FLPex::STOP); - flp.ChangeState(O2FLPex::END); + flp.ChangeState(O2FLPex::STOP); + flp.ChangeState(O2FLPex::END); - cout << "Shutdown complete. Bye!" << endl; - exit(1); + cout << "Shutdown complete. Bye!" << endl; + exit(1); } static void s_catch_signals (void) { - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions +{ + string id; + int eventSize; + int ioThreads; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) { - if ( argc != 8 ) { - cout << "Usage: testFLP ID eventSize numIoTreads\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << endl; - return 1; - } + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); - s_catch_signals(); + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("event-size", bpo::value()->default_value(1000), "Event size in bytes") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); - LOG(INFO) << "PID: " << getpid(); + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif + if ( vm.count("help") ) + { + LOG(INFO) << "FLP" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("event-size") ) + _options->eventSize = vm["event-size"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); - flp.SetTransport(transportFactory); + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); - int i = 1; + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); - flp.SetProperty(O2FLPex::Id, argv[i]); - ++i; + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ + s_catch_signals(); + + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif - int eventSize; - stringstream(argv[i]) >> eventSize; - flp.SetProperty(O2FLPex::EventSize, eventSize); - ++i; + flp.SetTransport(transportFactory); - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - flp.SetProperty(O2FLPex::NumIoThreads, numIoThreads); - ++i; + flp.SetProperty(O2FLPex::Id, options.id); + flp.SetProperty(O2FLPex::NumIoThreads, options.ioThreads); + flp.SetProperty(O2FLPex::EventSize, options.eventSize); - flp.SetProperty(O2FLPex::NumInputs, 0); - flp.SetProperty(O2FLPex::NumOutputs, 1); + flp.SetProperty(O2FLPex::NumInputs, 0); + flp.SetProperty(O2FLPex::NumOutputs, 1); - flp.ChangeState(O2FLPex::INIT); + flp.ChangeState(O2FLPex::INIT); - flp.SetProperty(O2FLPex::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - flp.SetProperty(O2FLPex::OutputSndBufSize, outputSndBufSize, 0); - ++i; - flp.SetProperty(O2FLPex::OutputMethod, argv[i], 0); - ++i; - flp.SetProperty(O2FLPex::OutputAddress, argv[i], 0); - ++i; + flp.SetProperty(O2FLPex::OutputSocketType, options.outputSocketType); + flp.SetProperty(O2FLPex::OutputRcvBufSize, options.outputBufSize); + flp.SetProperty(O2FLPex::OutputMethod, options.outputMethod); + flp.SetProperty(O2FLPex::OutputAddress, options.outputAddress); - flp.ChangeState(O2FLPex::SETOUTPUT); - flp.ChangeState(O2FLPex::SETINPUT); - flp.ChangeState(O2FLPex::RUN); + flp.ChangeState(O2FLPex::SETOUTPUT); + flp.ChangeState(O2FLPex::SETINPUT); + flp.ChangeState(O2FLPex::RUN); - // wait until the running thread has finished processing. - boost::unique_lock lock(flp.fRunningMutex); - while (!flp.fRunningFinished) - { + // wait until the running thread has finished processing. + boost::unique_lock lock(flp.fRunningMutex); + while (!flp.fRunningFinished) + { flp.fRunningCondition.wait(lock); - } + } - flp.ChangeState(O2FLPex::STOP); - flp.ChangeState(O2FLPex::END); + flp.ChangeState(O2FLPex::STOP); + flp.ChangeState(O2FLPex::END); - return 0; + return 0; } diff --git a/devices/flp2epn/run/runMerger.cxx b/devices/flp2epn/run/runMerger.cxx index 8bbf7a0af3a6f..9b1ccbdd36bac 100644 --- a/devices/flp2epn/run/runMerger.cxx +++ b/devices/flp2epn/run/runMerger.cxx @@ -8,125 +8,186 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "O2Merger.h" #ifdef NANOMSG - #include "FairMQTransportFactoryNN.h" + #include "FairMQTransportFactoryNN.h" #else - #include "FairMQTransportFactoryZMQ.h" + #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - +using namespace std; O2Merger merger; static void s_signal_handler (int signal) { - cout << endl << "Caught signal " << signal << endl; + cout << endl << "Caught signal " << signal << endl; - merger.ChangeState(O2Merger::STOP); - merger.ChangeState(O2Merger::END); + merger.ChangeState(O2Merger::STOP); + merger.ChangeState(O2Merger::END); - cout << "Shutdown complete. Bye!" << endl; - exit(1); + cout << "Shutdown complete. Bye!" << endl; + exit(1); } static void s_catch_signals (void) { - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions +{ + string id; + int ioThreads; + int numInputs; + vector inputSocketType; + vector inputBufSize; + vector inputMethod; + vector inputAddress; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) { - if ( argc < 16 || (argc - 8) % 4 != 0 ) { - cout << "Usage: merger \tID numIoTreads numInputs\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\t...\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" - << argc << " arguments provided" << endl; - return 1; - } + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); + + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("num-inputs", bpo::value()->required(), "Number of input sockets") + ("input-socket-type", bpo::value< vector >()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value< vector >()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value< vector >()->required(), "Input method: bind/connect") + ("input-address", bpo::value< vector >()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); + + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); + + if ( vm.count("help") ) + { + LOG(INFO) << "MERGER" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("num-inputs") ) + _options->numInputs = vm["num-inputs"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as< vector >(); + + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as< vector >(); + + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as< vector >(); + + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as< vector >(); - s_catch_signals(); + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); - LOG(INFO) << "PID: " << getpid(); + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ + s_catch_signals(); + + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); #else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); #endif - merger.SetTransport(transportFactory); - - int i = 1; - - merger.SetProperty(O2Merger::Id, argv[i]); - ++i; - - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - merger.SetProperty(O2Merger::NumIoThreads, numIoThreads); - ++i; - - int numInputs; - stringstream(argv[i]) >> numInputs; - merger.SetProperty(O2Merger::NumInputs, numInputs); - ++i; - - merger.SetProperty(O2Merger::NumOutputs, 1); - - merger.ChangeState(O2Merger::INIT); - - for (int iInput = 0; iInput < numInputs; iInput++ ) { - merger.SetProperty(O2Merger::InputSocketType, argv[i], iInput); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - merger.SetProperty(O2Merger::InputRcvBufSize, inputRcvBufSize, iInput); - ++i; - merger.SetProperty(O2Merger::InputMethod, argv[i], iInput); - ++i; - merger.SetProperty(O2Merger::InputAddress, argv[i], iInput); - ++i; - } - - merger.SetProperty(O2Merger::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - merger.SetProperty(O2Merger::OutputSndBufSize, outputSndBufSize, 0); - ++i; - merger.SetProperty(O2Merger::OutputMethod, argv[i], 0); - ++i; - merger.SetProperty(O2Merger::OutputAddress, argv[i], 0); - ++i; - - merger.ChangeState(O2Merger::SETOUTPUT); - merger.ChangeState(O2Merger::SETINPUT); - merger.ChangeState(O2Merger::RUN); - - // wait until the running thread has finished processing. - boost::unique_lock lock(merger.fRunningMutex); - while (!merger.fRunningFinished) - { + merger.SetTransport(transportFactory); + + merger.SetProperty(O2Merger::Id, options.id); + merger.SetProperty(O2Merger::NumIoThreads, options.ioThreads); + + merger.SetProperty(O2Merger::NumInputs, options.numInputs); + merger.SetProperty(O2Merger::NumOutputs, 1); + + merger.ChangeState(O2Merger::INIT); + + for (int i = 0; i < options.numInputs; ++i) + { + merger.SetProperty(O2Merger::InputSocketType, options.inputSocketType.at(i), i); + merger.SetProperty(O2Merger::InputSndBufSize, options.inputBufSize.at(i), i); + merger.SetProperty(O2Merger::InputMethod, options.inputMethod.at(i), i); + merger.SetProperty(O2Merger::InputAddress, options.inputAddress.at(i), i); + } + + merger.SetProperty(O2Merger::OutputSocketType, options.outputSocketType); + merger.SetProperty(O2Merger::OutputRcvBufSize, options.outputBufSize); + merger.SetProperty(O2Merger::OutputMethod, options.outputMethod); + merger.SetProperty(O2Merger::OutputAddress, options.outputAddress); + + merger.ChangeState(O2Merger::SETOUTPUT); + merger.ChangeState(O2Merger::SETINPUT); + merger.ChangeState(O2Merger::RUN); + + // wait until the running thread has finished processing. + boost::unique_lock lock(merger.fRunningMutex); + while (!merger.fRunningFinished) + { merger.fRunningCondition.wait(lock); - } + } - merger.ChangeState(O2Merger::STOP); - merger.ChangeState(O2Merger::END); + merger.ChangeState(O2Merger::STOP); + merger.ChangeState(O2Merger::END); - return 0; + return 0; } diff --git a/devices/flp2epn/run/runProxy.cxx b/devices/flp2epn/run/runProxy.cxx index 3e32eb579c084..c89573eb88759 100644 --- a/devices/flp2epn/run/runProxy.cxx +++ b/devices/flp2epn/run/runProxy.cxx @@ -8,120 +8,179 @@ #include #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" #include "O2Proxy.h" #ifdef NANOMSG - #include "FairMQTransportFactoryNN.h" + #include "FairMQTransportFactoryNN.h" #else - #include "FairMQTransportFactoryZMQ.h" + #include "FairMQTransportFactoryZMQ.h" #endif -using std::cout; -using std::cin; -using std::endl; -using std::stringstream; - +using namespace std; O2Proxy proxy; static void s_signal_handler (int signal) { - cout << endl << "Caught signal " << signal << endl; + cout << endl << "Caught signal " << signal << endl; - proxy.ChangeState(O2Proxy::STOP); - proxy.ChangeState(O2Proxy::END); + proxy.ChangeState(O2Proxy::STOP); + proxy.ChangeState(O2Proxy::END); - cout << "Shutdown complete. Bye!" << endl; - exit(1); + cout << "Shutdown complete. Bye!" << endl; + exit(1); } static void s_catch_signals (void) { - struct sigaction action; - action.sa_handler = s_signal_handler; - action.sa_flags = 0; - sigemptyset(&action.sa_mask); - sigaction(SIGINT, &action, NULL); - sigaction(SIGTERM, &action, NULL); + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); } -int main(int argc, char** argv) +typedef struct DeviceOptions { - if ( argc != 11 ) { - cout << "Usage: testProxy \tID numIoTreads\n" - << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" - << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; - return 1; - } + string id; + int ioThreads; + string inputSocketType; + int inputBufSize; + string inputMethod; + string inputAddress; + string outputSocketType; + int outputBufSize; + string outputMethod; + string outputAddress; +} DeviceOptions_t; + +inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) +{ + if (_options == NULL) + throw std::runtime_error("Internal error: options' container is empty."); - s_catch_signals(); + namespace bpo = boost::program_options; + bpo::options_description desc("Options"); + desc.add_options() + ("id", bpo::value()->required(), "Device ID") + ("io-threads", bpo::value()->default_value(1), "Number of I/O threads") + ("input-socket-type", bpo::value()->required(), "Input socket type: sub/pull") + ("input-buff-size", bpo::value()->required(), "Input buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("input-method", bpo::value()->required(), "Input method: bind/connect") + ("input-address", bpo::value()->required(), "Input address, e.g.: \"tcp://localhost:5555\"") + ("output-socket-type", bpo::value()->required(), "Output socket type: pub/push") + ("output-buff-size", bpo::value()->required(), "Output buffer size in number of messages (ZeroMQ)/bytes(nanomsg)") + ("output-method", bpo::value()->required(), "Output method: bind/connect") + ("output-address", bpo::value()->required(), "Output address, e.g.: \"tcp://localhost:5555\"") + ("help", "Print help messages"); - LOG(INFO) << "PID: " << getpid(); + bpo::variables_map vm; + bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif + if ( vm.count("help") ) + { + LOG(INFO) << "Proxy" << endl << desc; + return false; + } + + bpo::notify(vm); + + if ( vm.count("id") ) + _options->id = vm["id"].as(); + + if ( vm.count("io-threads") ) + _options->ioThreads = vm["io-threads"].as(); + + if ( vm.count("input-socket-type") ) + _options->inputSocketType = vm["input-socket-type"].as(); - proxy.SetTransport(transportFactory); + if ( vm.count("input-buff-size") ) + _options->inputBufSize = vm["input-buff-size"].as(); - int i = 1; + if ( vm.count("input-method") ) + _options->inputMethod = vm["input-method"].as(); - proxy.SetProperty(O2Proxy::Id, argv[i]); - ++i; + if ( vm.count("input-address") ) + _options->inputAddress = vm["input-address"].as(); - int numIoThreads; - stringstream(argv[i]) >> numIoThreads; - proxy.SetProperty(O2Proxy::NumIoThreads, numIoThreads); - ++i; + if ( vm.count("output-socket-type") ) + _options->outputSocketType = vm["output-socket-type"].as(); - proxy.SetProperty(O2Proxy::NumInputs, 1); - proxy.SetProperty(O2Proxy::NumOutputs, 1); + if ( vm.count("output-buff-size") ) + _options->outputBufSize = vm["output-buff-size"].as(); + + if ( vm.count("output-method") ) + _options->outputMethod = vm["output-method"].as(); + + if ( vm.count("output-address") ) + _options->outputAddress = vm["output-address"].as(); + + return true; +} + +int main(int argc, char** argv) +{ + s_catch_signals(); + + DeviceOptions_t options; + try + { + if (!parse_cmd_line(argc, argv, &options)) + return 0; + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + return 1; + } + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + proxy.SetTransport(transportFactory); - proxy.ChangeState(O2Proxy::INIT); + proxy.SetProperty(O2Proxy::Id, options.id); + proxy.SetProperty(O2Proxy::NumIoThreads, options.ioThreads); + proxy.SetProperty(O2Proxy::NumInputs, 1); + proxy.SetProperty(O2Proxy::NumOutputs, 1); - proxy.SetProperty(O2Proxy::InputSocketType, argv[i], 0); - ++i; - int inputRcvBufSize; - stringstream(argv[i]) >> inputRcvBufSize; - proxy.SetProperty(O2Proxy::InputRcvBufSize, inputRcvBufSize, 0); - ++i; - proxy.SetProperty(O2Proxy::InputMethod, argv[i], 0); - ++i; - proxy.SetProperty(O2Proxy::InputAddress, argv[i], 0); - ++i; + proxy.ChangeState(O2Proxy::INIT); - proxy.SetProperty(O2Proxy::OutputSocketType, argv[i], 0); - ++i; - int outputSndBufSize; - stringstream(argv[i]) >> outputSndBufSize; - proxy.SetProperty(O2Proxy::OutputSndBufSize, outputSndBufSize, 0); - ++i; - proxy.SetProperty(O2Proxy::OutputMethod, argv[i], 0); - ++i; - proxy.SetProperty(O2Proxy::OutputAddress, argv[i], 0); - ++i; + proxy.SetProperty(O2Proxy::InputSocketType, options.inputSocketType); + proxy.SetProperty(O2Proxy::InputSndBufSize, options.inputBufSize); + proxy.SetProperty(O2Proxy::InputMethod, options.inputMethod); + proxy.SetProperty(O2Proxy::InputAddress, options.inputAddress); + proxy.SetProperty(O2Proxy::OutputSocketType, options.outputSocketType); + proxy.SetProperty(O2Proxy::OutputRcvBufSize, options.outputBufSize); + proxy.SetProperty(O2Proxy::OutputMethod, options.outputMethod); + proxy.SetProperty(O2Proxy::OutputAddress, options.outputAddress); - proxy.ChangeState(O2Proxy::SETOUTPUT); - proxy.ChangeState(O2Proxy::SETINPUT); - proxy.ChangeState(O2Proxy::RUN); + proxy.ChangeState(O2Proxy::SETOUTPUT); + proxy.ChangeState(O2Proxy::SETINPUT); + proxy.ChangeState(O2Proxy::RUN); - // wait until the running thread has finished processing. - boost::unique_lock lock(proxy.fRunningMutex); - while (!proxy.fRunningFinished) - { + // wait until the running thread has finished processing. + boost::unique_lock lock(proxy.fRunningMutex); + while (!proxy.fRunningFinished) + { proxy.fRunningCondition.wait(lock); - } + } - proxy.ChangeState(O2Proxy::STOP); - proxy.ChangeState(O2Proxy::END); + proxy.ChangeState(O2Proxy::STOP); + proxy.ChangeState(O2Proxy::END); - return 0; + return 0; } diff --git a/devices/flp2epn/run/startFLP2EPN.sh.in b/devices/flp2epn/run/startFLP2EPN.sh.in index e4c5cfbad1721..f36ada84d6516 100755 --- a/devices/flp2epn/run/startFLP2EPN.sh.in +++ b/devices/flp2epn/run/startFLP2EPN.sh.in @@ -1,36 +1,21 @@ #!/bin/bash -if(@NANOMSG_FOUND@); then - buffSize="50000000" # nanomsg buffer size is in bytes -else - buffSize="1000" # zeromq high-water mark is in messages -fi +buffSize="1000" # zeromq high-water mark is in messages +#buffSize="50000000" # nanomsg buffer size is in bytes -ID="101" -eventSize="10000" -numIoThreads="1" -outputSocketType="push" -outputBufSize=$buffSize -outputMethod="connect" -outputAddress="tcp://localhost:5565" -xterm -e @CMAKE_BINARY_DIR@/bin/testFLP $ID $eventSize $numIoThreads $outputSocketType $outputBufSize $outputMethod $outputAddress & +FLP0="testFLP" +FLP0+=" --id 101" +FLP0+=" --event-size 10000" +FLP0+=" --output-socket-type push --output-buff-size $buffSize --output-method connect --output-address tcp://127.0.0.1:5565" +xterm -e @CMAKE_BINARY_DIR@/bin/$FLP0 & -ID="201" -numIoThreads="1" -inputSocketType="pull" -inputRcvBufSize=$buffSize -inputMethod="bind" -inputAddress="tcp://*:5565" -outputSocketType="push" -outputSndBufSize=$buffSize -outputMethod="bind" -outputAddress="tcp://*:5566" -xterm -e @CMAKE_BINARY_DIR@/bin/testProxy $ID $numIoThreads $inputSocketType $inputRcvBufSize $inputMethod $inputAddress $outputSocketType $outputSndBufSize $outputMethod $outputAddress & +PROXY="testProxy" +PROXY+=" --id 201" +PROXY+=" --input-socket-type pull --input-buff-size $buffSize --input-method bind --input-address tcp://*:5565" +PROXY+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5566" +xterm -e @CMAKE_BINARY_DIR@/bin/$PROXY & -ID="301" -numIoThreads="1" -inputSocketType="pull" -inputRcvBufSize=$buffSize -inputMethod="connect" -inputAddress="tcp://localhost:5566" -xterm -e @CMAKE_BINARY_DIR@/bin/testEPN $ID $numIoThreads $inputSocketType $inputRcvBufSize $inputMethod $inputAddress & +EPN0="testEPN" +EPN0+=" --id 301" +EPN0+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://127.0.0.1:5566" +xterm -e @CMAKE_BINARY_DIR@/bin/$EPN0 & diff --git a/devices/flp2epn/run/startMerger.sh.in b/devices/flp2epn/run/startMerger.sh.in index b300ac49056ca..57456400f4d60 100755 --- a/devices/flp2epn/run/startMerger.sh.in +++ b/devices/flp2epn/run/startMerger.sh.in @@ -1,103 +1,57 @@ #!/bin/bash -if(false); then - buffSize="50000000" # nanomsg buffer size is in bytes -else - buffSize="1000" # zeromq high-water mark is in messages -fi - -ID="101" -eventSize="1000" -numIoThreads="1" -outputSocketType="push" -outputBufSize=$buffSize -outputMethod="bind" -outputAddress="tcp://*:5565" -xterm -e @CMAKE_BINARY_DIR@/bin/testFLP $ID $eventSize $numIoThreads $outputSocketType $outputBufSize $outputMethod $outputAddress & - -ID="102" -eventSize="10000" -numIoThreads="1" -outputSocketType="push" -outputBufSize=$buffSize -outputMethod="bind" -outputAddress="tcp://*:5566" -xterm -e @CMAKE_BINARY_DIR@/bin/testFLP $ID $eventSize $numIoThreads $outputSocketType $outputBufSize $outputMethod $outputAddress & - -ID="103" -eventSize="10000" -numIoThreads="1" -outputSocketType="push" -outputBufSize=$buffSize -outputMethod="bind" -outputAddress="tcp://*:5567" -xterm -e @CMAKE_BINARY_DIR@/bin/testFLP $ID $eventSize $numIoThreads $outputSocketType $outputBufSize $outputMethod $outputAddress & - -ID="501" -numIoThreads="1" -numInputs="3" - -inputSocketType1="pull" -inputRcvBufSize1=$buffSize -inputMethod1="connect" -inputAddress1="tcp://localhost:5565" - -inputSocketType2="pull" -inputRcvBufSize2=$buffSize -inputMethod2="connect" -inputAddress2="tcp://localhost:5566" - -inputSocketType3="pull" -inputRcvBufSize3=$buffSize -inputMethod3="connect" -inputAddress3="tcp://localhost:5567" - -outputSocketType="push" -outputSndBufSize=$buffSize -outputMethod="bind" -outputAddress="tcp://*:5581" -xterm -e @CMAKE_BINARY_DIR@/bin/testMerger $ID $numIoThreads $numInputs $inputSocketType1 $inputRcvBufSize1 $inputMethod1 $inputAddress1 $inputSocketType2 $inputRcvBufSize2 $inputMethod2 $inputAddress2 $inputSocketType3 $inputRcvBufSize3 $inputMethod3 $inputAddress3 $outputSocketType $outputSndBufSize $outputMethod $outputAddress & - -ID="201" -numIoThreads="1" -inputSocketType="pull" -inputRcvBufSize=$buffSize -inputMethod="connect" -inputAddress="tcp://localhost:5581" -outputSocketType="push" -outputSndBufSize=$buffSize -outputMethod="bind" -outputAddress="tcp://*:5582" -xterm -e @CMAKE_BINARY_DIR@/bin/testProxy $ID $numIoThreads $inputSocketType $inputRcvBufSize $inputMethod $inputAddress $outputSocketType $outputSndBufSize $outputMethod $outputAddress & - -ID="301" -numIoThreads="1" -inputSocketType="pull" -inputRcvBufSize=$buffSize -inputMethod="connect" -inputAddress="tcp://localhost:5582" -xterm -e @CMAKE_BINARY_DIR@/bin/testEPN $ID $numIoThreads $inputSocketType $inputRcvBufSize $inputMethod $inputAddress & - -ID="302" -numIoThreads="1" -inputSocketType="pull" -inputRcvBufSize=$buffSize -inputMethod="connect" -inputAddress="tcp://localhost:5582" -xterm -e @CMAKE_BINARY_DIR@/bin/testEPN $ID $numIoThreads $inputSocketType $inputRcvBufSize $inputMethod $inputAddress & - -ID="303" -numIoThreads="1" -inputSocketType="pull" -inputRcvBufSize=$buffSize -inputMethod="connect" -inputAddress="tcp://localhost:5582" -xterm -e @CMAKE_BINARY_DIR@/bin/testEPN $ID $numIoThreads $inputSocketType $inputRcvBufSize $inputMethod $inputAddress & - -ID="304" -numIoThreads="1" -inputSocketType="pull" -inputRcvBufSize=$buffSize -inputMethod="connect" -inputAddress="tcp://localhost:5582" -xterm -e @CMAKE_BINARY_DIR@/bin/testEPN $ID $numIoThreads $inputSocketType $inputRcvBufSize $inputMethod $inputAddress & +buffSize="1000" # zeromq high-water mark is in messages +#buffSize="50000000" # nanomsg buffer size is in bytes + +FLP0="testFLP" +FLP0+=" --id 101" +FLP0+=" --event-size 10000" +FLP0+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5565" +xterm -e @CMAKE_BINARY_DIR@/bin/$FLP0 & + +FLP1="testFLP" +FLP1+=" --id 102" +FLP1+=" --event-size 10000" +FLP1+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5566" +xterm -e @CMAKE_BINARY_DIR@/bin/$FLP1 & + +FLP2="testFLP" +FLP2+=" --id 103" +FLP2+=" --event-size 10000" +FLP2+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5567" +xterm -e @CMAKE_BINARY_DIR@/bin/$FLP2 & + +MERGER="testMerger" +MERGER+=" --id 501" +MERGER+=" --num-inputs 3" +MERGER+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5565" +MERGER+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5566" +MERGER+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5567" +MERGER+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5581" +xterm -e @CMAKE_BINARY_DIR@/bin/$MERGER & + +PROXY="testProxy" +PROXY+=" --id 201" +PROXY+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5581" +PROXY+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5582" +xterm -e @CMAKE_BINARY_DIR@/bin/$PROXY & + +EPN0="testEPN" +EPN0+=" --id 301" +EPN0+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5582" +xterm -e @CMAKE_BINARY_DIR@/bin/$EPN0 & + +EPN1="testEPN" +EPN1+=" --id 302" +EPN1+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5582" +xterm -e @CMAKE_BINARY_DIR@/bin/$EPN1 & + +EPN2="testEPN" +EPN2+=" --id 303" +EPN2+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5582" +xterm -e @CMAKE_BINARY_DIR@/bin/$EPN2 & + +EPN3="testEPN" +EPN3+=" --id 304" +EPN3+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5582" +xterm -e @CMAKE_BINARY_DIR@/bin/$EPN3 &