Shared-memory offer accounting: report consumed offers from the devices.

What the patch does (as visible in the hunks below):
  * ArrowSupport.cxx drops the reader-specific readerBytesCreated counter and
    instead sums a per-device shmOfferConsumed value; unusedOfferedMemory and
    availableSharedMemory are recomputed from it.
  * ComputingQuotaConsumer gains a callback argument. ComputingQuotaEvaluator::consume
    passes a lambda which adds the reported sharedMemory to
    mTotalDisposedSharedMemory and sends the running total as "shm-offer-consumed".
  * The disposeResources lambda in DataProcessor::doSend reports what it removed
    from the offers through that callback.

Review changes relative to the submitted patch:
  * disposeResources now adds every toRemove to disposed.sharedMemory. Before,
    the field was set to 0 and never updated, so the callback always received 0
    and the consumed-offer total could never grow. The new-side line count of
    that hunk goes from 10 to 11.
  * The comment above unusedOfferedMemory named readerBytesCreated, which this
    patch removes; it now names shmOfferConsumed.

NOTE(review): devices send "shm-offer-consumed" while the driver metric is
created as "shm-offer-bytes-consumed". The code which fills
indices.shmOfferConsumed is not part of this patch - confirm it looks up the
name the devices actually send.

NOTE(review): this text was recovered from a whitespace-collapsed copy. Context
indentation is reconstructed, and angle-bracket contents (template arguments,
system include names) plus blank context lines were already missing and have
not been guessed, so hunk line counts may not match the visible lines.

diff --git a/Framework/Core/include/Framework/ComputingQuotaEvaluator.h b/Framework/Core/include/Framework/ComputingQuotaEvaluator.h
index 98835cdca194a..12c3cd005fbc9 100644
--- a/Framework/Core/include/Framework/ComputingQuotaEvaluator.h
+++ b/Framework/Core/include/Framework/ComputingQuotaEvaluator.h
@@ -50,6 +50,7 @@ class ComputingQuotaEvaluator
   std::array mInfos;
   ServiceRegistry& mRegistry;
   uv_loop_t* mLoop;
+  uint64_t mTotalDisposedSharedMemory = 0;
 };
 } // namespace o2::framework
diff --git a/Framework/Core/include/Framework/ComputingQuotaOffer.h b/Framework/Core/include/Framework/ComputingQuotaOffer.h
index 8610d24ea718f..316e7da8c24d0 100644
--- a/Framework/Core/include/Framework/ComputingQuotaOffer.h
+++ b/Framework/Core/include/Framework/ComputingQuotaOffer.h
@@ -72,7 +72,7 @@ using ComputingQuotaRequest = std::function&)>;
+using ComputingQuotaConsumer = std::function&, std::function)>;
 } // namespace o2::framework
diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx
index b41a4d0bf0448..58880f33e6ee5 100644
--- a/Framework/Core/src/ArrowSupport.cxx
+++ b/Framework/Core/src/ArrowSupport.cxx
@@ -22,7 +22,8 @@
 #include "CommonMessageBackendsHelpers.h"
 #include
-#include
+#include "Headers/DataHeader.h"
+#include "Headers/DataHeaderHelpers.h"
 #include
@@ -54,7 +55,7 @@ static int64_t memLimit = 0;
 struct MetricIndices {
   size_t arrowBytesCreated = 0;
-  size_t readerBytesCreated = 0;
+  size_t shmOfferConsumed = 0;
   size_t arrowBytesDestroyed = 0;
   size_t arrowMessagesCreated = 0;
   size_t arrowMessagesDestroyed = 0;
@@ -110,7 +111,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
                        DeviceMetricsInfo& driverMetrics,
                        size_t timestamp) {
       int64_t totalBytesCreated = 0;
-      int64_t readerBytesCreated = 0;
+      int64_t shmOfferConsumed = 0;
       int64_t totalBytesDestroyed = 0;
       int64_t totalBytesExpired = 0;
       int64_t totalMessagesCreated = 0;
@@ -118,7 +119,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
       static RateLimitingState currentState = RateLimitingState::UNKNOWN;
       static auto stateMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "rate-limit-state");
       static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "total-arrow-bytes-created");
-      static auto readerBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "reader-arrow-bytes-created");
+      static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "shm-offer-bytes-consumed");
       static auto unusedOfferedMemoryMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "unusedOfferedMemory");
       static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "available-shared-memory");
       static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric(driverMetrics, "offered-shared-memory");
@@ -157,9 +158,19 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
             auto& data = deviceMetrics.uint64Metrics.at(info.storeIdx);
             auto value = (int64_t)data.at((info.pos - 1) % data.size());
             totalBytesCreated += value;
-            if (specs[mi].name == "internal-dpl-aod-reader") {
-              readerBytesCreated += value;
-            }
+            lastTimestamp = std::max(lastTimestamp, deviceMetrics.timestamps[index][(info.pos - 1) % data.size()]);
+            firstTimestamp = std::min(lastTimestamp, firstTimestamp);
+          }
+        }
+        {
+          size_t index = indices.shmOfferConsumed;
+          if (index < deviceMetrics.metrics.size()) {
+            hasMetrics = true;
+            changed |= deviceMetrics.changed.at(index);
+            MetricInfo info = deviceMetrics.metrics.at(index);
+            auto& data = deviceMetrics.uint64Metrics.at(info.storeIdx);
+            auto value = (int64_t)data.at((info.pos - 1) % data.size());
+            shmOfferConsumed += value;
             lastTimestamp = std::max(lastTimestamp, deviceMetrics.timestamps[index][(info.pos - 1) % data.size()]);
             firstTimestamp = std::min(lastTimestamp, firstTimestamp);
           }
@@ -207,7 +218,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
       totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
       totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
       totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
-      readerBytesCreatedMetric(driverMetrics, readerBytesCreated, timestamp);
+      shmOfferConsumedMetric(driverMetrics, shmOfferConsumed, timestamp);
       totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
       totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
       totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
@@ -253,7 +264,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
           continue;
         }
         possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory);
-        LOGP(info, "Offering {}MB to {}", possibleOffer, availableSharedMemory, specs[candidate].id);
+        LOGP(info, "Offering {}MB out of {} to {}", possibleOffer, availableSharedMemory, specs[candidate].id);
         manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data());
         availableSharedMemory -= possibleOffer;
         offeredSharedMemory += possibleOffer;
@@ -265,8 +276,24 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
         lastDeviceOffered = lastCandidate + 1;
       }
-      int unusedOfferedMemory = (offeredSharedMemory - readerBytesCreated / 1000000);
-      availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed + totalBytesExpired - totalBytesCreated) / 1000000) - unusedOfferedMemory;
+      // unusedOfferedMemory is the amount of memory which was offered and which we know it was
+      // not used so far. So we need to account for the amount which got actually read (shmOfferConsumed)
+      // and the amount which we know was given back.
+      static int64_t lastShmOfferConsumed = 0;
+      static int64_t lastUnusedOfferedMemory = 0;
+      if (shmOfferConsumed != lastShmOfferConsumed) {
+        LOGP(INFO, "Offer consumed so far {}", shmOfferConsumed);
+        lastShmOfferConsumed = shmOfferConsumed;
+      }
+      int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferConsumed) / 1000000);
+      if (lastUnusedOfferedMemory != unusedOfferedMemory) {
+        LOGP(INFO, "Unused offer {}", unusedOfferedMemory);
+        lastUnusedOfferedMemory = unusedOfferedMemory;
+      }
+      // availableSharedMemory is the amount of memory which we know is available to be offered.
+      // We subtract the amount which we know was already offered but it's unused and we then balance how
+      // much was created with how much was destroyed.
+      availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory;
       availableSharedMemoryMetric(driverMetrics, availableSharedMemory, timestamp);
       unusedOfferedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp);
@@ -283,6 +310,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
         }
         auto dh = o2::header::get(input.header);
         if (dh->serialization != o2::header::gSerializationMethodArrow) {
+          LOGP(INFO, "Message {}/{} is not of kind arrow, therefore we are not accounting its shared memory", dh->dataOrigin, dh->dataDescription);
           continue;
         }
         auto dph = o2::header::get(input.header);
@@ -294,12 +322,15 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
           }
         }
         if (forwarded) {
+          LOGP(INFO, "Message {}/{} is forwarded so we are not returning its memory.", dh->dataOrigin, dh->dataDescription);
           continue;
         }
+        LOGP(INFO, "Message {}/{} is being deleted. We will return {}MB.", dh->dataOrigin, dh->dataDescription, dh->payloadSize / 1000000.);
         totalBytes += dh->payloadSize;
         totalMessages += 1;
       }
       arrow->updateBytesDestroyed(totalBytes);
+      LOGP(INFO, "{}MB bytes being given back to reader, totaling {}MB", totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
       arrow->updateMessagesDestroyed(totalMessages);
       auto& monitoring = ctx.services().get();
       monitoring.send(Metric{(uint64_t)arrow->bytesDestroyed(), "arrow-bytes-destroyed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
diff --git a/Framework/Core/src/ComputingQuotaEvaluator.cxx b/Framework/Core/src/ComputingQuotaEvaluator.cxx
index 32b38612bde5b..8b8e4e4a1f3b4 100644
--- a/Framework/Core/src/ComputingQuotaEvaluator.cxx
+++ b/Framework/Core/src/ComputingQuotaEvaluator.cxx
@@ -15,6 +15,8 @@
 #include "Framework/DriverClient.h"
 #include "Framework/Monitoring.h"
 #include "Framework/Logger.h"
+#include
+
 #include
 #include
 #include
@@ -162,7 +164,18 @@ bool ComputingQuotaEvaluator::selectOffer(int task, ComputingQuotaRequest const&
 void ComputingQuotaEvaluator::consume(int id, ComputingQuotaConsumer& consumer)
 {
-  consumer(id, mOffers);
+  using o2::monitoring::Metric;
+  using o2::monitoring::Monitoring;
+  using o2::monitoring::tags::Key;
+  using o2::monitoring::tags::Value;
+  // This will report how much of the offers has to be considered consumed.
+  // Notice that actual memory usage might be larger, because we can over
+  // allocate.
+  auto reportConsumedOffer = [&totalDisposedMemory = mTotalDisposedSharedMemory, &monitoring = mRegistry.get()](ComputingQuotaOffer const& accumulatedConsumed) {
+    totalDisposedMemory += accumulatedConsumed.sharedMemory;
+    monitoring.send(Metric{(uint64_t)totalDisposedMemory, "shm-offer-consumed"}.addTag(Key::Subsystem, Value::DPL));
+  };
+  consumer(id, mOffers, reportConsumedOffer);
 }
 void ComputingQuotaEvaluator::dispose(int taskId)
diff --git a/Framework/Core/src/DataProcessor.cxx b/Framework/Core/src/DataProcessor.cxx
index b4cf0492e614c..ece774e34f540 100644
--- a/Framework/Core/src/DataProcessor.cxx
+++ b/Framework/Core/src/DataProcessor.cxx
@@ -18,6 +18,7 @@
 #include "FairMQResizableBuffer.h"
 #include "CommonUtils/BoostSerializer.h"
 #include "Headers/DataHeader.h"
+#include "Headers/DataHeaderHelpers.h"
 #include
 #include
@@ -98,6 +99,7 @@ void DataProcessor::doSend(FairMQDevice& device, ArrowContext& context, ServiceR
     dh->payloadSize = payload->GetSize();
     dh->serialization = o2::header::gSerializationMethodArrow;
     monitoring.send(Metric{(uint64_t)payload->GetSize(), fmt::format("table-bytes-{}-{}-created", dh->dataOrigin.as(), dh->dataDescription.as())}.addTag(Key::Subsystem, Value::DPL));
+    LOGP(INFO, "Creating {}MB for table {}/{}.", payload->GetSize() / 1000000., dh->dataOrigin, dh->dataDescription);
     context.updateBytesSent(payload->GetSize());
     context.updateMessagesSent(1);
     parts.AddPart(std::move(messageRef.header));
@@ -105,7 +107,9 @@ void DataProcessor::doSend(FairMQDevice& device, ArrowContext& context, ServiceR
     device.Send(parts, messageRef.channel, 0);
   }
   static int64_t previousBytesSent = 0;
-  auto disposeResources = [bs = context.bytesSent() - previousBytesSent](int taskId, std::array& offers) {
+  auto disposeResources = [bs = context.bytesSent() - previousBytesSent](int taskId, std::array& offers, std::function accountDisposed) {
+    ComputingQuotaOffer disposed;
+    disposed.sharedMemory = 0;
     int64_t bytesSent = bs;
     for (size_t oi = 0; oi < offers.size(); oi++) {
       auto& offer = offers[oi];
@@ -116,9 +120,11 @@ void DataProcessor::doSend(FairMQDevice& device, ArrowContext& context, ServiceR
       offer.sharedMemory -= toRemove;
       bytesSent -= toRemove;
+      disposed.sharedMemory += toRemove;
       if (bytesSent <= 0) {
-        return;
+        break;
       }
     }
+    return accountDisposed(disposed);
   };
   registry.get().offerConsumers.push_back(disposeResources);
   previousBytesSent = context.bytesSent();