diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index b0e4b43f70067..c6a376e45ee39 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -25,7 +25,10 @@
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.ByteBufUtil;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.opentelemetry.api.OpenTelemetry;
 import java.time.Instant;
@@ -102,10 +105,36 @@ protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTele
                 StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName()));
         registerListener(this);
 
-        this.childrenCache = Caffeine.newBuilder()
+        long childrenCacheMaxSizeBytes = getChildrenCacheMaxSizeBytes();
+
+        Caffeine<Object, Object> childrenCacheBuilder = Caffeine.newBuilder()
                 .recordStats()
                 .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
-                .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS)
+                .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS);
+        if (childrenCacheMaxSizeBytes > 0) {
+            childrenCacheBuilder.maximumWeight(childrenCacheMaxSizeBytes)
+                    .weigher((String key, List<String> children) -> {
+                        // Estimate the heap memory needed for the entry from the UTF-8 byte size of the key
+                        // and of every entry in the children list. Each string is charged 32 extra bytes:
+                        // 16 bytes for the Java object header and 16 bytes for java.lang.String fields.
+                        long totalSize = ByteBufUtil.utf8Bytes(key) + 32L;
+                        for (String child : children) {
+                            totalSize += ByteBufUtil.utf8Bytes(child) + 32L;
+                        }
+                        // Clamp instead of overflowing into a negative int; weights must be non-negative.
+                        return (int) Math.min(totalSize, Integer.MAX_VALUE);
+                    });
+        }
+        this.childrenCache = childrenCacheBuilder
+                .evictionListener(new RemovalListener<String, List<String>>() {
+                    @Override
+                    public void onRemoval(String key, List<String> value, RemovalCause cause) {
+                        if (cause == RemovalCause.SIZE) {
+                            log.warn("[{}] Evicting path {} from children cache because the size of the cache is too "
+                                    + "large. Consider increasing the maximum heap size.", metadataStoreName, key);
+                        }
+                    }
+                })
                 .buildAsync(new AsyncCacheLoader<String, List<String>>() {
                     @Override
                     public CompletableFuture<List<String>> asyncLoad(String key, Executor executor) {
@@ -152,6 +181,27 @@ public CompletableFuture<Boolean> asyncReload(String key, Boolean oldValue,
         this.metadataStoreStats = new MetadataStoreStats(metadataStoreName, openTelemetry);
     }
 
+    /**
+     * Return the maximum size of the children cache in bytes.
+     *
+     * <p>The default is 20% of the maximum heap size, but never less than 20MB. This should be
+     * sufficient to prevent OOME in the use case when a lot of namespaces with lots of topics are
+     * listed in the metadata store. The constructor only applies a weight limit to the children
+     * cache when the returned value is positive.
+     *
+     * <p>This method is invoked from the constructor, so an overriding implementation must not
+     * rely on state that is initialized by the subclass.
+     *
+     * @return maximum size of the children cache in bytes.
+     */
+    protected long getChildrenCacheMaxSizeBytes() {
+        long heapMaxSizeBytes = Runtime.getRuntime().maxMemory();
+        // default 20% of max heap size
+        long defaultSizeBytes = heapMaxSizeBytes / 5;
+        // min size 20MB
+        long minSizeBytes = 20L * 1024 * 1024;
+        return Math.max(defaultSizeBytes, minSizeBytes);
+    }
+
     @Override
     public CompletableFuture<Void> handleMetadataEvent(MetadataEvent event) {
         CompletableFuture<Void> result = new CompletableFuture<>();