From 23a4043c1e0008a09f98b2d4a8e3d0e62eccf23a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 26 Jan 2026 13:01:35 +0800 Subject: [PATCH 01/15] [improve][meta] PIP-453: Improve the metadata store threading model 1. Trigger callbacks in the underlying metadata store's internal thread directly. 2. Add a dedicated scheduler for batch flushing tasks 3. Add a dedicated thread pool for `MetadataCache`'s computing tasks 4. Add tests to verify serialization and deserialization tasks are executed in the pool whose thread count is specified by `metadataStoreSerDesThreads` --- .../pulsar/broker/ServiceConfiguration.java | 6 ++ .../apache/pulsar/broker/PulsarService.java | 2 + .../pulsar/broker/PulsarServiceTest.java | 59 ++++++++++++ .../OpenTelemetryMetadataStoreStatsTest.java | 12 --- .../metadata/api/MetadataStoreConfig.java | 3 + .../cache/impl/MetadataCacheImpl.java | 92 +++++++++---------- .../metadata/impl/AbstractMetadataStore.java | 69 +++++++------- .../metadata/impl/EtcdMetadataStore.java | 19 ++-- .../impl/LocalMemoryMetadataStore.java | 2 +- .../metadata/impl/RocksdbMetadataStore.java | 2 +- .../pulsar/metadata/impl/ZKMetadataStore.java | 19 ++-- .../AbstractBatchedMetadataStore.java | 38 +++++--- .../metadata/impl/oxia/OxiaMetadataStore.java | 4 +- .../impl/stats/BatchMetadataStoreStats.java | 43 +-------- .../impl/MetadataStoreFactoryImplTest.java | 2 +- 15 files changed, 198 insertions(+), 174 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e5b7f0e458d23..555b525e8952d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -492,6 +492,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private boolean metadataStoreAllowReadOnlyOperations; + @FieldContext( + category = CATEGORY_SERVER, + doc = "The number of threads uses for serializing and deserializing data to and from the metadata store" + ) + private int metadataStoreSerDesThreads = Runtime.getRuntime().availableProcessors(); + @Deprecated @FieldContext( category = CATEGORY_SERVER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index d698a568d9f4a..aa62de33c7509 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -434,6 +434,7 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro .synchronizer(synchronizer) .openTelemetry(openTelemetry) .nodeSizeStats(new DefaultMetadataNodeSizeStats()) + .numSerDesThreads(config.getMetadataStoreSerDesThreads()) .build()); } @@ -1328,6 +1329,7 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro .metadataStoreName(MetadataStoreConfig.METADATA_STORE) .openTelemetry(openTelemetry) .nodeSizeStats(new DefaultMetadataNodeSizeStats()) + .numSerDesThreads(config.getMetadataStoreSerDesThreads()) .build()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index 6c04889d8f1ba..8c25f01c3cd68 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -25,10 +25,15 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertSame; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -38,6 +43,10 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.metadata.api.MetadataCacheConfig; +import org.apache.pulsar.metadata.api.MetadataSerde; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.Stat; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; @@ -339,4 +348,54 @@ public void testShutdownViaAdminApi() throws Exception { assertTrue(e instanceof PulsarClientException.TimeoutException); } } + + @Test + public void testMetadataSerDesThreads() throws Exception { + final var numSerDesThreads = 5; + conf.setMetadataStoreSerDesThreads(numSerDesThreads); + setup(); + + BiConsumer verifier = (store, prefix) -> { + final var serDes = new CustomMetadataSerDes(); + final var cache = store.getMetadataCache(prefix, serDes, MetadataCacheConfig.builder().build()); + for (int i = 0; i < 1000 && serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) { + cache.create(prefix + i, "value-" + i).join(); + final var value = cache.get(prefix + i).join(); + assertEquals(value.orElseThrow(), "value-" + i); + final var newValue = cache.readModifyUpdate(prefix + i, s -> s + "-updated").join(); + assertEquals(newValue, "value-" + i + "-updated"); + // Verify the serialization and deserialization are handled by the same thread + assertEquals(serDes.threadNameToSerializedPaths, serDes.threadNameToDeserializedPaths); + } + log.info("SerDes thread mapping: {}", serDes.threadNameToSerializedPaths); + assertEquals(serDes.threadNameToSerializedPaths.keySet().size(), numSerDesThreads); + // Verify a path cannot be handled by multiple threads + final var paths = serDes.threadNameToSerializedPaths.values().stream() + .flatMap(Set::stream).sorted().toList(); + assertEquals(paths.stream().distinct().toList(), paths); + }; + + verifier.accept(pulsar.getLocalMetadataStore(), "/test-local/"); + verifier.accept(pulsar.getConfigurationMetadataStore(), "/test-config/"); + } + + private static class CustomMetadataSerDes implements MetadataSerde { + + final Map> threadNameToSerializedPaths = new ConcurrentHashMap<>(); + final Map> threadNameToDeserializedPaths = new ConcurrentHashMap<>(); + + @Override + public byte[] serialize(String path, String value) throws IOException{ + threadNameToSerializedPaths.computeIfAbsent(Thread.currentThread().getName(), + __ -> ConcurrentHashMap.newKeySet()).add(path); + return value.getBytes(); + } + + @Override + public String deserialize(String path, byte[] data, Stat stat) throws IOException { + threadNameToDeserializedPaths.computeIfAbsent(Thread.currentThread().getName(), + __ -> ConcurrentHashMap.newKeySet()).add(path); + return new String(data); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java index 9e8bde20b88e7..390aa1e49e29d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java @@ -21,7 +21,6 @@ import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.Attributes; -import java.util.concurrent.ExecutorService; import lombok.Cleanup; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.BrokerTestUtil; @@ -29,7 +28,6 @@ import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.metadata.api.MetadataStore; -import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats; import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -53,14 +51,6 @@ protected void setup() throws Exception { var newStats = new MetadataStoreStats( localMetadataStoreName, pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry()); FieldUtils.writeField(localMetadataStore, "metadataStoreStats", newStats, true); - - var currentBatchedStats = (BatchMetadataStoreStats) FieldUtils.readField(localMetadataStore, - "batchMetadataStoreStats", true); - currentBatchedStats.close(); - var currentExecutor = (ExecutorService) FieldUtils.readField(currentBatchedStats, "executor", true); - var newBatchedStats = new BatchMetadataStoreStats(localMetadataStoreName, currentExecutor, - pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry()); - FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", newBatchedStats, true); } @AfterMethod(alwaysRun = true) @@ -89,7 +79,5 @@ public void testMetadataStoreStats() throws Exception { var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); assertMetricLongSumValue(metrics, MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME, attributes, value -> assertThat(value).isPositive()); - assertMetricLongSumValue(metrics, BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes, - value -> assertThat(value).isPositive()); } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java index ef50dc87691d0..66f1f046059d8 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java @@ -104,4 +104,7 @@ public class MetadataStoreConfig { * The estimator to estimate the payload length of metadata node, which used to limit the batch size requested. */ private MetadataNodeSizeStats nodeSizeStats; + + @Builder.Default + private final int numSerDesThreads = Runtime.getRuntime().availableProcessors(); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index b1f0572547ca7..50d802850941d 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -30,7 +30,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -39,6 +38,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.metadata.api.CacheGetResult; @@ -62,23 +62,23 @@ public class MetadataCacheImpl implements MetadataCache, Consumer serde; - private final ScheduledExecutorService executor; + private final OrderedExecutor executor; private final MetadataCacheConfig cacheConfig; private final AsyncLoadingCache>> objCache; public MetadataCacheImpl(String cacheName, MetadataStore store, TypeReference typeRef, - MetadataCacheConfig cacheConfig, ScheduledExecutorService executor) { + MetadataCacheConfig cacheConfig, OrderedExecutor executor) { this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor); } public MetadataCacheImpl(String cacheName, MetadataStore store, JavaType type, MetadataCacheConfig cacheConfig, - ScheduledExecutorService executor) { + OrderedExecutor executor) { this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor); } public MetadataCacheImpl(String cacheName, MetadataStore store, MetadataSerde serde, - MetadataCacheConfig cacheConfig, ScheduledExecutorService executor) { + MetadataCacheConfig cacheConfig, OrderedExecutor executor) { this.store = store; if (store instanceof MetadataStoreExtended) { this.storeExtended = (MetadataStoreExtended) store; @@ -129,7 +129,7 @@ public CompletableFuture>> asyncReload( private CompletableFuture>> readValueFromStore(String path) { return store.get(path) - .thenCompose(optRes -> { + .thenComposeAsync(optRes -> { if (!optRes.isPresent()) { return FutureUtils.value(Optional.empty()); } @@ -143,7 +143,7 @@ private CompletableFuture>> readValueFromStore(String return FutureUtils.exception(new ContentDeserializationException( "Failed to deserialize payload for key '" + path + "'", t)); } - }); + }, executor.chooseThread(path)); } @Override @@ -169,8 +169,9 @@ public Optional getIfCached(String path) { @Override public CompletableFuture readModifyUpdateOrCreate(String path, Function, T> modifyFunction) { + final var executor = this.executor.chooseThread(path); return executeWithRetry(() -> objCache.get(path) - .thenCompose(optEntry -> { + .thenComposeAsync(optEntry -> { Optional currentValue; long expectedVersion; @@ -202,13 +203,14 @@ public CompletableFuture readModifyUpdateOrCreate(String path, Function { refresh(path); }).thenApply(__ -> newValueObj); - }), path); + }, executor), path); } @Override public CompletableFuture readModifyUpdate(String path, Function modifyFunction) { + final var executor = this.executor.chooseThread(path); return executeWithRetry(() -> objCache.get(path) - .thenCompose(optEntry -> { + .thenComposeAsync(optEntry -> { if (!optEntry.isPresent()) { return FutureUtils.exception(new NotFoundException("")); } @@ -231,59 +233,51 @@ public CompletableFuture readModifyUpdate(String path, Function modifyF return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(__ -> { refresh(path); }).thenApply(__ -> newValueObj); - }), path); + }, executor), path); + } + + private CompletableFuture serialize(String path, T value) { + final var future = new CompletableFuture(); + executor.executeOrdered(path, () -> { + try { + future.complete(serde.serialize(path, value)); + } catch (Throwable t) { + future.completeExceptionally(t); + } + }); + return future; } @Override public CompletableFuture create(String path, T value) { - byte[] content; - try { - content = serde.serialize(path, value); - } catch (Throwable t) { - return FutureUtils.exception(t); - } - - CompletableFuture future = new CompletableFuture<>(); - store.put(path, content, Optional.of(-1L)) - .thenAccept(stat -> { + return serialize(path, value).thenCompose(content -> store.put(path, content, Optional.of(-1L))) + .thenApply(stat -> { // Make sure we have the value cached before the operation is completed // In addition to caching the value, we need to add a watch on the path, // so when/if it changes on any other node, we are notified and we can // update the cache - objCache.get(path).whenComplete((stat2, ex) -> { - if (ex == null) { - future.complete(null); - } else { - log.error("Exception while getting path {}", path, ex); - future.completeExceptionally(ex.getCause()); - } - }); - }).exceptionally(ex -> { + return objCache.get(path); + }) + .exceptionallyCompose(ex -> { if (ex.getCause() instanceof BadVersionException) { // Use already exists exception to provide more self-explanatory error message - future.completeExceptionally(new AlreadyExistsException(ex.getCause())); + return CompletableFuture.failedFuture(new AlreadyExistsException(ex.getCause())); } else { - future.completeExceptionally(ex.getCause()); + return CompletableFuture.failedFuture(ex.getCause()); } - return null; - }); - - return future; + }) + .thenApply(__ -> null); } @Override public CompletableFuture put(String path, T value, EnumSet options) { - final byte[] bytes; - try { - bytes = serde.serialize(path, value); - } catch (IOException e) { - return CompletableFuture.failedFuture(e); - } - if (storeExtended != null) { - return storeExtended.put(path, bytes, Optional.empty(), options).thenAccept(__ -> refresh(path)); - } else { - return store.put(path, bytes, Optional.empty()).thenAccept(__ -> refresh(path)); - } + return serialize(path, value).thenCompose(bytes -> { + if (storeExtended != null) { + return storeExtended.put(path, bytes, Optional.empty(), options); + } else { + return store.put(path, bytes, Optional.empty()); + } + }).thenAccept(__ -> refresh(path)); } @Override @@ -354,8 +348,8 @@ private void execute(Supplier> op, String key, CompletableF final var next = backoff.next(); log.info("Update key {} conflicts. Retrying in {} ms. Mandatory stop: {}. Elapsed time: {} ms", key, next, backoff.isMandatoryStopMade(), elapsed); - executor.schedule(() -> execute(op, key, result, backoff), next, - TimeUnit.MILLISECONDS); + CompletableFuture.delayedExecutor(next, TimeUnit.MILLISECONDS).execute(() -> + execute(op, key, result, backoff)); return null; } result.completeExceptionally(ex.getCause()); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index c6a376e45ee39..a869fc0cf9670 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -41,16 +41,16 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.FutureUtil; @@ -79,7 +79,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList> sessionListeners = new CopyOnWriteArrayList<>(); protected final String metadataStoreName; - protected final ScheduledExecutorService executor; + private final OrderedExecutor serDesExecutor; + private final ScheduledExecutorService eventExecutor; private final AsyncLoadingCache> childrenCache; private final AsyncLoadingCache existsCache; private final CopyOnWriteArrayList> metadataCaches = new CopyOnWriteArrayList<>(); @@ -96,13 +97,18 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co protected MetadataNodeSizeStats nodeSizeStats; - protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTelemetry, - MetadataNodeSizeStats nodeSizeStats) { + protected AbstractMetadataStore( + String metadataStoreName, OpenTelemetry openTelemetry, MetadataNodeSizeStats nodeSizeStats, + int numSerDesThreads) { this.nodeSizeStats = nodeSizeStats == null ? new DummyMetadataNodeSizeStats() : nodeSizeStats; - this.executor = new ScheduledThreadPoolExecutor(1, - new DefaultThreadFactory( - StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName())); + final var namePrefix = StringUtils.isBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName(); + this.eventExecutor = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory(namePrefix + "-event")); + this.serDesExecutor = OrderedExecutor.newBuilder() + .numThreads(numSerDesThreads) + .name(namePrefix + "-serde") + .build(); registerListener(this); long childrenCacheMaxSizeBytes = getChildrenCacheMaxSizeBytes(); @@ -291,7 +297,7 @@ public MetadataCache getMetadataCache(Class clazz, MetadataCacheConfig JavaType typeRef = TypeFactory.defaultInstance().constructSimpleType(clazz, null); String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName(); MetadataCacheImpl metadataCache = - new MetadataCacheImpl(cacheName, this, typeRef, cacheConfig, this.executor); + new MetadataCacheImpl(cacheName, this, typeRef, cacheConfig, this.serDesExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -300,7 +306,7 @@ public MetadataCache getMetadataCache(Class clazz, MetadataCacheConfig public MetadataCache getMetadataCache(TypeReference typeRef, MetadataCacheConfig cacheConfig) { String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName(); MetadataCacheImpl metadataCache = - new MetadataCacheImpl(cacheName, this, typeRef, cacheConfig, this.executor); + new MetadataCacheImpl(cacheName, this, typeRef, cacheConfig, this.serDesExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -310,7 +316,7 @@ public MetadataCache getMetadataCache(String cacheName, MetadataSerde MetadataCacheConfig cacheConfig) { MetadataCacheImpl metadataCache = new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig, - this.executor); + this.serDesExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -390,7 +396,7 @@ protected CompletableFuture receivedNotification(Notification notification }); return null; - }, executor); + }, eventExecutor); } catch (RejectedExecutionException e) { return FutureUtil.failedFuture(e); } @@ -573,7 +579,7 @@ protected void receivedSessionEvent(SessionEvent event) { // Notice listeners. try { - executor.execute(() -> { + eventExecutor.execute(() -> { sessionListeners.forEach(l -> { try { l.accept(event); @@ -598,8 +604,9 @@ protected static CompletableFuture alreadyClosedFailedFuture() { @Override public void close() throws Exception { - executor.shutdownNow(); - executor.awaitTermination(10, TimeUnit.SECONDS); + serDesExecutor.shutdown(); + eventExecutor.shutdownNow(); + eventExecutor.awaitTermination(10, TimeUnit.SECONDS); this.metadataStoreStats.close(); } @@ -616,30 +623,30 @@ public void invalidateCaches(String...paths) { } } - /** - * Run the task in the executor thread and fail the future if the executor is shutting down. - */ - @VisibleForTesting - public void execute(Runnable task, CompletableFuture future) { + protected final void processEvent(Consumer eventProcessor, T event) { try { - executor.execute(task); - } catch (Throwable t) { - future.completeExceptionally(t); + eventExecutor.execute(() -> eventProcessor.accept(event)); + } catch (RejectedExecutionException e) { + log.warn("Rejected processing event {}", event); } } - /** - * Run the task in the executor thread and fail the future if the executor is shutting down. - */ - @VisibleForTesting - public void execute(Runnable task, Supplier>> futures) { + protected final void scheduleDelayedTask(long delay, TimeUnit unit, Runnable task) { + CompletableFuture.delayedExecutor(delay, unit).execute(task); + } + + protected final void safeExecuteCallback(Runnable task, Consumer exceptionHandler) { try { - executor.execute(task); - } catch (final Throwable t) { - futures.get().forEach(f -> f.completeExceptionally(t)); + eventExecutor.execute(task); + } catch (Throwable t) { + exceptionHandler.accept(t); } } + protected final void safeExecuteCallback(Runnable task, CompletableFuture future) { + safeExecuteCallback(task, future::completeExceptionally); + } + protected static String parent(String path) { int idx = path.lastIndexOf('/'); if (idx <= 0) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java index 3937fd712dc9f..e1311fccfe034 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java @@ -188,7 +188,7 @@ public void close() throws Exception { @Override protected CompletableFuture existsFromStore(String path) { return kv.get(ByteSequence.from(path, StandardCharsets.UTF_8), EXISTS_GET_OPTION) - .thenApplyAsync(gr -> gr.getCount() == 1, executor); + .thenApply(gr -> gr.getCount() == 1); } @Override @@ -204,9 +204,8 @@ protected CompletableFuture storePut(String path, byte[] data, Optional super.storePut(path + stat.getVersion(), data, optExpectedVersion, options), - executor); + .thenCompose( + stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options)); } } @@ -313,9 +312,7 @@ protected void batchOperation(List ops) { } } else { log.warn("Failed to commit: {}", cause.getMessage()); - executor.execute(() -> { - ops.forEach(o -> o.getFuture().completeExceptionally(ex)); - }); + ops.forEach(o -> o.getFuture().completeExceptionally(ex)); } return null; }); @@ -326,7 +323,7 @@ protected void batchOperation(List ops) { private void handleBatchOperationResult(TxnResponse txnResponse, List ops) { - executor.execute(() -> { + safeExecuteCallbacks(() -> { if (!txnResponse.isSucceeded()) { if (ops.size() > 1) { // Retry individually @@ -404,7 +401,7 @@ private void handleBatchOperationResult(TxnResponse txnResponse, } } } - }); + }, ops); } private synchronized CompletableFuture createLease(boolean retryOnFailure) { @@ -444,9 +441,7 @@ public void onCompleted() { if (retryOnFailure) { future.exceptionally(ex -> { log.warn("Failed to create Etcd lease. Retrying later", ex); - executor.schedule(() -> { - createLease(true); - }, 1, TimeUnit.SECONDS); + scheduleDelayedTask(1, TimeUnit.SECONDS, () -> createLease(true)); return null; }); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index 079cb3130e054..627304b2edc7d 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -79,7 +79,7 @@ private static class Value { public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry(), - metadataStoreConfig.getNodeSizeStats()); + metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads()); String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length()); // Local means a private data set // update synchronizer and register sync listener diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java index 74bddda7454cb..08e5478ffcca1 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java @@ -210,7 +210,7 @@ static long toLong(byte[] bytes) { private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry(), - metadataStoreConfig.getNodeSizeStats()); + metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads()); this.metadataUrl = metadataURL; try { RocksDB.loadLibrary(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 5bf7e2272f022..f56d6c6941f1e 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -119,7 +119,7 @@ public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConf private void processSessionWatcher(WatchedEvent event) { if (sessionWatcher != null) { - executor.execute(() -> sessionWatcher.process(event)); + processEvent(sessionWatcher::process, event); } } @@ -245,9 +245,8 @@ protected void batchOperation(List ops) { countsByType, totalSize, opsForLog); // Retry with the individual operations - executor.schedule(() -> { - ops.forEach(o -> batchOperation(Collections.singletonList(o))); - }, 100, TimeUnit.MILLISECONDS); + scheduleDelayedTask(100, TimeUnit.MILLISECONDS, + () -> ops.forEach(o -> batchOperation(Collections.singletonList(o)))); } else { MetadataStoreException e = getException(code, path); ops.forEach(o -> o.getFuture().completeExceptionally(e)); @@ -256,7 +255,7 @@ protected void batchOperation(List ops) { } // Trigger all the futures in the batch - execute(() -> { + safeExecuteCallbacks(() -> { for (int i = 0; i < ops.size(); i++) { OpResult opr = results.get(i); MetadataOp op = ops.get(i); @@ -278,7 +277,7 @@ protected void batchOperation(List ops) { "Operation type not supported in multi: " + op.getType())); } } - }, () -> ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList())); + }, ops); }, null); } catch (Throwable t) { ops.forEach(o -> o.getFuture().completeExceptionally(new MetadataStoreException(t))); @@ -395,7 +394,7 @@ public CompletableFuture existsFromStore(String path) { try { zkc.exists(path, null, (rc, path1, ctx, stat) -> { - execute(() -> { + safeExecuteCallback(() -> { Code code = Code.get(rc); if (code == Code.OK) { future.complete(true); @@ -421,7 +420,7 @@ private void internalStoreDelete(OpDelete op) { try { zkc.delete(op.getPath(), expectedVersion, (rc, path1, ctx) -> { - execute(() -> { + safeExecuteCallback(() -> { Code code = Code.get(rc); if (code == Code.OK) { future.complete(null); @@ -446,7 +445,7 @@ private void internalStorePut(OpPut opPut) { CreateMode createMode = getCreateMode(opPut.getOptions()); asyncCreateFullPathOptimistic(zkc, opPut.getPath(), opPut.getData(), createMode, (rc, path1, ctx, name) -> { - execute(() -> { + safeExecuteCallback(() -> { Code code = Code.get(rc); if (code == Code.OK) { future.complete(new Stat(name, 0, 0, 0, createMode.isEphemeral(), true)); @@ -460,7 +459,7 @@ private void internalStorePut(OpPut opPut) { }); } else { zkc.setData(opPut.getPath(), opPut.getData(), expectedVersion, (rc, path1, ctx, stat) -> { - execute(() -> { + safeExecuteCallback(() -> { Code code = Code.get(rc); if (code == Code.OK) { future.complete(getStat(path1, stat)); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index a9319a50fec5c..f9ebbc8993463 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -18,16 +18,19 @@ */ package org.apache.pulsar.metadata.impl.batching; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataEventSynchronizer; import org.apache.pulsar.metadata.api.MetadataStoreConfig; @@ -38,6 +41,7 @@ import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscUnboundedArrayQueue; +import org.jspecify.annotations.Nullable; @Slf4j public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore { @@ -46,8 +50,6 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore private final MessagePassingQueue readOps; private final MessagePassingQueue writeOps; - private final AtomicBoolean flushInProgress = new AtomicBoolean(false); - private final boolean enabled; private final int maxDelayMillis; protected final int maxOperations; @@ -55,9 +57,12 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore private MetadataEventSynchronizer synchronizer; private final BatchMetadataStoreStats batchMetadataStoreStats; protected MetadataStoreBatchStrategy metadataStoreBatchStrategy; + @Nullable + private final ScheduledExecutorService flushExecutor; protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) { - super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), conf.getNodeSizeStats()); + super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), conf.getNodeSizeStats(), + conf.getNumSerDesThreads()); this.enabled = conf.isBatchingEnabled(); this.maxDelayMillis = conf.getBatchingMaxDelayMillis(); @@ -67,18 +72,22 @@ protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) { if (enabled) { readOps = new MpscUnboundedArrayQueue<>(10_000); writeOps = new MpscUnboundedArrayQueue<>(10_000); - scheduledTask = - executor.scheduleAtFixedRate(this::flush, maxDelayMillis, maxDelayMillis, TimeUnit.MILLISECONDS); + final var name = StringUtils.isBlank(conf.getMetadataStoreName()) ? conf.getMetadataStoreName() + : getClass().getSimpleName(); + flushExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory( + name + "-batch-flusher")); + scheduledTask = flushExecutor.scheduleAtFixedRate(this::flush, maxDelayMillis, maxDelayMillis, + TimeUnit.MILLISECONDS); } else { scheduledTask = null; readOps = null; writeOps = null; + flushExecutor = null; } // update synchronizer and register sync listener updateMetadataEventSynchronizer(conf.getSynchronizer()); - this.batchMetadataStoreStats = - new BatchMetadataStoreStats(metadataStoreName, executor, conf.getOpenTelemetry()); + this.batchMetadataStoreStats = new BatchMetadataStoreStats(metadataStoreName); this.metadataStoreBatchStrategy = new DefaultMetadataStoreBatchStrategy(maxOperations, maxSize); } @@ -96,12 +105,13 @@ public void close() throws Exception { op.getFuture().completeExceptionally(ex); } scheduledTask.cancel(true); + flushExecutor.shutdown(); } super.close(); this.batchMetadataStoreStats.close(); } - private void flush() { + private synchronized void flush() { List currentBatch; if (!readOps.isEmpty()) { while (CollectionUtils.isNotEmpty(currentBatch = metadataStoreBatchStrategy.nextBatch(readOps))) { @@ -113,8 +123,6 @@ private void flush() { internalBatchOperation(currentBatch); } } - - flushInProgress.set(false); } @Override @@ -169,8 +177,8 @@ private void enqueue(MessagePassingQueue queue, MetadataOp op) { internalBatchOperation(Collections.singletonList(op)); return; } - if (queue.size() > maxOperations && flushInProgress.compareAndSet(false, true)) { - executor.execute(this::flush); + if (queue.size() > maxOperations) { + flush(); } } else { internalBatchOperation(Collections.singletonList(op)); @@ -194,4 +202,8 @@ private void internalBatchOperation(List ops) { } protected abstract void batchOperation(List ops); + + protected final void safeExecuteCallbacks(Runnable runnable, List ops) { + safeExecuteCallback(runnable, t -> ops.forEach(op -> op.getFuture().completeExceptionally(t))); + } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index d055dd7da55fb..407a927bda4dc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -61,7 +61,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { private Optional synchronizer; public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) { - super("oxia-metadata", OpenTelemetry.noop(), null); + super("oxia-metadata", OpenTelemetry.noop(), null, 1); this.client = oxia; this.identity = identity; this.synchronizer = Optional.empty(); @@ -75,7 +75,7 @@ public OxiaMetadataStore( boolean enableSessionWatcher) throws Exception { super("oxia-metadata", Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry(), - metadataStoreConfig.getNodeSizeStats()); + metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads()); var linger = metadataStoreConfig.getBatchingMaxDelayMillis(); if (!metadataStoreConfig.isBatchingEnabled()) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java index 9549a8df8f9f1..82cc15d8aafab 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java @@ -18,23 +18,13 @@ */ package org.apache.pulsar.metadata.impl.stats; -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; -import io.prometheus.client.Gauge; import io.prometheus.client.Histogram; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; public final class BatchMetadataStoreStats implements AutoCloseable { private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 100, 200, 500, 1000}; private static final String NAME = "name"; - private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge - .build("pulsar_batch_metadata_store_executor_queue_size", "-") - .labelNames(NAME) - .register(); private static final Histogram OPS_WAITING = Histogram .build("pulsar_batch_metadata_store_queue_wait_time", "-") .unit("ms") @@ -54,46 +44,17 @@ public final class BatchMetadataStoreStats implements AutoCloseable { .register(); private final AtomicBoolean closed = new AtomicBoolean(false); - private final ThreadPoolExecutor executor; private final String metadataStoreName; private final Histogram.Child batchOpsWaitingChild; private final Histogram.Child batchExecuteTimeChild; private final Histogram.Child opsPerBatchChild; - public static final String EXECUTOR_QUEUE_SIZE_METRIC_NAME = "pulsar.broker.metadata.store.executor.queue.size"; - private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter; - - public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor, OpenTelemetry openTelemetry) { - if (executor instanceof ThreadPoolExecutor tx) { - this.executor = tx; - } else { - this.executor = null; - } + public BatchMetadataStoreStats(String metadataStoreName) { this.metadataStoreName = metadataStoreName; - - EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() { - @Override - public double get() { - return getQueueSize(); - } - }, metadataStoreName); - this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName); this.batchExecuteTimeChild = BATCH_EXECUTE_TIME.labels(metadataStoreName); this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName); - - var meter = openTelemetry.getMeter("org.apache.pulsar"); - var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, metadataStoreName); - this.batchMetadataStoreSizeCounter = meter - .upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME) - .setDescription("The number of batch operations in the metadata store executor queue") - .setUnit("{operation}") - .buildWithCallback(measurement -> measurement.record(getQueueSize(), attributes)); - } - - private int getQueueSize() { - return executor == null ? 0 : executor.getQueue().size(); } public void recordOpWaiting(long millis) { @@ -111,11 +72,9 @@ public void recordOpsInBatch(int ops) { @Override public void close() throws Exception { if (closed.compareAndSet(false, true)) { - EXECUTOR_QUEUE_SIZE.remove(this.metadataStoreName); OPS_WAITING.remove(this.metadataStoreName); BATCH_EXECUTE_TIME.remove(this.metadataStoreName); OPS_PER_BATCH.remove(metadataStoreName); - batchMetadataStoreSizeCounter.close(); } } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java index d42b2228346b8..0ae0b022a352d 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java @@ -96,7 +96,7 @@ public MetadataStore create(String metadataURL, MetadataStoreConfig metadataStor public static class MyMetadataStore extends AbstractMetadataStore { protected MyMetadataStore() { - super("custom", OpenTelemetry.noop(), null); + super("custom", OpenTelemetry.noop(), null, 1); } @Override From 087b6a5c6d0f0f6497fcd3788c413f77e9f114c5 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Sun, 1 Feb 2026 23:10:17 +0800 Subject: [PATCH 02/15] add a dedicated scheduler --- pip/pip-453.md | 22 ++----------------- .../cache/impl/MetadataCacheImpl.java | 18 +++++++++------ .../metadata/impl/AbstractMetadataStore.java | 19 +++++++++++----- 3 files changed, 26 insertions(+), 33 deletions(-) diff --git a/pip/pip-453.md b/pip/pip-453.md index a42736b9dda71..f05e0d5d9fad1 100644 --- a/pip/pip-453.md +++ b/pip/pip-453.md @@ -40,8 +40,9 @@ Additionally, some code paths execute the compute intensive tasks in the metadat # High Level Design -Create 3 set of threads: +Create 4 sets of threads: - `-event`: the original metadata store thread, which is now only responsible to handle notifications. This executor won't be a `ScheduledExecutorService` anymore. +- `-scheduler`: a single thread, which is used to schedule tasks like flushing and retrying failed operations. - `-batch-flusher`: a single thread, which is used to schedule the flushing task at a fixed rate. It won't be created if `metadataStoreBatchingEnabled` is false. - `-worker`: a fixed thread pool shared by all `MetadataCache` instances to execute compute intensive tasks like serialization and deserialization. The same path will be handled by the same thread to keep the processing order on the same path. @@ -53,25 +54,6 @@ The only concern is that introducing a new thread to execute callbacks allows wa metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());; ``` -Other tasks like the retry on failure is executed in JVM's common `ForkJoinPool` by `CompletableFuture` APIs. For example: - -```diff ---- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java -+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java -@@ -245,9 +245,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore - countsByType, totalSize, opsForLog); - - // Retry with the individual operations -- executor.schedule(() -> { -- ops.forEach(o -> batchOperation(Collections.singletonList(o))); -- }, 100, TimeUnit.MILLISECONDS); -+ CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS).execute(() -> -+ ops.forEach(o -> batchOperation(Collections.singletonList(o)))); - } else { - MetadataStoreException e = getException(code, path); - ops.forEach(o -> o.getFuture().completeExceptionally(e)); -``` - # Detailed Design ## Public-facing Changes diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index 50d802850941d..bce9a69e62302 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -63,22 +64,25 @@ public class MetadataCacheImpl implements MetadataCache, Consumer serde; private final OrderedExecutor executor; + private final ScheduledExecutorService schedulerExecutor; private final MetadataCacheConfig cacheConfig; private final AsyncLoadingCache>> objCache; public MetadataCacheImpl(String cacheName, MetadataStore store, TypeReference typeRef, - MetadataCacheConfig cacheConfig, OrderedExecutor executor) { - this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor); + MetadataCacheConfig cacheConfig, OrderedExecutor executor, + ScheduledExecutorService schedulerExecutor) { + this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor, schedulerExecutor); } public MetadataCacheImpl(String cacheName, MetadataStore store, JavaType type, MetadataCacheConfig cacheConfig, - OrderedExecutor executor) { - this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor); + OrderedExecutor executor, ScheduledExecutorService schedulerExecutor) { + this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor, schedulerExecutor); } public MetadataCacheImpl(String cacheName, MetadataStore store, MetadataSerde serde, - MetadataCacheConfig cacheConfig, OrderedExecutor executor) { + MetadataCacheConfig cacheConfig, OrderedExecutor executor, + ScheduledExecutorService schedulerExecutor) { this.store = store; if (store instanceof MetadataStoreExtended) { this.storeExtended = (MetadataStoreExtended) store; @@ -88,6 +92,7 @@ public MetadataCacheImpl(String cacheName, MetadataStore store, MetadataSerde this.serde = serde; this.cacheConfig = cacheConfig; this.executor = executor; + this.schedulerExecutor = schedulerExecutor; Caffeine cacheBuilder = Caffeine.newBuilder(); if (cacheConfig.getRefreshAfterWriteMillis() > 0) { @@ -348,8 +353,7 @@ private void execute(Supplier> op, String key, CompletableF final var next = backoff.next(); log.info("Update key {} conflicts. Retrying in {} ms. Mandatory stop: {}. Elapsed time: {} ms", key, next, backoff.isMandatoryStopMade(), elapsed); - CompletableFuture.delayedExecutor(next, TimeUnit.MILLISECONDS).execute(() -> - execute(op, key, result, backoff)); + schedulerExecutor.schedule(() -> execute(op, key, result, backoff), next, TimeUnit.MILLISECONDS); return null; } result.completeExceptionally(ex.getCause()); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index a869fc0cf9670..053a2540edd55 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -80,7 +81,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co private final CopyOnWriteArrayList> sessionListeners = new CopyOnWriteArrayList<>(); protected final String metadataStoreName; private final OrderedExecutor serDesExecutor; - private final ScheduledExecutorService eventExecutor; + private final ExecutorService eventExecutor; + private final ScheduledExecutorService schedulerExecutor; private final AsyncLoadingCache> childrenCache; private final AsyncLoadingCache existsCache; private final CopyOnWriteArrayList> metadataCaches = new CopyOnWriteArrayList<>(); @@ -103,8 +105,10 @@ protected AbstractMetadataStore( this.nodeSizeStats = nodeSizeStats == null ? new DummyMetadataNodeSizeStats() : nodeSizeStats; final var namePrefix = StringUtils.isBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName(); - this.eventExecutor = Executors.newSingleThreadScheduledExecutor( + this.eventExecutor = Executors.newSingleThreadExecutor( new DefaultThreadFactory(namePrefix + "-event")); + this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory(namePrefix + "-scheduler")); this.serDesExecutor = OrderedExecutor.newBuilder() .numThreads(numSerDesThreads) .name(namePrefix + "-serde") @@ -297,7 +301,8 @@ public MetadataCache getMetadataCache(Class clazz, MetadataCacheConfig JavaType typeRef = TypeFactory.defaultInstance().constructSimpleType(clazz, null); String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName(); MetadataCacheImpl metadataCache = - new MetadataCacheImpl(cacheName, this, typeRef, cacheConfig, this.serDesExecutor); + new MetadataCacheImpl(cacheName, this, typeRef, cacheConfig, this.serDesExecutor, + this.schedulerExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -306,7 +311,8 @@ public MetadataCache getMetadataCache(Class clazz, MetadataCacheConfig public MetadataCache getMetadataCache(TypeReference typeRef, MetadataCacheConfig cacheConfig) { String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName(); MetadataCacheImpl metadataCache = - new MetadataCacheImpl(cacheName, this, typeRef, cacheConfig, this.serDesExecutor); + new MetadataCacheImpl(cacheName, this, typeRef, cacheConfig, this.serDesExecutor, + this.schedulerExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -316,7 +322,7 @@ public MetadataCache getMetadataCache(String cacheName, MetadataSerde MetadataCacheConfig cacheConfig) { MetadataCacheImpl metadataCache = new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig, - this.serDesExecutor); + this.serDesExecutor, this.schedulerExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -605,6 +611,7 @@ protected static CompletableFuture alreadyClosedFailedFuture() { @Override public void close() throws Exception { serDesExecutor.shutdown(); + schedulerExecutor.shutdown(); eventExecutor.shutdownNow(); eventExecutor.awaitTermination(10, TimeUnit.SECONDS); this.metadataStoreStats.close(); @@ -632,7 +639,7 @@ protected final void processEvent(Consumer eventProcessor, T event) { } protected final void scheduleDelayedTask(long delay, TimeUnit unit, Runnable task) { - CompletableFuture.delayedExecutor(delay, unit).execute(task); + schedulerExecutor.schedule(task, delay, unit); } protected final void safeExecuteCallback(Runnable task, Consumer exceptionHandler) { From 8e4cbe12e0c0b7519deb60b39606265d5a308fba Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 2 Feb 2026 11:15:08 +0800 Subject: [PATCH 03/15] address comments --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 2 +- .../apache/pulsar/metadata/impl/AbstractMetadataStore.java | 6 +++--- .../impl/batching/AbstractBatchedMetadataStore.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 555b525e8952d..8e7aa6301d976 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -494,7 +494,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( category = CATEGORY_SERVER, - doc = "The number of threads uses for serializing and deserializing data to and from the metadata store" + doc = "The number of threads used for serializing and deserializing data to and from the metadata store" ) private int metadataStoreSerDesThreads = Runtime.getRuntime().availableProcessors(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 053a2540edd55..089f5de69d565 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -104,7 +104,8 @@ protected AbstractMetadataStore( int numSerDesThreads) { this.nodeSizeStats = nodeSizeStats == null ? new DummyMetadataNodeSizeStats() : nodeSizeStats; - final var namePrefix = StringUtils.isBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName(); + final var namePrefix = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName + : getClass().getSimpleName(); this.eventExecutor = Executors.newSingleThreadExecutor( new DefaultThreadFactory(namePrefix + "-event")); this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor( @@ -612,8 +613,7 @@ protected static CompletableFuture alreadyClosedFailedFuture() { public void close() throws Exception { serDesExecutor.shutdown(); schedulerExecutor.shutdown(); - eventExecutor.shutdownNow(); - eventExecutor.awaitTermination(10, TimeUnit.SECONDS); + eventExecutor.shutdown(); this.metadataStoreStats.close(); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index f9ebbc8993463..c98940313b43f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -72,7 +72,7 @@ protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) { if (enabled) { readOps = new MpscUnboundedArrayQueue<>(10_000); writeOps = new MpscUnboundedArrayQueue<>(10_000); - final var name = StringUtils.isBlank(conf.getMetadataStoreName()) ? conf.getMetadataStoreName() + final var name = StringUtils.isNotBlank(conf.getMetadataStoreName()) ? conf.getMetadataStoreName() : getClass().getSimpleName(); flushExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory( name + "-batch-flusher")); From f238fd6978bdc0ab8abb556d1a3566bf55d463bf Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 3 Feb 2026 10:03:10 +0800 Subject: [PATCH 04/15] fix MetadataCacheTest.testCloneInReadModifyUpdateOrCreate --- .../cache/impl/MetadataCacheImpl.java | 77 ++++++++++++++----- .../src/test/resources/log4j2-test.xml | 53 +++++++++++++ 2 files changed, 110 insertions(+), 20 deletions(-) create mode 100644 pulsar-metadata/src/test/resources/log4j2-test.xml diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index bce9a69e62302..ae5137187436b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -43,7 +43,6 @@ import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.metadata.api.CacheGetResult; -import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataCacheConfig; import org.apache.pulsar.metadata.api.MetadataSerde; @@ -106,6 +105,9 @@ public MetadataCacheImpl(String cacheName, MetadataStore store, MetadataSerde .buildAsync(new AsyncCacheLoader>>() { @Override public CompletableFuture>> asyncLoad(String key, Executor executor) { + if (log.isDebugEnabled()) { + log.debug("Loading key {} into metadata cache {}", key, cacheName); + } return readValueFromStore(key); } @@ -115,12 +117,16 @@ public CompletableFuture>> asyncReload( Optional> oldValue, Executor executor) { if (store instanceof AbstractMetadataStore && ((AbstractMetadataStore) store).isConnected()) { - return readValueFromStore(key).thenApply(val -> { + if (log.isDebugEnabled()) { + log.debug("Reloading key {} into metadata cache {}", key, cacheName); + } + final var future = readValueFromStore(key); + future.thenAccept(val -> { if (cacheConfig.getAsyncReloadConsumer() != null) { cacheConfig.getAsyncReloadConsumer().accept(key, val); } - return val; }); + return future; } else { // Do not try to refresh the cache item if we know that we're not connected to the // metadata store @@ -133,22 +139,45 @@ public CompletableFuture>> asyncReload( } private CompletableFuture>> readValueFromStore(String path) { - return store.get(path) - .thenComposeAsync(optRes -> { - if (!optRes.isPresent()) { - return FutureUtils.value(Optional.empty()); - } - - try { - GetResult res = optRes.get(); - T obj = serde.deserialize(path, res.getValue(), res.getStat()); - return FutureUtils - .value(Optional.of(new CacheGetResult<>(obj, res.getStat()))); - } catch (Throwable t) { - return FutureUtils.exception(new ContentDeserializationException( - "Failed to deserialize payload for key '" + path + "'", t)); - } - }, executor.chooseThread(path)); + final var future = new CompletableFuture>>(); + store.get(path).thenComposeAsync(optRes -> { + // There could be multiple pending reads for the same path, for example, when a path is created, + // 1. The `accept` method will call `refresh` + // 2. The `put` method will call `refresh` after the metadata store put operation is done + // Both will call this method and the same result will be read. In this case, we only need to deserialize + // the value once. + if (!optRes.isPresent()) { + if (log.isDebugEnabled()) { + log.debug("Key {} not found in metadata store", path); + } + return FutureUtils.value(Optional.>empty()); + } + final var res = optRes.get(); + final var cachedFuture = objCache.getIfPresent(path); + if (cachedFuture != null && cachedFuture != future) { + if (log.isDebugEnabled()) { + log.debug("A new read on key {} is in progress or completed, ignore this one", path); + } + return cachedFuture; + } + try { + T obj = serde.deserialize(path, res.getValue(), res.getStat()); + if (log.isDebugEnabled()) { + log.debug("Deserialized value for key {} (version: {})", path, res.getStat().getVersion()); + } + return FutureUtils.value(Optional.of(new CacheGetResult<>(obj, res.getStat()))); + } catch (Throwable t) { + return FutureUtils.exception(new ContentDeserializationException( + "Failed to deserialize payload for key '" + path + "'", t)); + } + }, executor.chooseThread(path)).whenComplete((result, e) -> { + if (e != null) { + future.completeExceptionally(e.getCause()); + } else { + future.complete(result); + } + }); + return future; } @Override @@ -282,7 +311,12 @@ public CompletableFuture put(String path, T value, EnumSet o } else { return store.put(path, bytes, Optional.empty()); } - }).thenAccept(__ -> refresh(path)); + }).thenAccept(__ -> { + if (log.isDebugEnabled()) { + log.debug("Refreshing path {} after put operation", path); + } + refresh(path); + }); } @Override @@ -322,6 +356,9 @@ public void accept(Notification t) { switch (t.getType()) { case Created: case Modified: + if (log.isDebugEnabled()) { + log.debug("Refreshing path {} for {} notification", path, t.getType()); + } refresh(path); break; diff --git a/pulsar-metadata/src/test/resources/log4j2-test.xml b/pulsar-metadata/src/test/resources/log4j2-test.xml new file mode 100644 index 0000000000000..16d8127e9a1f5 --- /dev/null +++ b/pulsar-metadata/src/test/resources/log4j2-test.xml @@ -0,0 +1,53 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + From cce168536d1a0ec212a2f7c2a3865db9efb0bf4a Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 3 Feb 2026 11:28:23 +0800 Subject: [PATCH 05/15] fix testMetadataSerDesThreads in CI env --- .../test/java/org/apache/pulsar/broker/PulsarServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index 8c25f01c3cd68..d8e8ad318d8ff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -358,7 +358,7 @@ public void testMetadataSerDesThreads() throws Exception { BiConsumer verifier = (store, prefix) -> { final var serDes = new CustomMetadataSerDes(); final var cache = store.getMetadataCache(prefix, serDes, MetadataCacheConfig.builder().build()); - for (int i = 0; i < 1000 && serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) { + for (int i = 0; i < 10000 && serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) { cache.create(prefix + i, "value-" + i).join(); final var value = cache.get(prefix + i).join(); assertEquals(value.orElseThrow(), "value-" + i); From af57f5480d62dcea1734f1a50137b30a089ab101 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 3 Feb 2026 11:31:46 +0800 Subject: [PATCH 06/15] change the default value of metadataStoreSerDesThreads --- pip/pip-453.md | 4 +++- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pip/pip-453.md b/pip/pip-453.md index f05e0d5d9fad1..f9109798ba778 100644 --- a/pip/pip-453.md +++ b/pip/pip-453.md @@ -67,9 +67,11 @@ Add a configurations to specify the number of worker threads for `MetadataCache` category = CATEGORY_SERVER, doc = "The number of threads uses for serializing and deserializing data to and from the metadata store" ) - private int metadataStoreSerDesThreads = Runtime.getRuntime().availableProcessors(); + private int metadataStoreSerDesThreads = 1; ``` +Use 1 as the default value since the serialization and deserialization tasks are not frequent. This separated thread pool is mainly added to avoid blocking the metadata store callback thread. + ### Metrics The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed because the `-batch-flusher` thread won't execute other tasks except for flushing. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 8e7aa6301d976..8685b53c8c9ff 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -496,7 +496,7 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece category = CATEGORY_SERVER, doc = "The number of threads used for serializing and deserializing data to and from the metadata store" ) - private int metadataStoreSerDesThreads = Runtime.getRuntime().availableProcessors(); + private int metadataStoreSerDesThreads = 1; @Deprecated @FieldContext( From 0f8d72d86d643df5087979577651a26afa4a6d28 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 3 Feb 2026 11:33:49 +0800 Subject: [PATCH 07/15] shut down executors gracefully --- .../apache/pulsar/metadata/impl/AbstractMetadataStore.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 089f5de69d565..4fb2f5bf2b669 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -28,6 +28,7 @@ import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.buffer.ByteBufUtil; import io.netty.util.concurrent.DefaultThreadFactory; import io.opentelemetry.api.OpenTelemetry; @@ -611,9 +612,9 @@ protected static CompletableFuture alreadyClosedFailedFuture() { @Override public void close() throws Exception { - serDesExecutor.shutdown(); - schedulerExecutor.shutdown(); - eventExecutor.shutdown(); + MoreExecutors.shutdownAndAwaitTermination(serDesExecutor, 10, TimeUnit.SECONDS); + MoreExecutors.shutdownAndAwaitTermination(schedulerExecutor, 10, TimeUnit.SECONDS); + MoreExecutors.shutdownAndAwaitTermination(eventExecutor, 10, TimeUnit.SECONDS); this.metadataStoreStats.close(); } From 2d1558102f314af53abde4097df9c1a9362f56a5 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 3 Feb 2026 17:32:46 +0800 Subject: [PATCH 08/15] fix testGetExcludedBookiesWithIsolationGroups --- ...atedBookieEnsemblePlacementPolicyTest.java | 21 +++++++++++++------ .../cache/impl/MetadataCacheImpl.java | 3 ++- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java index 68f92ab416dc2..bcf883c550bc6 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java @@ -28,6 +28,7 @@ import com.google.common.collect.Sets; import io.netty.util.HashedWheelTimer; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -789,8 +790,12 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception { groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3)); groups.setRight(Sets.newHashSet("")); - blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); - assertEquals(blacklist.size(), 3); + // The cache of `isolationPolicy` may not be updated immediately, so we use Awaitility to wait for the result + // after the modification on `store`. + Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> { + final var newBlackList = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); + assertEquals(newBlackList.size(), 3); + }); /* Test a bookie belongs to multiple isolation groups and totalAvailableBookiesInPrimaryGroup < ensembleSize */ groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3)); @@ -813,8 +818,10 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception { groups.setLeft(Sets.newHashSet(isolationGroup1)); groups.setRight(Sets.newHashSet(isolationGroup2)); - blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); - assertEquals(blacklist.size(), 2); + Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> { + final var newBlackList = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); + assertEquals(newBlackList.size(), 2); + }); groups.setLeft(Sets.newHashSet(isolationGroup1)); groups.setRight(Sets.newHashSet(isolationGroup2)); @@ -836,7 +843,9 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception { groups.setLeft(Sets.newHashSet(isolationGroup1)); groups.setRight(Sets.newHashSet(isolationGroup2)); - blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); - assertTrue(blacklist.isEmpty()); + Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> { + final var newBlackList = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); + assertTrue(newBlackList.isEmpty()); + }); } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index ae5137187436b..1010427b407ae 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -163,7 +163,8 @@ private CompletableFuture>> readValueFromStore(String try { T obj = serde.deserialize(path, res.getValue(), res.getStat()); if (log.isDebugEnabled()) { - log.debug("Deserialized value for key {} (version: {})", path, res.getStat().getVersion()); + log.debug("Deserialized value for key {} (version: {}): {}", path, res.getStat().getVersion(), + obj); } return FutureUtils.value(Optional.of(new CacheGetResult<>(obj, res.getStat()))); } catch (Throwable t) { From 4ac992d7644217aa03bdb80244fc9fb4e6ba7ce1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 3 Feb 2026 20:06:58 +0800 Subject: [PATCH 09/15] fix testMetadataSerDesThreads --- .../org/apache/pulsar/broker/PulsarServiceTest.java | 10 ++++++++-- .../pulsar/metadata/api/MetadataStoreConfig.java | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index d8e8ad318d8ff..6c492b5d98448 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -352,8 +352,14 @@ public void testShutdownViaAdminApi() throws Exception { @Test public void testMetadataSerDesThreads() throws Exception { final var numSerDesThreads = 5; - conf.setMetadataStoreSerDesThreads(numSerDesThreads); - setup(); + final var config = new ServiceConfiguration(); + config.setMetadataStoreSerDesThreads(numSerDesThreads); + config.setClusterName("test"); + config.setMetadataStoreUrl("memory:local"); + config.setConfigurationMetadataStoreUrl("memory:local"); + + @Cleanup final var pulsar = new PulsarService(config); + pulsar.start(); BiConsumer verifier = (store, prefix) -> { final var serDes = new CustomMetadataSerDes(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java index 66f1f046059d8..fcde0dce8404b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java @@ -106,5 +106,5 @@ public class MetadataStoreConfig { private MetadataNodeSizeStats nodeSizeStats; @Builder.Default - private final int numSerDesThreads = Runtime.getRuntime().availableProcessors(); + private final int numSerDesThreads = 1; } From 1914d572c9c6e2966f85784b86e23077d768797b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 3 Feb 2026 21:40:22 +0800 Subject: [PATCH 10/15] fix ReplicatorTest.createPartitionedTopicTest --- .../cache/impl/MetadataCacheImpl.java | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index 1010427b407ae..ca165f0464e44 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -285,23 +285,24 @@ private CompletableFuture serialize(String path, T value) { @Override public CompletableFuture create(String path, T value) { - return serialize(path, value).thenCompose(content -> store.put(path, content, Optional.of(-1L))) - .thenApply(stat -> { - // Make sure we have the value cached before the operation is completed - // In addition to caching the value, we need to add a watch on the path, - // so when/if it changes on any other node, we are notified and we can - // update the cache - return objCache.get(path); - }) - .exceptionallyCompose(ex -> { - if (ex.getCause() instanceof BadVersionException) { - // Use already exists exception to provide more self-explanatory error message - return CompletableFuture.failedFuture(new AlreadyExistsException(ex.getCause())); - } else { - return CompletableFuture.failedFuture(ex.getCause()); - } - }) - .thenApply(__ -> null); + final var future = new CompletableFuture(); + serialize(path, value).thenCompose(content -> store.put(path, content, Optional.of(-1L))) + // Make sure we have the value cached before the operation is completed + // In addition to caching the value, we need to add a watch on the path, + // so when/if it changes on any other node, we are notified and we can + // update the cache + .thenCompose(__ -> objCache.get(path)) + .whenComplete((__, ex) -> { + if (ex == null) { + future.complete(null); + } else if (ex.getCause() instanceof BadVersionException) { + // Use already exists exception to provide more self-explanatory error message + future.completeExceptionally(new AlreadyExistsException(ex.getCause())); + } else { + future.completeExceptionally(ex.getCause()); + } + }); + return future; } @Override From e2ff1a10fea9d4ed3e070caf0ed87b285cc4df92 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 3 Feb 2026 22:24:24 +0800 Subject: [PATCH 11/15] fix IsolatedBookieEnsemblePlacementPolicyTest --- ...IsolatedBookieEnsemblePlacementPolicy.java | 3 ++ ...atedBookieEnsemblePlacementPolicyTest.java | 53 ++++++++++--------- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index 878bbc4d654a7..4ef1c594be444 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy; @@ -57,6 +58,8 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac // the secondary group. private ImmutablePair, Set> defaultIsolationGroups; + @Getter + @VisibleForTesting private MetadataCache bookieMappingCache; private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*"; diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java index bcf883c550bc6..936b04386ff7b 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java @@ -22,6 +22,8 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.fasterxml.jackson.databind.ObjectMapper; @@ -289,8 +291,7 @@ public void testBasic() throws Exception { secondaryBookieGroup.put(BOOKIE4, BookieInfo.builder().rack("rack0").build()); bookieMapping.put("group2", secondaryBookieGroup); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), null).getResult(); @@ -341,8 +342,7 @@ public void testNoBookieInfo() throws Exception { + "\": {\"rack\": \"rack0\", \"hostname\": \"bookie3.example.com\"}, \"" + BOOKIE4 + "\": {\"rack\": \"rack2\", \"hostname\": \"bookie4.example.com\"}}}"; - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(StandardCharsets.UTF_8), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, data.getBytes(StandardCharsets.UTF_8)); List ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult(); @@ -400,8 +400,7 @@ public void testBookieInfoChange() throws Exception { bookieMapping.put("group1", mainBookieGroup); bookieMapping.put("group2", secondaryBookieGroup); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(), new HashSet<>()).getResult(); @@ -785,17 +784,12 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception { bookieMapping.put(isolationGroup2, group2); bookieMapping.put(isolationGroup3, group3); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3)); groups.setRight(Sets.newHashSet("")); - // The cache of `isolationPolicy` may not be updated immediately, so we use Awaitility to wait for the result - // after the modification on `store`. - Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> { - final var newBlackList = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); - assertEquals(newBlackList.size(), 3); - }); + blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); + assertEquals(blacklist.size(), 3); /* Test a bookie belongs to multiple isolation groups and totalAvailableBookiesInPrimaryGroup < ensembleSize */ groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3)); @@ -813,15 +807,12 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception { bookieMapping.put(isolationGroup1, group1); bookieMapping.put(isolationGroup2, group2); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); groups.setLeft(Sets.newHashSet(isolationGroup1)); groups.setRight(Sets.newHashSet(isolationGroup2)); - Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> { - final var newBlackList = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); - assertEquals(newBlackList.size(), 2); - }); + blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); + assertEquals(blacklist.size(), 2); groups.setLeft(Sets.newHashSet(isolationGroup1)); groups.setRight(Sets.newHashSet(isolationGroup2)); @@ -838,14 +829,24 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception { bookieMapping.put(isolationGroup1, group1); bookieMapping.put(isolationGroup2, group2); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); groups.setLeft(Sets.newHashSet(isolationGroup1)); groups.setRight(Sets.newHashSet(isolationGroup2)); - Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> { - final var newBlackList = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); - assertTrue(newBlackList.isEmpty()); - }); + blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); + assertTrue(blacklist.isEmpty()); + } + + // The policy gets the bookie info asynchronously before each query or update, when putting the bookie info into + // the metadata store, the cache needs some time to receive the notification and update accordingly. + private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy isolationPolicy, byte[] bookieInfo) { + final var cache = isolationPolicy.getBookieMappingCache(); + assertNotNull(cache); // the policy must have been initialized + + final var key = BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH; + final var previousBookieInfo = cache.getIfCached(key); + store.put(key, bookieInfo, Optional.empty()).join(); + Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> + assertNotEquals(cache.getIfCached(key), previousBookieInfo)); } } From 1ef044e7f78554947011e4cc9d92f63819b6bdf0 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 3 Feb 2026 22:37:16 +0800 Subject: [PATCH 12/15] respect the proposal --- .../org/apache/pulsar/metadata/impl/AbstractMetadataStore.java | 2 +- .../metadata/impl/batching/AbstractBatchedMetadataStore.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 4fb2f5bf2b669..77b990c2be1ae 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -113,7 +113,7 @@ protected AbstractMetadataStore( new DefaultThreadFactory(namePrefix + "-scheduler")); this.serDesExecutor = OrderedExecutor.newBuilder() .numThreads(numSerDesThreads) - .name(namePrefix + "-serde") + .name(namePrefix + "-worker") .build(); registerListener(this); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index c98940313b43f..30989a41bd167 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.metadata.impl.batching; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.Collections; import java.util.EnumSet; @@ -105,7 +106,7 @@ public void close() throws Exception { op.getFuture().completeExceptionally(ex); } scheduledTask.cancel(true); - flushExecutor.shutdown(); + MoreExecutors.shutdownAndAwaitTermination(flushExecutor, 10, TimeUnit.SECONDS); } super.close(); this.batchMetadataStoreStats.close(); From 4752879d54aec5af786e5d4770bd7621ffa5bf6d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 3 Feb 2026 22:38:01 +0800 Subject: [PATCH 13/15] remove the log4j config for tests --- .../src/test/resources/log4j2-test.xml | 53 ------------------- 1 file changed, 53 deletions(-) delete mode 100644 pulsar-metadata/src/test/resources/log4j2-test.xml diff --git a/pulsar-metadata/src/test/resources/log4j2-test.xml b/pulsar-metadata/src/test/resources/log4j2-test.xml deleted file mode 100644 index 16d8127e9a1f5..0000000000000 --- a/pulsar-metadata/src/test/resources/log4j2-test.xml +++ /dev/null @@ -1,53 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - From f482d9e69d03a5f7f789718bde32ea381033dc33 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 3 Feb 2026 22:40:19 +0800 Subject: [PATCH 14/15] improve tests --- .../test/java/org/apache/pulsar/broker/PulsarServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index 6c492b5d98448..6195e9cdae593 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -364,7 +364,7 @@ public void testMetadataSerDesThreads() throws Exception { BiConsumer verifier = (store, prefix) -> { final var serDes = new CustomMetadataSerDes(); final var cache = store.getMetadataCache(prefix, serDes, MetadataCacheConfig.builder().build()); - for (int i = 0; i < 10000 && serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) { + for (int i = 0; i < 100 && serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) { cache.create(prefix + i, "value-" + i).join(); final var value = cache.get(prefix + i).join(); assertEquals(value.orElseThrow(), "value-" + i); From 87c850046eb419885fd6d8a750c0f4ba383ac2be Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 4 Feb 2026 18:02:09 +0800 Subject: [PATCH 15/15] update broker and standalone configs --- conf/broker.conf | 2 ++ conf/standalone.conf | 3 +++ 2 files changed, 5 insertions(+) diff --git a/conf/broker.conf b/conf/broker.conf index e4923172d131c..261018ed683c0 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -954,6 +954,8 @@ metadataStoreBatchingMaxOperations=1000 # Maximum size of a batch metadataStoreBatchingMaxSizeKb=128 +# The number of threads used for serializing and deserializing data to and from the metadata store +metadataStoreSerDesThreads=1 ### --- Authentication --- ### diff --git a/conf/standalone.conf b/conf/standalone.conf index 7f640a94f3d4b..b63caa1d9ec8a 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -430,6 +430,9 @@ metadataStoreBatchingMaxOperations=1000 # Maximum size of a batch metadataStoreBatchingMaxSizeKb=128 +# The number of threads used for serializing and deserializing data to and from the metadata store +metadataStoreSerDesThreads=1 + ### --- TLS --- ### # Deprecated - Use webServicePortTls and brokerServicePortTls instead tlsEnabled=false