Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE

@Override
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
topic.getBrokerService().executor().execute(() -> {
topic.getBrokerService().executor().execute(safeRun(() -> {
internalConsumerFlow(consumer, additionalNumberOfMessages);
});
}));
}

private synchronized void internalConsumerFlow(Consumer consumer, int additionalNumberOfMessages) {
Expand All @@ -247,7 +247,7 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional
*
*/
public void readMoreEntriesAsync() {
topic.getBrokerService().executor().execute(this::readMoreEntries);
topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
}

public synchronized void readMoreEntries() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -330,14 +331,14 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
// readMoreEntries should run regardless whether or not stuck is caused by
// stuckConsumers for avoid stopping dispatch.
sendInProgress = false;
topic.getBrokerService().executor().execute(() -> readMoreEntries());
topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
} else if (currentThreadKeyNumber == 0) {
sendInProgress = false;
topic.getBrokerService().executor().schedule(() -> {
topic.getBrokerService().executor().schedule(safeRun(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
readMoreEntries();
}
}, 100, TimeUnit.MILLISECONDS);
}), 100, TimeUnit.MILLISECONDS);
}
return false;
}
Expand Down Expand Up @@ -411,7 +412,7 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
public void markDeletePositionMoveForward() {
// Execute the notification in different thread to avoid a mutex chain here
// from the delete operation that was completed
topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
topic.getBrokerService().getTopicOrderedExecutor().execute(safeRun(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()
&& removeConsumersFromRecentJoinedConsumers()) {
Expand All @@ -420,7 +421,7 @@ && removeConsumersFromRecentJoinedConsumers()) {
readMoreEntries();
}
}
});
}));
}

private boolean removeConsumersFromRecentJoinedConsumers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public synchronized void readMoreEntries() {
havePendingReplayRead = false;
// We should not call readMoreEntries() recursively in the same thread
// as there is a risk of StackOverflowError
topic.getBrokerService().executor().execute(() -> readMoreEntries());
topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
Expand Down