Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@
*.exe
*.out
*.app

# build directory
build/
1 change: 1 addition & 0 deletions devices/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
add_subdirectory (flp2epn)
add_subdirectory (flp2epn-dynamic)
add_subdirectory (flp2epn-distributed)
add_subdirectory (alicehlt)
59 changes: 59 additions & 0 deletions devices/flp2epn-distributed/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
set(INCLUDE_DIRECTORIES
${BASE_INCLUDE_DIRECTORIES}
${Boost_INCLUDE_DIR}
${CMAKE_SOURCE_DIR}/devices/flp2epn-distributed
)

include_directories(${INCLUDE_DIRECTORIES})

configure_file(
${CMAKE_SOURCE_DIR}/devices/flp2epn-distributed/run/startFLP2EPN-distributed.sh.in ${CMAKE_BINARY_DIR}/bin/startFLP2EPN-distributed.sh
)

set(LINK_DIRECTORIES
${Boost_LIBRARY_DIRS}
${FAIRROOT_LIBRARY_DIR}
)

link_directories(${LINK_DIRECTORIES})

set(SRCS
FLPex.cxx
EPNex.cxx
FLPexSampler.cxx
)

set(DEPENDENCIES
${DEPENDENCIES}
${CMAKE_THREAD_LIBS_INIT}
boost_date_time boost_thread boost_timer boost_system boost_program_options FairMQ
)

set(LIBRARY_NAME FLP2EPNex_distributed)

GENERATE_LIBRARY()

Set(Exe_Names
${Exe_Names}
testFLP_distributed
testEPN_distributed
testFLPSampler
)

set(Exe_Source
run/runFLP_distributed.cxx
run/runEPN_distributed.cxx
run/runFLPSampler.cxx
)

list(LENGTH Exe_Names _length)
math(EXPR _length ${_length}-1)

ForEach(_file RANGE 0 ${_length})
list(GET Exe_Names ${_file} _name)
list(GET Exe_Source ${_file} _src)
set(EXE_NAME ${_name})
set(SRCS ${_src})
set(DEPENDENCIES FLP2EPNex_distributed)
GENERATE_EXECUTABLE()
EndForEach(_file RANGE 0 ${_length})
130 changes: 130 additions & 0 deletions devices/flp2epn-distributed/EPNex.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* EPNex.cxx
*
* @since 2013-01-09
* @author D. Klein, A. Rybalchenko, M.Al-Turany, C. Kouzinopoulos
*/

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

#include "EPNex.h"
#include "FairMQLogger.h"

using namespace std;

using namespace AliceO2::Devices;

EPNex::EPNex() :
fHeartbeatIntervalInMs(5000)
{
}

EPNex::~EPNex()
{
}

void EPNex::Run()
{
LOG(INFO) << ">>>>>>> Run <<<<<<<";

// boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
boost::thread heartbeatSender(boost::bind(&EPNex::sendHeartbeats, this));

size_t idPartSize = 0;
size_t dataPartSize = 0;

while (fState == RUNNING) {
// Receive payload
FairMQMessage* idPart = fTransportFactory->CreateMessage();

idPartSize = fPayloadInputs->at(0)->Receive(idPart);

if (idPartSize > 0) {
unsigned long* id = reinterpret_cast<unsigned long*>(idPart->GetData());
LOG(INFO) << "Received Event #" << *id;

FairMQMessage* dataPart = fTransportFactory->CreateMessage();
dataPartSize = fPayloadInputs->at(0)->Receive(dataPart);

if (dataPartSize > 0) {
// ... do something with data here ...
}
delete dataPart;
}
delete idPart;
}

// rateLogger.interrupt();
// rateLogger.join();

heartbeatSender.interrupt();
heartbeatSender.join();

FairMQDevice::Shutdown();

// notify parent thread about end of processing.
boost::lock_guard<boost::mutex> lock(fRunningMutex);
fRunningFinished = true;
fRunningCondition.notify_one();
}

void EPNex::sendHeartbeats()
{
while (true) {
try {
for (int i = 0; i < fNumOutputs; ++i) {
FairMQMessage* heartbeatMsg = fTransportFactory->CreateMessage(fInputAddress.at(0).size());
memcpy(heartbeatMsg->GetData(), fInputAddress.at(0).c_str(), fInputAddress.at(0).size());

fPayloadOutputs->at(i)->Send(heartbeatMsg);

delete heartbeatMsg;
}
boost::this_thread::sleep(boost::posix_time::milliseconds(fHeartbeatIntervalInMs));
} catch (boost::thread_interrupted&) {
LOG(INFO) << "EPNex::sendHeartbeat() interrupted";
break;
}
} // while (true)
}

void EPNex::SetProperty(const int key, const string& value, const int slot/*= 0*/)
{
switch (key) {
default:
FairMQDevice::SetProperty(key, value, slot);
break;
}
}

string EPNex::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/)
{
switch (key) {
default:
return FairMQDevice::GetProperty(key, default_, slot);
}
}

void EPNex::SetProperty(const int key, const int value, const int slot/*= 0*/)
{
switch (key) {
case HeartbeatIntervalInMs:
fHeartbeatIntervalInMs = value;
break;
default:
FairMQDevice::SetProperty(key, value, slot);
break;
}
}

int EPNex::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/)
{
switch (key) {
case HeartbeatIntervalInMs:
return fHeartbeatIntervalInMs;
default:
return FairMQDevice::GetProperty(key, default_, slot);
}
}
43 changes: 43 additions & 0 deletions devices/flp2epn-distributed/EPNex.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* EPNex.h
*
* @since 2013-01-09
* @author D. Klein, A. Rybalchenko, M.Al-Turany, C. Kouzinopoulos
*/

#ifndef ALICEO2_DEVICES_EPNEX_H_
#define ALICEO2_DEVICES_EPNEX_H_

#include <string>

#include "FairMQDevice.h"

namespace AliceO2 {
namespace Devices {

class EPNex : public FairMQDevice
{
public:
enum {
HeartbeatIntervalInMs = FairMQDevice::Last,
Last
};
EPNex();
virtual ~EPNex();

virtual void SetProperty(const int key, const std::string& value, const int slot = 0);
virtual std::string GetProperty(const int key, const std::string& default_ = "", const int slot = 0);
virtual void SetProperty(const int key, const int value, const int slot = 0);
virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0);

protected:
virtual void Run();
void sendHeartbeats();

int fHeartbeatIntervalInMs;
};

} // namespace Devices
} // namespace AliceO2

#endif
Loading