From 64255907ad32cf06368878c424c1057fc4401136 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 4 Jan 2021 17:30:47 +0800 Subject: [PATCH] Fix NPE when MultiTopicsConsumerImpl receives null value messages --- .../pulsar/broker/service/NullValueTest.java | 52 ++++++++++++++----- .../pulsar/client/impl/ConsumerBase.java | 11 +++- .../pulsar/client/impl/ConsumerImpl.java | 14 ++--- .../client/impl/MultiTopicsConsumerImpl.java | 16 +++--- 4 files changed, 65 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java index 553fe0145f884..6473b2b34a0ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java @@ -21,17 +21,22 @@ import java.util.concurrent.CompletableFuture; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TopicMetadata; import org.apache.pulsar.client.impl.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** @@ -52,13 +57,23 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @Test - public void nullValueBytesSchemaTest() throws PulsarClientException { - String topic = "persistent://prop/ns-abc/null-value-bytes-test"; + @DataProvider(name = "topics") + public static Object[][] topics() { + return new Object[][]{ + {"persistent://prop/ns-abc/null-value-test-0", 1}, + {"persistent://prop/ns-abc/null-value-test-1", 3}, + }; + } + + @Test(dataProvider = "topics") + public void nullValueBytesSchemaTest(String topic, int partitions) + throws PulsarClientException, PulsarAdminException { + admin.topics().createPartitionedTopic(topic, partitions); @Cleanup Producer producer = pulsarClient.newProducer() .topic(topic) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); @Cleanup @@ -120,13 +135,15 @@ public void nullValueBytesSchemaTest() throws PulsarClientException { } - @Test - public void nullValueBooleanSchemaTest() throws PulsarClientException { - String topic = "persistent://prop/ns-abc/null-value-bool-test"; + @Test(dataProvider = "topics") + public void nullValueBooleanSchemaTest(String topic, int partitions) + throws PulsarClientException, PulsarAdminException { + admin.topics().createPartitionedTopic(topic, partitions); @Cleanup Producer producer = pulsarClient.newProducer(Schema.BOOL) .topic(topic) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); @Cleanup @@ -148,14 +165,16 @@ public void nullValueBooleanSchemaTest() throws PulsarClientException { } - @Test - public void keyValueNullInlineTest() throws PulsarClientException { - String topic = "persistent://prop/ns-abc/kv-null-value-test"; + @Test(dataProvider = "topics") + public void keyValueNullInlineTest(String topic, int partitions) + throws PulsarClientException, PulsarAdminException { + admin.topics().createPartitionedTopic(topic, partitions); @Cleanup Producer> producer = pulsarClient .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING)) .topic(topic) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); @Cleanup @@ -193,14 +212,23 @@ public void keyValueNullInlineTest() throws PulsarClientException { } - @Test - public void keyValueNullSeparatedTest() throws PulsarClientException { - String topic = "persistent://prop/ns-abc/kv-null-value-test"; + @Test(dataProvider = "topics") + public void keyValueNullSeparatedTest(String topic, int partitions) + throws PulsarClientException, PulsarAdminException { + admin.topics().createPartitionedTopic(topic, partitions); @Cleanup Producer> producer = pulsarClient .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED)) .topic(topic) + // The default SinglePartition routing mode will be affected by the key when the KeyValueEncodingType is + // SEPARATED so we need to define a message router to guarantee the message order. + .messageRouter(new MessageRouter() { + @Override + public int choosePartition(Message msg, TopicMetadata metadata) { + return 0; + } + }) .create(); @Cleanup diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 755912aca76d1..b8860dd83d954 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -74,7 +74,7 @@ public abstract class ConsumerBase extends HandlerState implements Consumer interceptors; protected final BatchReceivePolicy batchReceivePolicy; protected ConcurrentLinkedQueue> pendingBatchReceives; - protected static final AtomicLongFieldUpdater INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater + private static final AtomicLongFieldUpdater INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater .newUpdater(ConsumerBase.class, "incomingMessagesSize"); protected volatile long incomingMessagesSize = 0; protected volatile Timeout batchReceiveTimeout = null; @@ -847,6 +847,15 @@ protected boolean hasPendingBatchReceive() { return pendingBatchReceives != null && peekNextBatchReceive() != null; } + protected void resetIncomingMessageSize() { + INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + } + + protected void updateIncomingMessageSize(final Message message) { + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, + (message.getData() != null) ? message.getData().length : 0); + } + protected abstract void completeOpBatchReceive(OpBatchReceive op); private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 9efa0d9743bba..e40aafe4ee344 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -935,7 +935,7 @@ protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize private BatchMessageIdImpl clearReceiverQueue() { List> currentMessageQueue = new ArrayList<>(incomingMessages.size()); incomingMessages.drainTo(currentMessageQueue); - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); if (duringSeek.compareAndSet(true, false)) { return seekMessageId; @@ -1522,7 +1522,7 @@ protected synchronized void messageProcessed(Message msg) { stats.updateNumMsgsReceived(msg); trackMessage(msg); - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, msg.getData() == null ? 0 : -msg.getData().length); + updateIncomingMessageSize(msg); } protected void trackMessage(Message msg) { @@ -1732,7 +1732,7 @@ public void redeliverUnacknowledgedMessages() { synchronized (this) { currentSize = incomingMessages.size(); incomingMessages.clear(); - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); unAckedMessageTracker.clear(); } cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise()); @@ -1756,7 +1756,7 @@ public void redeliverUnacknowledgedMessages() { public int clearIncomingMessagesAndGetMessageNumber() { int messagesNumber = incomingMessages.size(); incomingMessages.clear(); - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); unAckedMessageTracker.clear(); return messagesNumber; } @@ -1910,7 +1910,7 @@ public CompletableFuture seekAsync(long timestamp) { lastDequeuedMessageId = MessageId.earliest; incomingMessages.clear(); - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); seekFuture.complete(null); }).exceptionally(e -> { log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); @@ -1971,7 +1971,7 @@ public CompletableFuture seekAsync(MessageId messageId) { lastDequeuedMessageId = MessageId.earliest; incomingMessages.clear(); - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); seekFuture.complete(null); }).exceptionally(e -> { log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); @@ -2216,7 +2216,7 @@ private int removeExpiredMessagesFromQueue(Set messageIds) { // try not to remove elements that are added while we remove Message message = incomingMessages.poll(); while (message != null) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length); + updateIncomingMessageSize(message); messagesFromQueue++; MessageIdImpl id = getMessageIdImpl(message); if (!messageIds.contains(id)) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 9fa397bda31c6..a9920b6e21368 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -308,7 +308,7 @@ private void messageReceived(ConsumerImpl consumer, Message message) { @Override protected synchronized void messageProcessed(Message msg) { unAckedMessageTracker.add(msg.getMessageId()); - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length); + updateIncomingMessageSize(msg); } private void resumeReceivingFromPausedConsumersIfNeeded() { @@ -331,7 +331,7 @@ protected Message internalReceive() throws PulsarClientException { Message message; try { message = incomingMessages.take(); - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length); + updateIncomingMessageSize(message); checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId()); resumeReceivingFromPausedConsumersIfNeeded(); @@ -347,7 +347,7 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarCl try { message = incomingMessages.poll(timeout, unit); if (message != null) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length); + updateIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId()); } @@ -388,7 +388,7 @@ protected CompletableFuture> internalBatchReceiveAsync() { while (msgPeeked != null && messages.canAdd(msgPeeked)) { Message msg = incomingMessages.poll(); if (msg != null) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length); + updateIncomingMessageSize(msg); Message interceptMsg = beforeConsume(msg); messages.add(interceptMsg); } @@ -416,7 +416,7 @@ protected CompletableFuture> internalReceiveAsync() { pendingReceives.add(result); cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length); + updateIncomingMessageSize(message); checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId()); resumeReceivingFromPausedConsumersIfNeeded(); @@ -622,7 +622,7 @@ public void redeliverUnacknowledgedMessages() { consumer.unAckedChunkedMessageIdSequenceMap.clear(); }); incomingMessages.clear(); - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); unAckedMessageTracker.clear(); } finally { lock.writeLock().unlock(); @@ -691,7 +691,7 @@ public CompletableFuture seekAsync(MessageId messageId) { unAckedMessageTracker.clear(); incomingMessages.clear(); - MultiTopicsConsumerImpl.INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); FutureUtil.waitForAll(futures).whenComplete((result, exception) -> { if (exception != null) { @@ -781,7 +781,7 @@ private void removeExpiredMessagesFromQueue(Set messageIds) { Message message = incomingMessages.poll(); checkState(message instanceof TopicMessageImpl); while (message != null) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length); + updateIncomingMessageSize(message); MessageId messageId = message.getMessageId(); if (!messageIds.contains(messageId)) { messageIds.add(messageId);