Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,12 @@ private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName,
}

@Override
protected void handleLookup(CommandLookupTopic lookup) {
protected void handleLookup(CommandLookupTopic lookupParam) {
checkArgument(state == State.Connected);

// Make a copy since the command is handled asynchronously
CommandLookupTopic lookup = new CommandLookupTopic().copyFrom(lookupParam);

final long requestId = lookup.getRequestId();
final boolean authoritative = lookup.isAuthoritative();

Expand Down Expand Up @@ -662,8 +666,13 @@ private void writeAndFlush(ByteBuf cmd) {
}

@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadataParam) {
checkArgument(state == State.Connected);

// Make a copy since the command is handled asynchronously
CommandPartitionedTopicMetadata partitionMetadata =
new CommandPartitionedTopicMetadata().copyFrom(partitionMetadataParam);

final long requestId = partitionMetadata.getRequestId();
if (log.isDebugEnabled()) {
log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(),
Expand Down Expand Up @@ -3181,8 +3190,12 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
}

@Override
protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) {
protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicListParam) {
checkArgument(state == State.Connected);

// make a copy since command is handled asynchronously
CommandWatchTopicList commandWatchTopicList = new CommandWatchTopicList().copyFrom(commandWatchTopicListParam);

final long requestId = commandWatchTopicList.getRequestId();
final long watcherId = commandWatchTopicList.getWatcherId();
final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,7 @@ protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess c
CompletableFuture<CommandWatchTopicListSuccess> requestFuture =
(CompletableFuture<CommandWatchTopicListSuccess>) pendingRequests.remove(requestId);
if (requestFuture != null) {
requestFuture.complete(commandWatchTopicListSuccess);
requestFuture.complete(new CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess));
} else {
duplicatedResponseCounter.incrementAndGet();
log.warn("{} Received unknown request id from server: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -334,7 +335,9 @@ public void testCreateWatcher() {
.setRequestId(7)
.setWatcherId(5).setTopicsHash("f00");
cnx.handleCommandWatchTopicListSuccess(success);
assertEquals(result.getNow(null), success);
assertThat(result.getNow(null))
.usingRecursiveComparison()
.comparingOnlyFields("requestId", "watcherId", "topicsHash");
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
} finally {
buffer.release();
// Clear the fields in cmd to release memory.
// The clear() call below also helps prevent misuse of holding references to command objects after
// handle* methods complete, as per the class javadoc requirement.
// While this doesn't completely prevent such misuse, it makes tests more likely to catch violations.
cmd.clear();
}
}

Expand Down
Loading