Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Framework/Core/include/Framework/ComputingQuotaOffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ struct ComputingQuotaOffer {
int user = -1;
/// The score for the given offer
OfferScore score = OfferScore::Unneeded;
/// Whether or not the offer is valid
/// Whether or not the offer is valid, invalid offers can
/// be reused whe we get some more quota from the system.
bool valid = false;
};

Expand Down
50 changes: 41 additions & 9 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct MetricIndices {
size_t arrowBytesDestroyed = 0;
size_t arrowMessagesCreated = 0;
size_t arrowMessagesDestroyed = 0;
size_t arrowBytesExpired = 0;
};

std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
Expand All @@ -70,6 +71,7 @@ std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>&
indices.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed");
indices.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created");
indices.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed");
indices.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired");
results.push_back(indices);
}
return results;
Expand Down Expand Up @@ -110,6 +112,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
int64_t totalBytesCreated = 0;
int64_t readerBytesCreated = 0;
int64_t totalBytesDestroyed = 0;
int64_t totalBytesExpired = 0;
int64_t totalMessagesCreated = 0;
int64_t totalMessagesDestroyed = 0;
static RateLimitingState currentState = RateLimitingState::UNKNOWN;
Expand All @@ -120,6 +123,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "available-shared-memory");
static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "offered-shared-memory");
static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
Expand Down Expand Up @@ -171,6 +175,17 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
firstTimestamp = std::min(lastTimestamp, firstTimestamp);
}
}
{
size_t index = indices.arrowBytesExpired;
if (index < deviceMetrics.metrics.size()) {
hasMetrics = true;
changed |= deviceMetrics.changed.at(index);
MetricInfo info = deviceMetrics.metrics.at(index);
auto& data = deviceMetrics.uint64Metrics.at(info.storeIdx);
totalBytesExpired += (int64_t)data.at((info.pos - 1) % data.size());
firstTimestamp = std::min(lastTimestamp, firstTimestamp);
}
}
{
size_t index = indices.arrowMessagesCreated;
if (index < deviceMetrics.metrics.size()) {
Expand All @@ -191,10 +206,11 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
if (changed) {
totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
readerBytesCreatedMetric(driverMetrics, readerBytesCreated, timestamp);
totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesDestroyed, timestamp);
totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
}
bool done = false;
static int stateTransitions = 0;
Expand All @@ -205,26 +221,42 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
static RateLimitingState lastReportedState = RateLimitingState::UNKNOWN;
static uint64_t lastReportTime = 0;
static int64_t MAX_SHARED_MEMORY = calculateAvailableSharedMemory(registry);
constexpr int64_t QUANTUM_SHARED_MEMORY = 500;
constexpr int64_t MAX_QUANTUM_SHARED_MEMORY = 300;
constexpr int64_t MIN_QUANTUM_SHARED_MEMORY = 100;

static int64_t availableSharedMemory = MAX_SHARED_MEMORY;
static int64_t offeredSharedMemory = 0;
static int64_t lastDeviceOffered = 0;
/// We loop over the devices, starting from where we stopped last time
/// offering 1 GB of shared memory to each reader.
int64_t lastCandidate = -1;
static int enoughSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 1 : 0;
static int lowSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 0 : 1;
int64_t possibleOffer = MIN_QUANTUM_SHARED_MEMORY;
for (size_t di = 0; di < specs.size(); di++) {
if (availableSharedMemory < QUANTUM_SHARED_MEMORY) {
if (availableSharedMemory < possibleOffer) {
if (lowSharedMemoryCount == 0) {
LOGP(ERROR, "We do not have enough shared memory ({}MB) to offer {}MB", availableSharedMemory, possibleOffer);
}
lowSharedMemoryCount++;
enoughSharedMemoryCount = 0;
break;
} else {
if (enoughSharedMemoryCount == 0) {
LOGP(INFO, "We are back in a state where we enough shared memory: {}MB", availableSharedMemory);
}
enoughSharedMemoryCount++;
lowSharedMemoryCount = 0;
}
size_t candidate = (lastDeviceOffered + di) % specs.size();
if (specs[candidate].name != "internal-dpl-aod-reader") {
continue;
}
LOGP(info, "Offering {}MB to {}", QUANTUM_SHARED_MEMORY, specs[candidate].id);
manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", QUANTUM_SHARED_MEMORY).data());
availableSharedMemory -= QUANTUM_SHARED_MEMORY;
offeredSharedMemory += QUANTUM_SHARED_MEMORY;
possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory);
LOGP(info, "Offering {}MB to {}", possibleOffer, availableSharedMemory, specs[candidate].id);
manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data());
availableSharedMemory -= possibleOffer;
offeredSharedMemory += possibleOffer;
lastCandidate = candidate;
}
// We had at least a valid candidate, so
Expand All @@ -234,7 +266,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
}

int unusedOfferedMemory = (offeredSharedMemory - readerBytesCreated / 1000000);
availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory;
availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed + totalBytesExpired - totalBytesCreated) / 1000000) - unusedOfferedMemory;
availableSharedMemoryMetric(driverMetrics, availableSharedMemory, timestamp);
unusedOfferedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp);

Expand Down Expand Up @@ -279,7 +311,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
[](ServiceRegistry& registry, boost::program_options::variables_map const& vm) {
auto config = new RateLimitConfig{};
int readers = std::stoll(vm["readers"].as<std::string>());
long long int minReaderMemory = readers * 1000;
long long int minReaderMemory = readers * 500;
if (vm.count("aod-memory-rate-limit")) {
config->maxMemory = std::max(minReaderMemory, std::stoll(vm["aod-memory-rate-limit"].as<std::string>()) / 1000000);
}
Expand Down
101 changes: 93 additions & 8 deletions Framework/Core/src/ComputingQuotaEvaluator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ ComputingQuotaEvaluator::ComputingQuotaEvaluator(ServiceRegistry& registry)
0};
}

struct QuotaEvaluatorStats {
std::vector<int> invalidOffers;
std::vector<int> otherUser;
std::vector<int> unexpiring;
std::vector<int> selectedOffers;
std::vector<int> expired;
};

bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& selector)
{
auto selectOffer = [&loop = mLoop, &offers = this->mOffers, &infos = this->mInfos, task](int ref) {
Expand All @@ -57,6 +65,45 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&
};

ComputingQuotaOffer accumulated;
static QuotaEvaluatorStats stats;

stats.invalidOffers.clear();
stats.otherUser.clear();
stats.unexpiring.clear();
stats.selectedOffers.clear();
stats.expired.clear();

auto summarizeWhatHappended = [](bool enough, std::vector<int> const& result, ComputingQuotaOffer const& totalOffer, QuotaEvaluatorStats& stats) -> bool {
if (result.size() == 1 && result[0] == 0) {
// LOG(INFO) << "No particular resource was requested, so we schedule task anyways";
return enough;
}
if (enough) {
LOGP(INFO, "{} offers were selected for a total of: cpu {}, memory {}, shared memory {}", result.size(), totalOffer.cpu, totalOffer.memory, totalOffer.sharedMemory);
LOGP(INFO, " The following offers were selected for computation: {} ", fmt::join(result, ","));
} else {
LOG(INFO) << "No offer was selected";
if (result.size()) {
LOGP(INFO, " The following offers were selected for computation but not enough: {} ", fmt::join(result, ","));
}
}
if (stats.invalidOffers.size()) {
LOGP(INFO, " The following offers were invalid: {}", fmt::join(stats.invalidOffers, ", "));
}
if (stats.otherUser.size()) {
LOGP(INFO, " The following offers were owned by other users: {}", fmt::join(stats.otherUser, ", "));
}
if (stats.expired.size()) {
LOGP(INFO, " The following offers are expired: {}", fmt::join(stats.expired, ", "));
}
if (stats.unexpiring.size() > 1) {
LOGP(INFO, " The following offers will never expire: {}", fmt::join(stats.unexpiring, ", "));
}

return enough;
};

bool enough = false;

for (int i = 0; i != mOffers.size(); ++i) {
auto& offer = mOffers[i];
Expand All @@ -67,14 +114,22 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&
// - Offers which belong to another task
// - Expired offers
if (offer.valid == false) {
stats.invalidOffers.push_back(i);
continue;
}
if (offer.user != -1 && offer.user != task) {
stats.otherUser.push_back(i);
continue;
}
if (offer.runtime > 0 && offer.runtime + info.received < uv_now(mLoop)) {
if (offer.runtime < 0) {
stats.unexpiring.push_back(i);
} else if (offer.runtime + info.received < uv_now(mLoop)) {
LOGP(INFO, "Offer {} expired since {} milliseconds and holds {}MB", i, uv_now(mLoop) - offer.runtime - info.received, offer.sharedMemory / 1000000);
mExpiredOffers.push_back(ComputingQuotaOfferRef{i});
stats.expired.push_back(i);
continue;
} else {
LOGP(INFO, "Offer {} still valid for {} milliseconds, providing {}MB", i, offer.runtime + info.received - uv_now(mLoop), offer.sharedMemory / 1000000);
}
/// We then check if the offer is suitable
assert(offer.sharedMemory >= 0);
Expand All @@ -85,18 +140,24 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&
offer.score = selector(offer, tmp);
switch (offer.score) {
case OfferScore::Unneeded:
continue;
case OfferScore::Unsuitable:
continue;
case OfferScore::More:
selectOffer(i);
break;
accumulated = tmp;
stats.selectedOffers.push_back(i);
continue;
case OfferScore::Enough:
selectOffer(i);
return true;
accumulated = tmp;
stats.selectedOffers.push_back(i);
enough = true;
break;
};
}
// If we get here it means we never got enough offers, so we return false.
return false;
return summarizeWhatHappended(enough, stats.selectedOffers, accumulated, stats);
}

void ComputingQuotaEvaluator::consume(int id, ComputingQuotaConsumer& consumer)
Expand Down Expand Up @@ -149,22 +210,46 @@ void ComputingQuotaEvaluator::updateOffers(std::vector<ComputingQuotaOffer>& pen

void ComputingQuotaEvaluator::handleExpired()
{
static int nothingToDoCount = mExpiredOffers.size();
if (mExpiredOffers.size()) {
LOGP(INFO, "Handling {} expired offers", mExpiredOffers.size());
nothingToDoCount = 0;
} else {
if (nothingToDoCount == 0) {
nothingToDoCount++;
LOGP(INFO, "No expired offers");
}
}
using o2::monitoring::Metric;
using o2::monitoring::Monitoring;
using o2::monitoring::tags::Key;
using o2::monitoring::tags::Value;
auto& monitoring = mRegistry.get<o2::monitoring::Monitoring>();
/// Whenever an offer is expired, we give back the resources
/// to the driver.
static uint64_t expiredOffers = 0;
static uint64_t expiredBytes = 0;

for (auto& ref : mExpiredOffers) {
auto& offer = mOffers[ref.index];
if (offer.sharedMemory < 0) {
LOGP(INFO, "Offer {} does not have any more memory. Marking it as invalid.", ref.index);
offer.valid = false;
offer.score = OfferScore::Unneeded;
continue;
}
// FIXME: offers should go through the driver client, not the monitoring
// api.
auto& monitoring = mRegistry.get<o2::monitoring::Monitoring>();
monitoring.send(o2::monitoring::Metric{(uint64_t)offer.sharedMemory, "arrow-bytes-destroyed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
// LOGP(INFO, "Offer expired {} {}", offer.sharedMemory, offer.cpu);
// driverClient.tell("expired shmem {}", offer.sharedMemory);
// driverClient.tell("expired cpu {}", offer.cpu);
monitoring.send(o2::monitoring::Metric{expiredOffers++, "resource-offer-expired"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
expiredBytes += offer.sharedMemory;
monitoring.send(o2::monitoring::Metric{(uint64_t)expiredBytes, "arrow-bytes-expired"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
LOGP(INFO, "Offer {} expired. Giving back {}MB and {} cores", ref.index, offer.sharedMemory / 1000000, offer.cpu);
//driverClient.tell("expired shmem {}", offer.sharedMemory);
//driverClient.tell("expired cpu {}", offer.cpu);
offer.sharedMemory = -1;
offer.valid = false;
offer.score = OfferScore::Unneeded;
}
mExpiredOffers.clear();
}
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ void run_completion(uv_work_t* handle, int status)
context.deviceContext->quotaEvaluator->consume(task->id.index, consumer);
}
context.deviceContext->state->offerConsumers.clear();
context.deviceContext->quotaEvaluator->handleExpired();
context.deviceContext->quotaEvaluator->dispose(task->id.index);
task->running = false;
ZoneScopedN("run_completion");
Expand Down Expand Up @@ -575,6 +576,7 @@ bool DataProcessingDevice::ConditionalRun()
run_completion(&handle, 0);
#endif
} else {
mDataProcessorContexes.at(0).deviceContext->quotaEvaluator->handleExpired();
mWasActive = false;
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/src/WSDriverClient.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void on_connect(uv_connect_t* connection, int status)
offer.cpu = 0;
offer.memory = 0;
offer.sharedMemory = offerSize * 1000000;
offer.runtime = -1;
offer.runtime = 60000;
offer.user = -1;
offer.valid = true;

Expand Down