diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4a22f8b33888a..a32d8b18f1dc6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -83,7 +83,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; -import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.StringUtils; @@ -1105,18 +1104,19 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, } getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { + if (isBrokerEntryMetadataEnabled()) { // init managedLedger interceptor + Set interceptors = new HashSet<>(); for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { + // add individual AppendOffsetMetadataInterceptor for each topic if (interceptor instanceof AppendIndexMetadataInterceptor) { - // add individual AppendOffsetMetadataInterceptor for each topic - brokerEntryMetadataInterceptors.remove(interceptor); - brokerEntryMetadataInterceptors.add(new AppendIndexMetadataInterceptor()); + interceptors.add(new AppendIndexMetadataInterceptor()); + } else { + interceptors.add(interceptor); } } - ManagedLedgerInterceptor mlInterceptor = - new ManagedLedgerInterceptorImpl(brokerEntryMetadataInterceptors); - managedLedgerConfig.setManagedLedgerInterceptor(mlInterceptor); + managedLedgerConfig.setManagedLedgerInterceptor(new ManagedLedgerInterceptorImpl(interceptors)); } managedLedgerConfig.setCreateIfMissing(createIfMissing);