From 6b8abe4b66b43f6340d214c0f989d41f0674ce28 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 28 Apr 2021 06:55:07 +0300 Subject: [PATCH] [Broker] Make Persistent*DispatcherMultipleConsumers.readMoreEntries synchronized --- .../persistent/PersistentDispatcherMultipleConsumers.java | 2 +- .../PersistentStreamingDispatcherMultipleConsumers.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 99971ca809b6b..3440e7919ca8f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -216,7 +216,7 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM readMoreEntries(); } - public void readMoreEntries() { + public synchronized void readMoreEntries() { // totalAvailablePermits may be updated by other threads int currentTotalAvailablePermits = totalAvailablePermits; if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java index 9340e17aab22c..a080e623a93cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java @@ -135,7 +135,7 @@ protected void cancelPendingRead() { } @Override - public void readMoreEntries() { + public synchronized void readMoreEntries() { // totalAvailablePermits may be updated by other threads int currentTotalAvailablePermits = totalAvailablePermits; if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {