diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4d21b2810cd3a..fbfb8108846b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -574,8 +574,12 @@ private CompletableFuture isTopicOperationAllowed(TopicName topicName, } @Override - protected void handleLookup(CommandLookupTopic lookup) { + protected void handleLookup(CommandLookupTopic lookupParam) { checkArgument(state == State.Connected); + + // Make a copy since the command is handled asynchronously + CommandLookupTopic lookup = new CommandLookupTopic().copyFrom(lookupParam); + final long requestId = lookup.getRequestId(); final boolean authoritative = lookup.isAuthoritative(); @@ -662,8 +666,13 @@ private void writeAndFlush(ByteBuf cmd) { } @Override - protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) { + protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadataParam) { checkArgument(state == State.Connected); + + // Make a copy since the command is handled asynchronously + CommandPartitionedTopicMetadata partitionMetadata = + new CommandPartitionedTopicMetadata().copyFrom(partitionMetadataParam); + final long requestId = partitionMetadata.getRequestId(); if (log.isDebugEnabled()) { log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(), @@ -3181,8 +3190,12 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) { } @Override - protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) { + protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicListParam) { checkArgument(state == State.Connected); + + // make a copy since command is handled asynchronously + CommandWatchTopicList commandWatchTopicList = new CommandWatchTopicList().copyFrom(commandWatchTopicListParam); + final long requestId = commandWatchTopicList.getRequestId(); final long watcherId = commandWatchTopicList.getWatcherId(); final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 43be0072a8cde..9cccbecf3aa13 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1242,7 +1242,7 @@ protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess c CompletableFuture requestFuture = (CompletableFuture) pendingRequests.remove(requestId); if (requestFuture != null) { - requestFuture.complete(commandWatchTopicListSuccess); + requestFuture.complete(new CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess)); } else { duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index e0721ffe90597..c0a75b09ccea1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -334,7 +335,9 @@ public void testCreateWatcher() { .setRequestId(7) .setWatcherId(5).setTopicsHash("f00"); cnx.handleCommandWatchTopicListSuccess(success); - assertEquals(result.getNow(null), success); + assertThat(result.getNow(null)) + .usingRecursiveComparison() + .comparingOnlyFields("requestId", "watcherId", "topicsHash"); }); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index b61664d9571da..33307a8cacc00 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -483,6 +483,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } finally { buffer.release(); + // Clear the fields in cmd to release memory. + // The clear() call below also helps prevent misuse of holding references to command objects after + // handle* methods complete, as per the class javadoc requirement. + // While this doesn't completely prevent such misuse, it makes tests more likely to catch violations. + cmd.clear(); } }