Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class ComputingQuotaEvaluator
std::array<ComputingQuotaInfo, MAX_INFLIGHT_OFFERS> mInfos;
ServiceRegistry& mRegistry;
uv_loop_t* mLoop;
uint64_t mTotalDisposedSharedMemory = 0;
};

} // namespace o2::framework
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/ComputingQuotaOffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ using ComputingQuotaRequest = std::function<OfferScore(ComputingQuotaOffer const

/// A consumer is a function which updates a given set of offers, removing the
/// amount of resources which are considered as consumed.
using ComputingQuotaConsumer = std::function<void(int id, std::array<ComputingQuotaOffer, 16>&)>;
using ComputingQuotaConsumer = std::function<void(int id, std::array<ComputingQuotaOffer, 16>&, std::function<void(ComputingQuotaOffer const&)>)>;

} // namespace o2::framework

Expand Down
53 changes: 42 additions & 11 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

#include "CommonMessageBackendsHelpers.h"
#include <Monitoring/Monitoring.h>
#include <Headers/DataHeader.h>
#include "Headers/DataHeader.h"
#include "Headers/DataHeaderHelpers.h"

#include <options/FairMQProgOptions.h>

Expand Down Expand Up @@ -54,7 +55,7 @@ static int64_t memLimit = 0;

struct MetricIndices {
size_t arrowBytesCreated = 0;
size_t readerBytesCreated = 0;
size_t shmOfferConsumed = 0;
size_t arrowBytesDestroyed = 0;
size_t arrowMessagesCreated = 0;
size_t arrowMessagesDestroyed = 0;
Expand Down Expand Up @@ -110,15 +111,15 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
DeviceMetricsInfo& driverMetrics,
size_t timestamp) {
int64_t totalBytesCreated = 0;
int64_t readerBytesCreated = 0;
int64_t shmOfferConsumed = 0;
int64_t totalBytesDestroyed = 0;
int64_t totalBytesExpired = 0;
int64_t totalMessagesCreated = 0;
int64_t totalMessagesDestroyed = 0;
static RateLimitingState currentState = RateLimitingState::UNKNOWN;
static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
static auto readerBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "reader-arrow-bytes-created");
static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "shm-offer-bytes-consumed");
static auto unusedOfferedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "unusedOfferedMemory");
static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "available-shared-memory");
static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "offered-shared-memory");
Expand Down Expand Up @@ -157,9 +158,19 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
auto& data = deviceMetrics.uint64Metrics.at(info.storeIdx);
auto value = (int64_t)data.at((info.pos - 1) % data.size());
totalBytesCreated += value;
if (specs[mi].name == "internal-dpl-aod-reader") {
readerBytesCreated += value;
}
lastTimestamp = std::max(lastTimestamp, deviceMetrics.timestamps[index][(info.pos - 1) % data.size()]);
firstTimestamp = std::min(lastTimestamp, firstTimestamp);
}
}
{
size_t index = indices.shmOfferConsumed;
if (index < deviceMetrics.metrics.size()) {
hasMetrics = true;
changed |= deviceMetrics.changed.at(index);
MetricInfo info = deviceMetrics.metrics.at(index);
auto& data = deviceMetrics.uint64Metrics.at(info.storeIdx);
auto value = (int64_t)data.at((info.pos - 1) % data.size());
shmOfferConsumed += value;
lastTimestamp = std::max(lastTimestamp, deviceMetrics.timestamps[index][(info.pos - 1) % data.size()]);
firstTimestamp = std::min(lastTimestamp, firstTimestamp);
}
Expand Down Expand Up @@ -207,7 +218,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
readerBytesCreatedMetric(driverMetrics, readerBytesCreated, timestamp);
shmOfferConsumedMetric(driverMetrics, shmOfferConsumed, timestamp);
totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
Expand Down Expand Up @@ -253,7 +264,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
continue;
}
possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory);
LOGP(info, "Offering {}MB to {}", possibleOffer, availableSharedMemory, specs[candidate].id);
LOGP(info, "Offering {}MB out of {} to {}", possibleOffer, availableSharedMemory, specs[candidate].id);
manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data());
availableSharedMemory -= possibleOffer;
offeredSharedMemory += possibleOffer;
Expand All @@ -265,8 +276,24 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
lastDeviceOffered = lastCandidate + 1;
}

int unusedOfferedMemory = (offeredSharedMemory - readerBytesCreated / 1000000);
availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed + totalBytesExpired - totalBytesCreated) / 1000000) - unusedOfferedMemory;
// unusedOfferedMemory is the amount of memory which was offered and which we know was
// not used so far. So we need to account for the amount of the offers which was actually consumed
// (shmOfferConsumed) and the amount which we know was given back.
static int64_t lastShmOfferConsumed = 0;
static int64_t lastUnusedOfferedMemory = 0;
if (shmOfferConsumed != lastShmOfferConsumed) {
LOGP(INFO, "Offer consumed so far {}", shmOfferConsumed);
lastShmOfferConsumed = shmOfferConsumed;
}
int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferConsumed) / 1000000);
if (lastUnusedOfferedMemory != unusedOfferedMemory) {
LOGP(INFO, "Unused offer {}", unusedOfferedMemory);
lastUnusedOfferedMemory = unusedOfferedMemory;
}
// availableSharedMemory is the amount of memory which we know is available to be offered.
// We subtract the amount which we know was already offered but is still unused, and we then balance how
// much was created against how much was destroyed.
availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory;
availableSharedMemoryMetric(driverMetrics, availableSharedMemory, timestamp);
unusedOfferedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp);

Expand All @@ -283,6 +310,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
}
auto dh = o2::header::get<DataHeader*>(input.header);
if (dh->serialization != o2::header::gSerializationMethodArrow) {
LOGP(INFO, "Message {}/{} is not of kind arrow, therefore we are not accounting its shared memory", dh->dataOrigin, dh->dataDescription);
continue;
}
auto dph = o2::header::get<DataProcessingHeader*>(input.header);
Expand All @@ -294,12 +322,15 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
}
}
if (forwarded) {
LOGP(INFO, "Message {}/{} is forwarded so we are not returning its memory.", dh->dataOrigin, dh->dataDescription);
continue;
}
LOGP(INFO, "Message {}/{} is being deleted. We will return {}MB.", dh->dataOrigin, dh->dataDescription, dh->payloadSize / 1000000.);
totalBytes += dh->payloadSize;
totalMessages += 1;
}
arrow->updateBytesDestroyed(totalBytes);
LOGP(INFO, "{}MB bytes being given back to reader, totaling {}MB", totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
arrow->updateMessagesDestroyed(totalMessages);
auto& monitoring = ctx.services().get<Monitoring>();
monitoring.send(Metric{(uint64_t)arrow->bytesDestroyed(), "arrow-bytes-destroyed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
Expand Down
15 changes: 14 additions & 1 deletion Framework/Core/src/ComputingQuotaEvaluator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include "Framework/DriverClient.h"
#include "Framework/Monitoring.h"
#include "Framework/Logger.h"
#include <Monitoring/Monitoring.h>

#include <vector>
#include <uv.h>
#include <cassert>
Expand Down Expand Up @@ -162,7 +164,18 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&

/// Invoke @a consumer on the offers held by this evaluator (mOffers).
///
/// The consumer is additionally handed a reporting callback. Each time the
/// callback is invoked it adds the `sharedMemory` field of the offer it
/// receives to mTotalDisposedSharedMemory and sends the resulting running
/// total as the "shm-offer-consumed" metric, tagged with the DPL subsystem.
///
/// @param id       identifier passed through unchanged to the consumer
/// @param consumer callable invoked with the id, the offers and the callback
void ComputingQuotaEvaluator::consume(int id, ComputingQuotaConsumer& consumer)
{
// NOTE(review): this two-argument call looks like the pre-change line of the
// diff this text was extracted from; the three-argument call at the end of
// the body appears to be its replacement. Confirm only one of them is kept.
consumer(id, mOffers);
using o2::monitoring::Metric;
using o2::monitoring::Monitoring;
using o2::monitoring::tags::Key;
using o2::monitoring::tags::Value;
// This will report how much of the offers has to be considered consumed.
// Notice that actual memory usage might be larger, because we can over
// allocate.
// Both captures are by reference: the member counter is updated in place and
// the Monitoring service is looked up once in mRegistry, when the lambda is created.
auto reportConsumedOffer = [&totalDisposedMemory = mTotalDisposedSharedMemory, &monitoring = mRegistry.get<Monitoring>()](ComputingQuotaOffer const& accumulatedConsumed) {
// Accumulate first, so the metric carries the running total rather than the
// size of this single report.
totalDisposedMemory += accumulatedConsumed.sharedMemory;
monitoring.send(Metric{(uint64_t)totalDisposedMemory, "shm-offer-consumed"}.addTag(Key::Subsystem, Value::DPL));
};
consumer(id, mOffers, reportConsumedOffer);
}

void ComputingQuotaEvaluator::dispose(int taskId)
Expand Down
9 changes: 7 additions & 2 deletions Framework/Core/src/DataProcessor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "FairMQResizableBuffer.h"
#include "CommonUtils/BoostSerializer.h"
#include "Headers/DataHeader.h"
#include "Headers/DataHeaderHelpers.h"

#include <Monitoring/Monitoring.h>
#include <fairmq/FairMQParts.h>
Expand Down Expand Up @@ -98,14 +99,17 @@ void DataProcessor::doSend(FairMQDevice& device, ArrowContext& context, ServiceR
dh->payloadSize = payload->GetSize();
dh->serialization = o2::header::gSerializationMethodArrow;
monitoring.send(Metric{(uint64_t)payload->GetSize(), fmt::format("table-bytes-{}-{}-created", dh->dataOrigin.as<std::string>(), dh->dataDescription.as<std::string>())}.addTag(Key::Subsystem, Value::DPL));
LOGP(INFO, "Creating {}MB for table {}/{}.", payload->GetSize() / 1000000., dh->dataOrigin, dh->dataDescription);
context.updateBytesSent(payload->GetSize());
context.updateMessagesSent(1);
parts.AddPart(std::move(messageRef.header));
parts.AddPart(std::move(payload));
device.Send(parts, messageRef.channel, 0);
}
static int64_t previousBytesSent = 0;
auto disposeResources = [bs = context.bytesSent() - previousBytesSent](int taskId, std::array<ComputingQuotaOffer, 16>& offers) {
auto disposeResources = [bs = context.bytesSent() - previousBytesSent](int taskId, std::array<ComputingQuotaOffer, 16>& offers, std::function<void(ComputingQuotaOffer&)> accountDisposed) {
ComputingQuotaOffer disposed;
disposed.sharedMemory = 0;
int64_t bytesSent = bs;
for (size_t oi = 0; oi < offers.size(); oi++) {
auto& offer = offers[oi];
Expand All @@ -116,9 +120,10 @@ void DataProcessor::doSend(FairMQDevice& device, ArrowContext& context, ServiceR
offer.sharedMemory -= toRemove;
bytesSent -= toRemove;
if (bytesSent <= 0) {
return;
break;
}
}
return accountDisposed(disposed);
};
registry.get<DeviceState>().offerConsumers.push_back(disposeResources);
previousBytesSent = context.bytesSent();
Expand Down