diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index a302eb8d2a50c..fd527fd5611b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -219,9 +219,9 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE @Override public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { - topic.getBrokerService().executor().execute(() -> { + topic.getBrokerService().executor().execute(safeRun(() -> { internalConsumerFlow(consumer, additionalNumberOfMessages); - }); + })); } private synchronized void internalConsumerFlow(Consumer consumer, int additionalNumberOfMessages) { @@ -247,7 +247,7 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional * */ public void readMoreEntriesAsync() { - topic.getBrokerService().executor().execute(this::readMoreEntries); + topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries)); } public synchronized void readMoreEntries() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index e42995e924769..5573596a96e28 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.Collections; @@ -330,14 +331,14 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis // readMoreEntries should run regardless whether or not stuck is caused by // stuckConsumers for avoid stopping dispatch. sendInProgress = false; - topic.getBrokerService().executor().execute(() -> readMoreEntries()); + topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries)); } else if (currentThreadKeyNumber == 0) { sendInProgress = false; - topic.getBrokerService().executor().schedule(() -> { + topic.getBrokerService().executor().schedule(safeRun(() -> { synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { readMoreEntries(); } - }, 100, TimeUnit.MILLISECONDS); + }), 100, TimeUnit.MILLISECONDS); } return false; } @@ -411,7 +412,7 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List en public void markDeletePositionMoveForward() { // Execute the notification in different thread to avoid a mutex chain here // from the delete operation that was completed - topic.getBrokerService().getTopicOrderedExecutor().execute(() -> { + topic.getBrokerService().getTopicOrderedExecutor().execute(safeRun(() -> { synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty() && removeConsumersFromRecentJoinedConsumers()) { @@ -420,7 +421,7 @@ && removeConsumersFromRecentJoinedConsumers()) { readMoreEntries(); } } - }); + })); } private boolean removeConsumersFromRecentJoinedConsumers() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java index 26647dfc1d50f..649d19bcec091 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java @@ -192,7 +192,7 @@ public synchronized void readMoreEntries() { havePendingReplayRead = false; // We should not call readMoreEntries() recursively in the same thread // as there is a risk of StackOverflowError - topic.getBrokerService().executor().execute(() -> readMoreEntries()); + topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries)); } } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) { log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,