Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions devices/flp2epn-distributed/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
configure_file(${CMAKE_SOURCE_DIR}/devices/flp2epn-distributed/run/startFLP2EPN-distributed.sh.in ${CMAKE_BINARY_DIR}/bin/startFLP2EPN-distributed.sh)
configure_file(${CMAKE_SOURCE_DIR}/devices/flp2epn-distributed/test/testFLP2EPN-distributed.sh.in ${CMAKE_BINARY_DIR}/devices/flp2epn-distributed/test/testFLP2EPN-distributed.sh)

set(INCLUDE_DIRECTORIES
${CMAKE_SOURCE_DIR}/devices/flp2epn-distributed
)
Expand All @@ -22,10 +25,6 @@ endif()
include_directories(${INCLUDE_DIRECTORIES})
include_directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})

configure_file(
${CMAKE_SOURCE_DIR}/devices/flp2epn-distributed/run/startFLP2EPN-distributed.sh.in ${CMAKE_BINARY_DIR}/bin/startFLP2EPN-distributed.sh
)

set(LINK_DIRECTORIES
${Boost_LIBRARY_DIRS}
${FAIRROOT_LIBRARY_DIR}
Expand Down Expand Up @@ -115,3 +114,7 @@ ForEach(_file RANGE 0 ${_length})
set(DEPENDENCIES FLP2EPNex_distributed)
GENERATE_EXECUTABLE()
EndForEach(_file RANGE 0 ${_length})

add_test(NAME run_flp2epn_distributed COMMAND ${CMAKE_BINARY_DIR}/devices/flp2epn-distributed/test/testFLP2EPN-distributed.sh)
set_tests_properties(run_flp2epn_distributed PROPERTIES TIMEOUT "30")
set_tests_properties(run_flp2epn_distributed PROPERTIES PASS_REGULAR_EXPRESSION "acknowledged after")
70 changes: 29 additions & 41 deletions devices/flp2epn-distributed/EPNReceiver.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void EPNReceiver::DiscardIncompleteTimeframes()
LOG(WARN) << "Timeframe #" << it->first << " incomplete after " << fBufferTimeoutInMs << " milliseconds, discarding";
fDiscardedSet.insert(it->first);
for (int i = 0; i < (it->second).parts.size(); ++i) {
delete (it->second).parts.at(i);
(it->second).parts.at(i).reset();
}
it->second.parts.clear();
fTimeframeBuffer.erase(it++);
Expand All @@ -82,10 +82,7 @@ void EPNReceiver::Run()
{
// boost::thread heartbeatSender(boost::bind(&EPNReceiver::sendHeartbeats, this));

FairMQPoller* poller = fTransportFactory->CreatePoller(fChannels.at("data-in"));

int SNDMORE = fChannels.at("data-in").at(0).fSocket->SNDMORE;
int NOBLOCK = fChannels.at("data-in").at(0).fSocket->NOBLOCK;
unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels.at("data-in")));

// DEBUG: store receive intervals per FLP
// vector<vector<int>> rcvIntervals(fNumFLPs, vector<int>());
Expand All @@ -104,7 +101,7 @@ void EPNReceiver::Run()
poller->Poll(100);

if (poller->CheckInput(0)) {
FairMQMessage* headerPart = fTransportFactory->CreateMessage();
unique_ptr<FairMQMessage> headerPart(fTransportFactory->CreateMessage());

if (dataInputChannel.Receive(headerPart) > 0) {
// store the received ID
Expand All @@ -123,61 +120,55 @@ void EPNReceiver::Run()
// }
// end DEBUG

FairMQMessage* dataPart = fTransportFactory->CreateMessage();
rcvDataSize = dataInputChannel.Receive(dataPart);
unique_ptr<FairMQMessage> dataPart(fTransportFactory->CreateMessage());

if (fDiscardedSet.find(id) == fDiscardedSet.end()) {
// if received ID has not previously been discarded.
if (fTimeframeBuffer.find(id) == fTimeframeBuffer.end()) {
// if received ID is not yet in the buffer.
if (rcvDataSize > 0) {
// receive data, store it in the buffer, save the receive time.
fTimeframeBuffer[id].parts.push_back(dataPart);
// receive the data part
if (dataInputChannel.Receive(dataPart) > 0)
{
if (fDiscardedSet.find(id) == fDiscardedSet.end())
{
if (fTimeframeBuffer.find(id) == fTimeframeBuffer.end())
{
// if this is the first part with this ID, save the receive time.
fTimeframeBuffer[id].startTime = boost::posix_time::microsec_clock::local_time();
} else {
LOG(ERROR) << "no data received from input socket";
delete dataPart;
}
// PrintBuffer(fTimeframeBuffer);
} else {
// if received ID is already in the buffer
if (rcvDataSize > 0) {
fTimeframeBuffer[id].parts.push_back(dataPart);
} else {
LOG(ERROR) << "no data received from input socket";
delete dataPart;
}
// if the received ID has not previously been discarded,
// store the data part in the buffer
fTimeframeBuffer[id].parts.push_back(move(dataPart));
// PrintBuffer(fTimeframeBuffer);
}
} else {
// if received ID has been previously discarded.
LOG(WARN) << "Received part from an already discarded timeframe with id " << id;
delete dataPart;
else
{
// if received ID has been previously discarded.
LOG(WARN) << "Received part from an already discarded timeframe with id " << id;
}
}
else
{
LOG(ERROR) << "no data received from input socket";
}

if (fTimeframeBuffer[id].parts.size() == fNumFLPs) {
// LOG(INFO) << "Collected all parts for timeframe #" << id;
// when all parts are collected send all except last one with 'snd-more' flag, and last one without the flag.
for (int i = 0; i < fNumFLPs - 1; ++i) {
dataOutChannel.Send(fTimeframeBuffer[id].parts.at(i), SNDMORE);
dataOutChannel.SendPart(fTimeframeBuffer[id].parts.at(i));
}
dataOutChannel.Send(fTimeframeBuffer[id].parts.at(fNumFLPs - 1));

if (fTestMode > 0) {
// Send an acknowledgement back to the sampler to measure the round trip time
FairMQMessage* ack = fTransportFactory->CreateMessage(sizeof(uint16_t));
unique_ptr<FairMQMessage> ack(fTransportFactory->CreateMessage(sizeof(uint16_t)));
memcpy(ack->GetData(), &id, sizeof(uint16_t));

if (ackOutChannel.Send(ack, NOBLOCK) <= 0) {
if (ackOutChannel.SendAsync(ack) <= 0) {
LOG(ERROR) << "Could not send acknowledgement without blocking";
}

delete ack;
}

// let transport know that the data is no longer needed. transport will clean up after it is sent out.
for (int i = 0; i < fTimeframeBuffer[id].parts.size(); ++i) {
delete fTimeframeBuffer[id].parts.at(i);
fTimeframeBuffer[id].parts.at(i).reset();
}
fTimeframeBuffer[id].parts.clear();

Expand All @@ -188,7 +179,6 @@ void EPNReceiver::Run()

// LOG(WARN) << "Buffer size: " << fTimeframeBuffer.size();
}
delete headerPart;
}

// check if any incomplete timeframes in the buffer are older than timeout period, and discard them if they are
Expand Down Expand Up @@ -220,12 +210,10 @@ void EPNReceiver::sendHeartbeats()
while (CheckCurrentState(RUNNING)) {
try {
for (int i = 0; i < fNumFLPs; ++i) {
FairMQMessage* heartbeatMsg = fTransportFactory->CreateMessage(ownAddressSize);
unique_ptr<FairMQMessage> heartbeatMsg(fTransportFactory->CreateMessage(ownAddressSize));
memcpy(heartbeatMsg->GetData(), ownAddress.c_str(), ownAddressSize);

fChannels.at("heartbeat-out").at(i).Send(heartbeatMsg);

delete heartbeatMsg;
}
boost::this_thread::sleep(boost::posix_time::milliseconds(fHeartbeatIntervalInMs));
} catch (boost::thread_interrupted&) {
Expand Down
2 changes: 1 addition & 1 deletion devices/flp2epn-distributed/EPNReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace Devices {

struct TFBuffer
{
std::vector<FairMQMessage*> parts;
std::vector<std::unique_ptr<FairMQMessage>> parts;
boost::posix_time::ptime startTime;
boost::posix_time::ptime endTime;
};
Expand Down
44 changes: 18 additions & 26 deletions devices/flp2epn-distributed/FLPSender.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ FLPSender::FLPSender()
, fHeaderBuffer()
, fDataBuffer()
, fArrivalTime()
, fSndMoreFlag(0)
, fNoBlockFlag(0)
, fNumEPNs(0)
, fHeartbeatTimeoutInMs(20000)
, fHeartbeats()
Expand Down Expand Up @@ -66,7 +64,7 @@ void FLPSender::receiveHeartbeats()

while (CheckCurrentState(RUNNING)) {
try {
FairMQMessage* hbMsg = fTransportFactory->CreateMessage();
unique_ptr<FairMQMessage> hbMsg(fTransportFactory->CreateMessage());

if (hbChannel.Receive(hbMsg) > 0) {
string address = string(static_cast<char*>(hbMsg->GetData()), hbMsg->GetSize());
Expand All @@ -87,8 +85,6 @@ void FLPSender::receiveHeartbeats()
LOG(ERROR) << "IP " << address << " unknown, not provided at execution time";
}
}

delete hbMsg;
} catch (boost::thread_interrupted&) {
LOG(INFO) << "FLPSender::receiveHeartbeats() interrupted";
break;
Expand All @@ -100,15 +96,13 @@ void FLPSender::Run()
{
// boost::thread heartbeatReceiver(boost::bind(&FLPSender::receiveHeartbeats, this));

// base buffer, to be copied from for every timeframe body
// base buffer, to be copied from for every timeframe body (zero-copy)
void* buffer = operator new[](fEventSize);
FairMQMessage* baseMsg = fTransportFactory->CreateMessage(buffer, fEventSize);

fSndMoreFlag = fChannels.at("data-in").at(0).fSocket->SNDMORE;
fNoBlockFlag = fChannels.at("data-in").at(0).fSocket->NOBLOCK;
unique_ptr<FairMQMessage> baseMsg(fTransportFactory->CreateMessage(buffer, fEventSize));

uint16_t timeFrameId = 0;

// store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataInChannel = fChannels.at("data-in").at(0);

while (CheckCurrentState(RUNNING)) {
Expand All @@ -117,7 +111,7 @@ void FLPSender::Run()

if (fTestMode > 0) {
// test-mode: receive and store id part in the buffer.
FairMQMessage* idPart = fTransportFactory->CreateMessage();
unique_ptr<FairMQMessage> idPart(fTransportFactory->CreateMessage());
if (dataInChannel.Receive(idPart) > 0) {
h->timeFrameId = *(static_cast<uint16_t*>(idPart->GetData()));
h->flpIndex = fIndex;
Expand All @@ -126,8 +120,6 @@ void FLPSender::Run()
delete h;
continue;
}

delete idPart;
} else {
// regular mode: use the id generated locally
h->timeFrameId = timeFrameId;
Expand All @@ -138,22 +130,22 @@ void FLPSender::Run()
}
}

FairMQMessage* headerPart = fTransportFactory->CreateMessage(h, sizeof(f2eHeader));
FairMQMessage* dataPart = fTransportFactory->CreateMessage();
unique_ptr<FairMQMessage> headerPart(fTransportFactory->CreateMessage(h, sizeof(f2eHeader)));
unique_ptr<FairMQMessage> dataPart(fTransportFactory->CreateMessage());

// save the arrival time of the message.
fArrivalTime.push(boost::posix_time::microsec_clock::local_time());

if (fTestMode > 0) {
// test-mode: initialize and store data part in the buffer.
dataPart->Copy(baseMsg);
fHeaderBuffer.push(headerPart);
fDataBuffer.push(dataPart);
fHeaderBuffer.push(move(headerPart));
fDataBuffer.push(move(dataPart));
} else {
// regular mode: receive data part from input
if (dataInChannel.Receive(dataPart) >= 0) {
fHeaderBuffer.push(headerPart);
fDataBuffer.push(dataPart);
fHeaderBuffer.push(move(headerPart));
fDataBuffer.push(move(dataPart));
} else {
// if nothing was received, try again
continue;
Expand All @@ -176,8 +168,6 @@ void FLPSender::Run()
}
}

delete baseMsg;

// heartbeatReceiver.interrupt();
// heartbeatReceiver.join();
}
Expand Down Expand Up @@ -207,11 +197,13 @@ inline void FLPSender::sendFrontData()
// fArrivalTime.pop();
// fDataBuffer.pop();
// } else { // if the heartbeat from the corresponding EPN is within timeout period, send the data.
if (fChannels.at("data-out").at(direction).Send(fHeaderBuffer.front(), fSndMoreFlag|fNoBlockFlag) <= 0) {
LOG(ERROR) << "Could not queue ID part of event #" << currentTimeframeId << " without blocking";
}
if (fChannels.at("data-out").at(direction).Send(fDataBuffer.front(), fNoBlockFlag) <= 0) {
LOG(ERROR) << "Could not send message with event #" << currentTimeframeId << " without blocking";
if (fChannels.at("data-out").at(direction).SendPart(fHeaderBuffer.front()) < 0) {
// TODO: replace SendPart() with SendPartAsync() after nov15 fairroot release
LOG(ERROR) << "Failed to queue ID part of event #" << currentTimeframeId;
} else {
if (fChannels.at("data-out").at(direction).SendAsync(fDataBuffer.front()) < 0) {
LOG(ERROR) << "Could not send message with event #" << currentTimeframeId << " without blocking";
}
}
fHeaderBuffer.pop();
fArrivalTime.pop();
Expand Down
7 changes: 2 additions & 5 deletions devices/flp2epn-distributed/FLPSender.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,15 @@ class FLPSender : public FairMQDevice
/// Sends the "oldest" element from the sub-timeframe container
void sendFrontData();

std::queue<FairMQMessage*> fHeaderBuffer; ///< Stores sub-timeframe headers
std::queue<FairMQMessage*> fDataBuffer; ///< Stores sub-timeframe bodies
std::queue<std::unique_ptr<FairMQMessage>> fHeaderBuffer; ///< Stores sub-timeframe headers
std::queue<std::unique_ptr<FairMQMessage>> fDataBuffer; ///< Stores sub-timeframe bodies
std::queue<boost::posix_time::ptime> fArrivalTime; ///< Stores arrival times of sub-timeframes

int fNumEPNs; ///< Number of epnReceivers
unsigned int fIndex; ///< Index of the flpSender among other flpSenders
unsigned int fSendOffset; ///< Offset for staggering output
unsigned int fSendDelay; ///< Delay for staggering output

int fSndMoreFlag; ///< Flag for faster access to multipart sending
int fNoBlockFlag; ///< Flag for faster access to sending without blocking

int fEventSize; ///< Size of the sub-timeframe body (only for test mode)
int fTestMode; ///< Run the device in test mode (only syncSampler+flpSender+epnReceiver)

Expand Down
Loading