diff --git a/Framework/Core/include/Framework/ComputingQuotaOffer.h b/Framework/Core/include/Framework/ComputingQuotaOffer.h index 9f42e751a33a5..8610d24ea718f 100644 --- a/Framework/Core/include/Framework/ComputingQuotaOffer.h +++ b/Framework/Core/include/Framework/ComputingQuotaOffer.h @@ -51,7 +51,8 @@ struct ComputingQuotaOffer { int user = -1; /// The score for the given offer OfferScore score = OfferScore::Unneeded; - /// Whether or not the offer is valid + /// Whether or not the offer is valid, invalid offers can + /// be reused when we get some more quota from the system. bool valid = false; }; diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 0f5952daa92a9..848ab75b0e44c 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -58,6 +58,7 @@ struct MetricIndices { size_t arrowBytesDestroyed = 0; size_t arrowMessagesCreated = 0; size_t arrowMessagesDestroyed = 0; + size_t arrowBytesExpired = 0; }; std::vector createDefaultIndices(std::vector& allDevicesMetrics) @@ -70,6 +71,7 @@ std::vector createDefaultIndices(std::vector& indices.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric(info, "arrow-bytes-destroyed"); indices.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric(info, "arrow-messages-created"); indices.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric(info, "arrow-messages-destroyed"); + indices.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric(info, "arrow-bytes-expired"); results.push_back(indices); } return results; @@ -110,6 +112,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() int64_t totalBytesCreated = 0; int64_t readerBytesCreated = 0; int64_t totalBytesDestroyed = 0; + int64_t totalBytesExpired = 0; int64_t totalMessagesCreated = 0; int64_t totalMessagesDestroyed = 0; static RateLimitingState currentState = RateLimitingState::UNKNOWN; @@ -120,6 +123,7 @@ o2::framework::ServiceSpec 
ArrowSupport::arrowBackendSpec() static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "available-shared-memory"); static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "offered-shared-memory"); static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-bytes-destroyed"); + static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-bytes-expired"); static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-messages-created"); static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-messages-destroyed"); static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "arrow-bytes-delta"); @@ -171,6 +175,17 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() firstTimestamp = std::min(lastTimestamp, firstTimestamp); } } + { + size_t index = indices.arrowBytesExpired; + if (index < deviceMetrics.metrics.size()) { + hasMetrics = true; + changed |= deviceMetrics.changed.at(index); + MetricInfo info = deviceMetrics.metrics.at(index); + auto& data = deviceMetrics.uint64Metrics.at(info.storeIdx); + totalBytesExpired += (int64_t)data.at((info.pos - 1) % data.size()); + firstTimestamp = std::min(lastTimestamp, firstTimestamp); + } + } { size_t index = indices.arrowMessagesCreated; if (index < deviceMetrics.metrics.size()) { @@ -191,10 +206,11 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() if (changed) { totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp); totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp); + totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp); readerBytesCreatedMetric(driverMetrics, readerBytesCreated, timestamp); totalMessagesCreatedMetric(driverMetrics, 
totalMessagesCreated, timestamp); totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp); - totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesDestroyed, timestamp); + totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp); } bool done = false; static int stateTransitions = 0; @@ -205,7 +221,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() static RateLimitingState lastReportedState = RateLimitingState::UNKNOWN; static uint64_t lastReportTime = 0; static int64_t MAX_SHARED_MEMORY = calculateAvailableSharedMemory(registry); - constexpr int64_t QUANTUM_SHARED_MEMORY = 500; + constexpr int64_t MAX_QUANTUM_SHARED_MEMORY = 300; + constexpr int64_t MIN_QUANTUM_SHARED_MEMORY = 100; static int64_t availableSharedMemory = MAX_SHARED_MEMORY; static int64_t offeredSharedMemory = 0; @@ -213,18 +230,33 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() /// We loop over the devices, starting from where we stopped last time /// offering 1 GB of shared memory to each reader. int64_t lastCandidate = -1; + static int enoughSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 1 : 0; + static int lowSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 
0 : 1; + int64_t possibleOffer = MIN_QUANTUM_SHARED_MEMORY; for (size_t di = 0; di < specs.size(); di++) { - if (availableSharedMemory < QUANTUM_SHARED_MEMORY) { + if (availableSharedMemory < possibleOffer) { + if (lowSharedMemoryCount == 0) { + LOGP(ERROR, "We do not have enough shared memory ({}MB) to offer {}MB", availableSharedMemory, possibleOffer); + } + lowSharedMemoryCount++; + enoughSharedMemoryCount = 0; break; + } else { + if (enoughSharedMemoryCount == 0) { + LOGP(INFO, "We are back in a state where we enough shared memory: {}MB", availableSharedMemory); + } + enoughSharedMemoryCount++; + lowSharedMemoryCount = 0; } size_t candidate = (lastDeviceOffered + di) % specs.size(); if (specs[candidate].name != "internal-dpl-aod-reader") { continue; } - LOGP(info, "Offering {}MB to {}", QUANTUM_SHARED_MEMORY, specs[candidate].id); - manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", QUANTUM_SHARED_MEMORY).data()); - availableSharedMemory -= QUANTUM_SHARED_MEMORY; - offeredSharedMemory += QUANTUM_SHARED_MEMORY; + possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory); + LOGP(info, "Offering {}MB to {}", possibleOffer, availableSharedMemory, specs[candidate].id); + manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data()); + availableSharedMemory -= possibleOffer; + offeredSharedMemory += possibleOffer; lastCandidate = candidate; } // We had at least a valid candidate, so @@ -234,7 +266,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() } int unusedOfferedMemory = (offeredSharedMemory - readerBytesCreated / 1000000); - availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory; + availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed + totalBytesExpired - totalBytesCreated) / 1000000) - unusedOfferedMemory; availableSharedMemoryMetric(driverMetrics, availableSharedMemory, timestamp); 
unusedOfferedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp); @@ -279,7 +311,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() [](ServiceRegistry& registry, boost::program_options::variables_map const& vm) { auto config = new RateLimitConfig{}; int readers = std::stoll(vm["readers"].as()); - long long int minReaderMemory = readers * 1000; + long long int minReaderMemory = readers * 500; if (vm.count("aod-memory-rate-limit")) { config->maxMemory = std::max(minReaderMemory, std::stoll(vm["aod-memory-rate-limit"].as()) / 1000000); } diff --git a/Framework/Core/src/ComputingQuotaEvaluator.cxx b/Framework/Core/src/ComputingQuotaEvaluator.cxx index c875cbf45ebac..32b38612bde5b 100644 --- a/Framework/Core/src/ComputingQuotaEvaluator.cxx +++ b/Framework/Core/src/ComputingQuotaEvaluator.cxx @@ -44,6 +44,14 @@ ComputingQuotaEvaluator::ComputingQuotaEvaluator(ServiceRegistry& registry) 0}; } +struct QuotaEvaluatorStats { + std::vector invalidOffers; + std::vector otherUser; + std::vector unexpiring; + std::vector selectedOffers; + std::vector expired; +}; + bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& selector) { auto selectOffer = [&loop = mLoop, &offers = this->mOffers, &infos = this->mInfos, task](int ref) { @@ -57,6 +65,45 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& }; ComputingQuotaOffer accumulated; + static QuotaEvaluatorStats stats; + + stats.invalidOffers.clear(); + stats.otherUser.clear(); + stats.unexpiring.clear(); + stats.selectedOffers.clear(); + stats.expired.clear(); + + auto summarizeWhatHappended = [](bool enough, std::vector const& result, ComputingQuotaOffer const& totalOffer, QuotaEvaluatorStats& stats) -> bool { + if (result.size() == 1 && result[0] == 0) { + // LOG(INFO) << "No particular resource was requested, so we schedule task anyways"; + return enough; + } + if (enough) { + LOGP(INFO, "{} offers were selected for a total of: cpu {}, memory {}, 
shared memory {}", result.size(), totalOffer.cpu, totalOffer.memory, totalOffer.sharedMemory); + LOGP(INFO, " The following offers were selected for computation: {} ", fmt::join(result, ",")); + } else { + LOG(INFO) << "No offer was selected"; + if (result.size()) { + LOGP(INFO, " The following offers were selected for computation but not enough: {} ", fmt::join(result, ",")); + } + } + if (stats.invalidOffers.size()) { + LOGP(INFO, " The following offers were invalid: {}", fmt::join(stats.invalidOffers, ", ")); + } + if (stats.otherUser.size()) { + LOGP(INFO, " The following offers were owned by other users: {}", fmt::join(stats.otherUser, ", ")); + } + if (stats.expired.size()) { + LOGP(INFO, " The following offers are expired: {}", fmt::join(stats.expired, ", ")); + } + if (stats.unexpiring.size() > 1) { + LOGP(INFO, " The following offers will never expire: {}", fmt::join(stats.unexpiring, ", ")); + } + + return enough; + }; + + bool enough = false; for (int i = 0; i != mOffers.size(); ++i) { auto& offer = mOffers[i]; @@ -67,14 +114,22 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& // - Offers which belong to another task // - Expired offers if (offer.valid == false) { + stats.invalidOffers.push_back(i); continue; } if (offer.user != -1 && offer.user != task) { + stats.otherUser.push_back(i); continue; } - if (offer.runtime > 0 && offer.runtime + info.received < uv_now(mLoop)) { + if (offer.runtime < 0) { + stats.unexpiring.push_back(i); + } else if (offer.runtime + info.received < uv_now(mLoop)) { + LOGP(INFO, "Offer {} expired since {} milliseconds and holds {}MB", i, uv_now(mLoop) - offer.runtime - info.received, offer.sharedMemory / 1000000); mExpiredOffers.push_back(ComputingQuotaOfferRef{i}); + stats.expired.push_back(i); continue; + } else { + LOGP(INFO, "Offer {} still valid for {} milliseconds, providing {}MB", i, offer.runtime + info.received - uv_now(mLoop), offer.sharedMemory / 1000000); } /// We then check if 
the offer is suitable assert(offer.sharedMemory >= 0); @@ -85,18 +140,24 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const& offer.score = selector(offer, tmp); switch (offer.score) { case OfferScore::Unneeded: + continue; case OfferScore::Unsuitable: continue; case OfferScore::More: selectOffer(i); - break; + accumulated = tmp; + stats.selectedOffers.push_back(i); + continue; case OfferScore::Enough: selectOffer(i); - return true; + accumulated = tmp; + stats.selectedOffers.push_back(i); + enough = true; + break; }; } // If we get here it means we never got enough offers, so we return false. - return false; + return summarizeWhatHappended(enough, stats.selectedOffers, accumulated, stats); } void ComputingQuotaEvaluator::consume(int id, ComputingQuotaConsumer& consumer) @@ -149,6 +210,16 @@ void ComputingQuotaEvaluator::updateOffers(std::vector& pen void ComputingQuotaEvaluator::handleExpired() { + static int nothingToDoCount = mExpiredOffers.size(); + if (mExpiredOffers.size()) { + LOGP(INFO, "Handling {} expired offers", mExpiredOffers.size()); + nothingToDoCount = 0; + } else { + if (nothingToDoCount == 0) { + nothingToDoCount++; + LOGP(INFO, "No expired offers"); + } + } using o2::monitoring::Metric; using o2::monitoring::Monitoring; using o2::monitoring::tags::Key; @@ -156,15 +227,29 @@ void ComputingQuotaEvaluator::handleExpired() auto& monitoring = mRegistry.get(); /// Whenever an offer is expired, we give back the resources /// to the driver. + static uint64_t expiredOffers = 0; + static uint64_t expiredBytes = 0; + for (auto& ref : mExpiredOffers) { auto& offer = mOffers[ref.index]; + if (offer.sharedMemory < 0) { + LOGP(INFO, "Offer {} does not have any more memory. Marking it as invalid.", ref.index); + offer.valid = false; + offer.score = OfferScore::Unneeded; + continue; + } // FIXME: offers should go through the driver client, not the monitoring // api. 
auto& monitoring = mRegistry.get(); - monitoring.send(o2::monitoring::Metric{(uint64_t)offer.sharedMemory, "arrow-bytes-destroyed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); - // LOGP(INFO, "Offer expired {} {}", offer.sharedMemory, offer.cpu); - // driverClient.tell("expired shmem {}", offer.sharedMemory); - // driverClient.tell("expired cpu {}", offer.cpu); + monitoring.send(o2::monitoring::Metric{expiredOffers++, "resource-offer-expired"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); + expiredBytes += offer.sharedMemory; + monitoring.send(o2::monitoring::Metric{(uint64_t)expiredBytes, "arrow-bytes-expired"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL)); + LOGP(INFO, "Offer {} expired. Giving back {}MB and {} cores", ref.index, offer.sharedMemory / 1000000, offer.cpu); + //driverClient.tell("expired shmem {}", offer.sharedMemory); + //driverClient.tell("expired cpu {}", offer.cpu); + offer.sharedMemory = -1; + offer.valid = false; + offer.score = OfferScore::Unneeded; } mExpiredOffers.clear(); } diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index d9939ab246c9b..b070fe7dde62a 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -151,6 +151,7 @@ void run_completion(uv_work_t* handle, int status) context.deviceContext->quotaEvaluator->consume(task->id.index, consumer); } context.deviceContext->state->offerConsumers.clear(); + context.deviceContext->quotaEvaluator->handleExpired(); context.deviceContext->quotaEvaluator->dispose(task->id.index); task->running = false; ZoneScopedN("run_completion"); @@ -575,6 +576,7 @@ bool DataProcessingDevice::ConditionalRun() run_completion(&handle, 0); #endif } else { + mDataProcessorContexes.at(0).deviceContext->quotaEvaluator->handleExpired(); mWasActive = false; } } else { diff --git a/Framework/Core/src/WSDriverClient.cxx b/Framework/Core/src/WSDriverClient.cxx index 
1b48cc40ceed3..1d2dd7b6e39c1 100644 --- a/Framework/Core/src/WSDriverClient.cxx +++ b/Framework/Core/src/WSDriverClient.cxx @@ -104,7 +104,7 @@ void on_connect(uv_connect_t* connection, int status) offer.cpu = 0; offer.memory = 0; offer.sharedMemory = offerSize * 1000000; - offer.runtime = -1; + offer.runtime = 60000; offer.user = -1; offer.valid = true;