From d2cc70eea04dd71f2d7e52d2403df63c21d8a575 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 8 Oct 2025 20:46:08 +0300 Subject: [PATCH 1/5] [fix][client] Fix temporary memory leak in PulsarDecoder/ClientCnx/ServerCnx --- .../java/org/apache/pulsar/common/protocol/PulsarDecoder.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index b61664d9571da..1de8c9b78cc33 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -483,6 +483,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } finally { buffer.release(); + cmd.clear(); } } From 74757748d43ec6570d3a4aea7059188f22804bf6 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 9 Oct 2025 09:48:47 +0300 Subject: [PATCH 2/5] Fix potential issue where command is handled in different thread --- .../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 43be0072a8cde..9cccbecf3aa13 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1242,7 +1242,7 @@ protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess c CompletableFuture requestFuture = (CompletableFuture) pendingRequests.remove(requestId); if (requestFuture != null) { - requestFuture.complete(commandWatchTopicListSuccess); + requestFuture.complete(new CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess)); } else { duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", From b66dee86738021a5f9a5fbd9afd140b662f085e4 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 9 Oct 2025 09:57:10 +0300 Subject: [PATCH 3/5] Fix issues in ServerCnx --- .../pulsar/broker/service/ServerCnx.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4d21b2810cd3a..fbfb8108846b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -574,8 +574,12 @@ private CompletableFuture isTopicOperationAllowed(TopicName topicName, } @Override - protected void handleLookup(CommandLookupTopic lookup) { + protected void handleLookup(CommandLookupTopic lookupParam) { checkArgument(state == State.Connected); + + // Make a copy since the command is handled asynchronously + CommandLookupTopic lookup = new CommandLookupTopic().copyFrom(lookupParam); + final long requestId = lookup.getRequestId(); final boolean authoritative = lookup.isAuthoritative(); @@ -662,8 +666,13 @@ private void writeAndFlush(ByteBuf cmd) { } @Override - protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) { + protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadataParam) { checkArgument(state == State.Connected); + + // Make a copy since the command is handled asynchronously + CommandPartitionedTopicMetadata partitionMetadata = + new CommandPartitionedTopicMetadata().copyFrom(partitionMetadataParam); + final long requestId = partitionMetadata.getRequestId(); if (log.isDebugEnabled()) { log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(), @@ -3181,8 +3190,12 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) { } @Override - protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) { + protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicListParam) { checkArgument(state == State.Connected); + + // make a copy since command is handled asynchronously + CommandWatchTopicList commandWatchTopicList = new CommandWatchTopicList().copyFrom(commandWatchTopicListParam); + final long requestId = commandWatchTopicList.getRequestId(); final long watcherId = commandWatchTopicList.getWatcherId(); final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace()); From adeacedd0d43511968ed8251707105f56c78edbb Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 9 Oct 2025 10:32:04 +0300 Subject: [PATCH 4/5] Add comment --- .../java/org/apache/pulsar/common/protocol/PulsarDecoder.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index 1de8c9b78cc33..33307a8cacc00 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -483,6 +483,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } finally { buffer.release(); + // Clear the fields in cmd to release memory. + // The clear() call below also helps prevent misuse of holding references to command objects after + // handle* methods complete, as per the class javadoc requirement. + // While this doesn't completely prevent such misuse, it makes tests more likely to catch violations. cmd.clear(); } } From 8f830e188a6b4dec13c73973c0da52b8fd9453af Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 9 Oct 2025 10:47:40 +0300 Subject: [PATCH 5/5] Fix test --- .../java/org/apache/pulsar/client/impl/ClientCnxTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index e0721ffe90597..c0a75b09ccea1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -334,7 +335,9 @@ public void testCreateWatcher() { .setRequestId(7) .setWatcherId(5).setTopicsHash("f00"); cnx.handleCommandWatchTopicListSuccess(success); - assertEquals(result.getNow(null), success); + assertThat(result.getNow(null)) + .usingRecursiveComparison() + .comparingOnlyFields("requestId", "watcherId", "topicsHash"); }); }