From 7e41fbb3b88b00383358df7ca202353f42a9bc32 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Sat, 12 Mar 2022 17:23:06 +0800 Subject: [PATCH] fix percision issue for Consumer#avgMessagesPerEntry --- .../pulsar/broker/service/Consumer.java | 28 +++++++++++-------- ...PersistentDispatcherMultipleConsumers.java | 3 +- ...sistentDispatcherSingleActiveConsumer.java | 2 +- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 062b778e3d6fb..0f8a6712676ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AtomicDouble; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import java.util.ArrayList; @@ -119,12 +120,10 @@ public class Consumer { /** * It starts keep tracking the average messages per entry. - * The initial value is 1000, when new value comes, it will update with + * The initial value is 0, when new value comes, it will update with * avgMessagesPerEntry = avgMessagePerEntry * avgPercent + (1 - avgPercent) * new Value. */ - private static final AtomicIntegerFieldUpdater AVG_MESSAGES_PER_ENTRY = - AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry"); - private volatile int avgMessagesPerEntry = 1000; + private final AtomicDouble avgMessagesPerEntry = new AtomicDouble(0); private static final long [] EMPTY_ACK_SET = new long[0]; private static final double avgPercent = 0.9; @@ -172,7 +171,6 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0); MESSAGE_PERMITS_UPDATER.set(this, 0); UNACKED_MESSAGES_UPDATER.set(this, 0); - AVG_MESSAGES_PER_ENTRY.set(this, 1000); this.metadata = metadata != null ? metadata : Collections.emptyMap(); @@ -288,10 +286,13 @@ public Future sendMessages(final List entries, EntryBatchSizes batc } // calculate avg message per entry - int tmpAvgMessagesPerEntry = AVG_MESSAGES_PER_ENTRY.get(this); - tmpAvgMessagesPerEntry = (int) Math.floor(tmpAvgMessagesPerEntry * avgPercent - + (1 - avgPercent) * totalMessages / entries.size()); - AVG_MESSAGES_PER_ENTRY.set(this, tmpAvgMessagesPerEntry); + if (avgMessagesPerEntry.get() < 1) { //valid avgMessagesPerEntry should always >= 1 + // set init value. + avgMessagesPerEntry.set(1.0 * totalMessages / entries.size()); + } else { + avgMessagesPerEntry.set(avgMessagesPerEntry.get() * avgPercent + + (1 - avgPercent) * totalMessages / entries.size()); + } // reduce permit and increment unackedMsg count with total number of messages in batch-msgs int ackedCount = batchIndexesAcks == null ? 0 : batchIndexesAcks.getTotalAckedIndexCount(); @@ -299,7 +300,7 @@ public Future sendMessages(final List entries, EntryBatchSizes batc if (log.isDebugEnabled()){ log.debug("[{}-{}] Added {} minus {} messages to MESSAGE_PERMITS_UPDATER in broker.service.Consumer" + " for consumerId: {}; avgMessagesPerEntry is {}", - topicName, subscription, ackedCount, totalMessages, consumerId, tmpAvgMessagesPerEntry); + topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get()); } incrementUnackedMessages(unackedMessages); msgOut.recordMultipleEvents(totalMessages, totalBytes); @@ -706,8 +707,11 @@ public int getAvailablePermits() { return MESSAGE_PERMITS_UPDATER.get(this); } + /** + * return 0 if there is no entry dispatched yet. + */ public int getAvgMessagesPerEntry() { - return AVG_MESSAGES_PER_ENTRY.get(this); + return (int) Math.round(avgMessagesPerEntry.get()); } public boolean isBlocked() { @@ -752,7 +756,7 @@ public void updateStats(ConsumerStatsImpl consumerStats) { } unackedMessages = consumerStats.unackedMessages; blockedConsumerOnUnackedMsgs = consumerStats.blockedConsumerOnUnackedMsgs; - AVG_MESSAGES_PER_ENTRY.set(this, consumerStats.avgMessagesPerEntry); + avgMessagesPerEntry.set(consumerStats.avgMessagesPerEntry); } public ConsumerStatsImpl getStats() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 5aa351b80c4c8..092bc798e90cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -294,8 +294,9 @@ protected Pair calculateToRead(int currentTotalAvailablePermits) Consumer c = getRandomConsumer(); // if turn on precise dispatcher flow control, adjust the record to read if (c != null && c.isPreciseDispatcherFlowControl()) { + int avgMessagesPerEntry = Math.max(1, c.getAvgMessagesPerEntry()); messagesToRead = Math.min( - (int) Math.ceil(currentTotalAvailablePermits * 1.0 / c.getAvgMessagesPerEntry()), + (int) Math.ceil(currentTotalAvailablePermits * 1.0 / avgMessagesPerEntry), readBatchSize); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 4932501039d70..abab5288d5a19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -400,7 +400,7 @@ protected Pair calculateToRead(Consumer consumer) { long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes(); // if turn of precise dispatcher flow control, adjust the records to read if (consumer.isPreciseDispatcherFlowControl()) { - int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry(); + int avgMessagesPerEntry = Math.max(1, consumer.getAvgMessagesPerEntry()); messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize); }