From 3c9a0c76c3773d2663b8594f61c8a813c60b8dd2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 15 Jan 2026 03:32:52 +0000 Subject: [PATCH 1/9] Initial plan From 7105f3e3249c36fb1fdeeb9a76d16082fd95a947 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 15 Jan 2026 03:43:23 +0000 Subject: [PATCH 2/9] Add PartitionConsumerIdleDetector and integrate into ConsumerImpl Co-authored-by: nodece <16235121+nodece@users.noreply.github.com> --- .../pulsar/client/impl/ConsumerImpl.java | 80 +++++++++ .../impl/PartitionConsumerIdleDetector.java | 161 ++++++++++++++++++ .../impl/conf/ConsumerConfigurationData.java | 32 ++++ 3 files changed, 273 insertions(+) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 37e9f16fe0292..62c4ddc3f42c0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -57,6 +57,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -245,6 +246,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle // This field will be set after the state becomes Failed, then the following operations will fail immediately private volatile Throwable failReason = null; + private final PartitionConsumerIdleDetector idleDetector; + private volatile ScheduledFuture idleCheckTask; + static ConsumerImpl newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, @@ -442,6 +446,24 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat "The number of negatively acknowledged messages", topic, attrs); consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages, "The number of messages sent to DLQ", topic, attrs); + + // Initialize idle detector if enabled + long idleTimeoutMs = conf.getConsumerIdleTimeoutMs(); + if (idleTimeoutMs > 0 && conf.isEnablePartitionOwnershipCheck()) { + this.idleDetector = new PartitionConsumerIdleDetector(this, idleTimeoutMs); + + // Schedule periodic check every 10 seconds + this.idleCheckTask = client.timer().scheduleAtFixedRate(() -> { + idleDetector.checkIdleAndReconnectIfNeeded() + .exceptionally(ex -> { + log.warn("[{}][{}] Idle check failed", topic, subscription, ex); + return null; + }); + }, idleTimeoutMs, 10_000, TimeUnit.MILLISECONDS); + } else { + this.idleDetector = null; + } + grabCnx(); consumersOpenedCounter.increment(); @@ -461,6 +483,31 @@ NegativeAcksTracker getNegativeAcksTracker() { return negativeAcksTracker; } + // Package-private methods for PartitionConsumerIdleDetector + AcknowledgmentsGroupingTracker getAcknowledgmentsGroupingTracker() { + return acknowledgmentsGroupingTracker; + } + + void incrementConsumerEpoch() { + CONSUMER_EPOCH.incrementAndGet(this); + } + + ExecutorService getInternalPinnedExecutor() { + return internalPinnedExecutor; + } + + PulsarClientImpl getClient() { + return client; + } + + TopicName getTopicName() { + return topicName; + } + + ClientCnx getClientCnx() { + return cnx(); + } + @Override public CompletableFuture unsubscribeAsync(boolean force) { if (getState() == State.Closing || getState() == State.Closed) { @@ -517,6 +564,9 @@ protected Message internalReceive() throws PulsarClientException { } message = incomingMessages.take(); messageProcessed(message); + if (idleDetector != null) { + idleDetector.markActive(); + } return beforeConsume(message); } catch (InterruptedException e) { ExceptionHandler.handleInterruptedException(e); @@ -537,6 +587,9 @@ protected CompletableFuture> internalReceiveAsync() { cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { messageProcessed(message); + if (idleDetector != null) { + idleDetector.markActive(); + } result.complete(beforeConsume(message)); } }); @@ -556,6 +609,9 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC return null; } messageProcessed(message); + if (idleDetector != null) { + idleDetector.markActive(); + } message = listener == null ? beforeConsume(message) : message; return message; } catch (InterruptedException e) { @@ -621,6 +677,10 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack return FutureUtil.failedFuture(exception); } + if (idleDetector != null) { + idleDetector.markActive(); + } + if (txn != null) { return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())); @@ -643,6 +703,11 @@ protected CompletableFuture doAcknowledge(List messageIdList, A } return FutureUtil.failedFuture(exception); } + + if (idleDetector != null) { + idleDetector.markActive(); + } + if (txn != null) { return doTransactionAcknowledgeForResponse(messageIdList, ackType, properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())); @@ -839,6 +904,10 @@ public void negativeAcknowledge(MessageId messageId) { consumerNacksCounter.increment(); negativeAcksTracker.add(messageId); + if (idleDetector != null) { + idleDetector.markActive(); + } + // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId)); } @@ -848,6 +917,10 @@ public void negativeAcknowledge(Message message) { consumerNacksCounter.increment(); negativeAcksTracker.add(message); + if (idleDetector != null) { + idleDetector.markActive(); + } + // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId())); } @@ -1253,6 +1326,9 @@ private void closeConsumerTasks() { } negativeAcksTracker.close(); stats.getStatTimeout().ifPresent(Timeout::cancel); + if (idleCheckTask != null) { + idleCheckTask.cancel(false); + } //terminate incomingMessages queue, stop accept the new messages and waking up blocked thread. incomingMessages.terminate(Message::release); clearIncomingMessages(); @@ -2201,6 +2277,10 @@ public void redeliverUnacknowledgedMessages() { // Second : we should synchronized `ClientCnx cnx = cnx()` to // prevent use old cnx to send redeliverUnacknowledgedMessages to a old broker synchronized (ConsumerImpl.this) { + if (idleDetector != null) { + idleDetector.markActive(); + } + ClientCnx cnx = cnx(); // V1 don't support redeliverUnacknowledgedMessages if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java new file mode 100644 index 0000000000000..ef12153ea60c0 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.net.InetSocketAddress; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.PulsarClientException; + +/** + * Detects when a partition consumer becomes idle and triggers reconnection if topic ownership changed. + * + * This helps prevent permanent message loss when: + * - Topic ownership transfers to a different broker + * - The old broker fails to send CommandCloseConsumer notification + * - The consumer remains connected to a broker that no longer owns the topic + */ +@Slf4j +class PartitionConsumerIdleDetector { + private final ConsumerImpl consumer; + private final long idleTimeoutMs; + private final AtomicLong lastActivityTimestamp; + + PartitionConsumerIdleDetector(ConsumerImpl consumer, long idleTimeoutMs) { + this.consumer = consumer; + this.idleTimeoutMs = idleTimeoutMs; + this.lastActivityTimestamp = new AtomicLong(System.currentTimeMillis()); + } + + /** + * Mark the consumer as active (message received or acknowledged). + */ + void markActive() { + lastActivityTimestamp.set(System.currentTimeMillis()); + } + + /** + * Check if the consumer is idle and reconnect if topic ownership changed. + * + * @return CompletableFuture that completes when the check is done + */ + CompletableFuture checkIdleAndReconnectIfNeeded() { + long idleDuration = System.currentTimeMillis() - lastActivityTimestamp.get(); + + if (idleDuration < idleTimeoutMs) { + // Not idle yet + return CompletableFuture.completedFuture(null); + } + + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Consumer idle for {}ms, verifying topic ownership", + consumer.getTopic(), consumer.getSubscription(), idleDuration); + } + + return verifyTopicOwnership() + .thenCompose(ownershipChanged -> { + if (ownershipChanged) { + log.info("[{}][{}] Topic ownership changed, reconnecting consumer", + consumer.getTopic(), consumer.getSubscription()); + return reconnectWithCleanup(); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Topic ownership unchanged, no reconnection needed", + consumer.getTopic(), consumer.getSubscription()); + } + return CompletableFuture.completedFuture(null); + } + }); + } + + /** + * Verify if topic ownership has changed by performing a lookup. + * + * @return CompletableFuture true if ownership changed, false otherwise + */ + private CompletableFuture verifyTopicOwnership() { + // Get current broker address + ClientCnx currentCnx = consumer.getClientCnx(); + if (currentCnx == null) { + // No current connection, no need to check + return CompletableFuture.completedFuture(false); + } + + InetSocketAddress currentBroker = currentCnx.ctx().channel().remoteAddress(); + + // Perform topic lookup to find the current owner + return consumer.getClient().getLookup().getBroker(consumer.getTopicName()) + .thenApply(lookupResult -> { + InetSocketAddress newBroker = lookupResult.getLogicalAddress(); + boolean changed = !currentBroker.equals(newBroker); + + if (changed) { + log.info("[{}][{}] Topic ownership changed: {} -> {}", + consumer.getTopic(), consumer.getSubscription(), + currentBroker, newBroker); + } + + return changed; + }) + .exceptionally(ex -> { + log.warn("[{}][{}] Failed to verify topic ownership", + consumer.getTopic(), consumer.getSubscription(), ex); + // On lookup failure, don't trigger reconnection + return false; + }); + } + + /** + * Reconnect the consumer with comprehensive cleanup. + * + * @return CompletableFuture that completes when reconnection is initiated + */ + private CompletableFuture reconnectWithCleanup() { + return CompletableFuture.runAsync(() -> { + try { + // 1. Clear unacked message tracker + consumer.getUnAckedMessageTracker().clear(); + + // 2. Clear pending ack queues in acknowledgment tracker + consumer.getAcknowledgmentsGroupingTracker().flush(); + + // 3. Clear batch message ack tracker (if exists) + // The batch ack tracker is internal to acknowledgment tracker + + // 4. Increment consumer epoch to reject old acks + consumer.incrementConsumerEpoch(); + + log.info("[{}][{}] Cleaned up consumer state before reconnection", + consumer.getTopic(), consumer.getSubscription()); + + } catch (Exception e) { + log.error("[{}][{}] Error during cleanup before reconnection", + consumer.getTopic(), consumer.getSubscription(), e); + } + }, consumer.getInternalPinnedExecutor()) + .thenCompose(__ -> { + // Trigger reconnection by calling reconnectLater on connection handler + consumer.getConnectionHandler().reconnectLater( + new PulsarClientException("Topic ownership changed, reconnecting")); + return CompletableFuture.completedFuture(null); + }); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 42fc2666573fe..a4f097d255b10 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -419,6 +419,38 @@ public int getMaxPendingChuckedMessage() { private List topicConfigurations = new ArrayList<>(); + /** + * Consumer idle timeout in milliseconds. + * If the consumer doesn't receive messages or perform acks for this duration, + * it will trigger a lookup to verify topic ownership and reconnect if changed. + * + * Set to 0 to disable idle detection. + * Default: 30000 (30 seconds) + */ + @ApiModelProperty( + name = "consumerIdleTimeoutMs", + value = "Consumer idle timeout in milliseconds. " + + "If the consumer doesn't receive messages or perform acks for this duration, " + + "it will trigger a lookup to verify topic ownership and reconnect if changed. " + + "Set to 0 to disable idle detection." + ) + private long consumerIdleTimeoutMs = 30_000; + + /** + * Enable partition-level topic ownership verification. + * When enabled, consumers will periodically check if topic ownership has changed + * and automatically reconnect to the new broker if needed. + * + * Default: true + */ + @ApiModelProperty( + name = "enablePartitionOwnershipCheck", + value = "Enable partition-level topic ownership verification. " + + "When enabled, consumers will periodically check if topic ownership has changed " + + "and automatically reconnect to the new broker if needed." + ) + private boolean enablePartitionOwnershipCheck = true; + public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) { return topicConfigurations.stream() .filter(topicConf -> topicConf.getTopicNameMatcher().matches(topicName)) From 1d34c4f826e0b36fe8b31617faa1bc01826f1b0e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 15 Jan 2026 03:44:42 +0000 Subject: [PATCH 3/9] Add comprehensive unit tests for PartitionConsumerIdleDetector Co-authored-by: nodece <16235121+nodece@users.noreply.github.com> --- .../PartitionConsumerIdleDetectorTest.java | 289 ++++++++++++++++++ 1 file changed, 289 insertions(+) create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java new file mode 100644 index 0000000000000..c0b7dc40fbb0a --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class PartitionConsumerIdleDetectorTest { + private final String topic = "persistent://tenant/ns1/my-topic"; + private ExecutorProvider executorProvider; + private ExecutorService internalExecutor; + private ConsumerImpl consumer; + private ConsumerConfigurationData consumerConf; + private PulsarClientImpl client; + private PartitionConsumerIdleDetector idleDetector; + + @BeforeMethod(alwaysRun = true) + public void setUp() { + consumerConf = new ConsumerConfigurationData<>(); + consumerConf.setSubscriptionName("test-sub"); + consumerConf.setConsumerIdleTimeoutMs(1000); // 1 second for testing + consumerConf.setEnablePartitionOwnershipCheck(true); + + executorProvider = new ExecutorProvider(1, "PartitionConsumerIdleDetectorTest"); + internalExecutor = Executors.newSingleThreadScheduledExecutor(); + + client = ClientTestFixtures.createPulsarClientMock(executorProvider, internalExecutor); + ClientConfigurationData clientConf = client.getConfiguration(); + clientConf.setOperationTimeoutMs(100); + clientConf.setStatsIntervalSeconds(0); + + CompletableFuture> subscribeFuture = new CompletableFuture<>(); + consumer = spy(ConsumerImpl.newConsumerImpl(client, topic, consumerConf, + executorProvider, -1, false, subscribeFuture, null, null, null, + true)); + consumer.setState(HandlerState.State.Ready); + + idleDetector = new PartitionConsumerIdleDetector(consumer, 1000); + } + + @AfterMethod(alwaysRun = true) + public void cleanup() { + if (executorProvider != null) { + executorProvider.shutdownNow(); + executorProvider = null; + } + if (internalExecutor != null) { + internalExecutor.shutdownNow(); + internalExecutor = null; + } + } + + @Test + public void testMarkActiveResetsTimer() throws Exception { + // Verify initial timestamp + long initialTimestamp = System.currentTimeMillis(); + + // Wait a bit to ensure timestamp would change + Thread.sleep(100); + + // Mark as active + idleDetector.markActive(); + + // Check that the consumer is not considered idle yet + CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); + result.get(1, TimeUnit.SECONDS); + + // Verify no reconnection was triggered (would be verified by no calls to connection handler) + verify(consumer, never()).getConnectionHandler(); + } + + @Test + public void testIdleDetectionDoesNotTriggerWhenNotIdle() throws Exception { + // Mark as active to reset the timer + idleDetector.markActive(); + + // Immediately check (consumer is not idle) + CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); + result.get(1, TimeUnit.SECONDS); + + // Verify no ownership verification was attempted + verify(consumer, never()).getClient(); + } + + @Test + public void testIdleDetectionWhenNoConnection() throws Exception { + // Wait for idle timeout + Thread.sleep(1200); + + // Set no connection + when(consumer.getClientCnx()).thenReturn(null); + + // Check idle - should not trigger reconnect when no connection + CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); + result.get(1, TimeUnit.SECONDS); + + // Verify that it checked for connection but didn't proceed + verify(consumer, times(1)).getClientCnx(); + } + + @Test + public void testOwnershipChangeDetection() throws Exception { + // Wait for idle timeout + Thread.sleep(1200); + + // Mock connection and lookup + ClientCnx mockCnx = mock(ClientCnx.class); + ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class); + Channel mockChannel = mock(Channel.class); + InetSocketAddress oldAddress = new InetSocketAddress("localhost", 6650); + InetSocketAddress newAddress = new InetSocketAddress("localhost", 6651); + + when(consumer.getClientCnx()).thenReturn(mockCnx); + when(mockCnx.ctx()).thenReturn(mockCtx); + when(mockCtx.channel()).thenReturn(mockChannel); + when(mockChannel.remoteAddress()).thenReturn(oldAddress); + + // Mock lookup service to return different broker + LookupService mockLookup = mock(LookupService.class); + when(consumer.getClient()).thenReturn(client); + when(client.getLookup()).thenReturn(mockLookup); + + LookupTopicResult lookupResult = new LookupTopicResult(newAddress, newAddress, false, false); + when(mockLookup.getBroker(any(TopicName.class))) + .thenReturn(CompletableFuture.completedFuture(lookupResult)); + + // Mock connection handler + ConnectionHandler mockConnectionHandler = mock(ConnectionHandler.class); + when(consumer.getConnectionHandler()).thenReturn(mockConnectionHandler); + + // Check idle - should trigger reconnect due to ownership change + CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); + result.get(2, TimeUnit.SECONDS); + + // Verify reconnection was triggered + verify(mockConnectionHandler, times(1)) + .reconnectLater(any(PulsarClientException.class)); + } + + @Test + public void testNoReconnectWhenOwnershipUnchanged() throws Exception { + // Wait for idle timeout + Thread.sleep(1200); + + // Mock connection and lookup with same broker + ClientCnx mockCnx = mock(ClientCnx.class); + ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class); + Channel mockChannel = mock(Channel.class); + InetSocketAddress sameAddress = new InetSocketAddress("localhost", 6650); + + when(consumer.getClientCnx()).thenReturn(mockCnx); + when(mockCnx.ctx()).thenReturn(mockCtx); + when(mockCtx.channel()).thenReturn(mockChannel); + when(mockChannel.remoteAddress()).thenReturn(sameAddress); + + // Mock lookup service to return same broker + LookupService mockLookup = mock(LookupService.class); + when(consumer.getClient()).thenReturn(client); + when(client.getLookup()).thenReturn(mockLookup); + + LookupTopicResult lookupResult = new LookupTopicResult(sameAddress, sameAddress, false, false); + when(mockLookup.getBroker(any(TopicName.class))) + .thenReturn(CompletableFuture.completedFuture(lookupResult)); + + // Mock connection handler + ConnectionHandler mockConnectionHandler = mock(ConnectionHandler.class); + when(consumer.getConnectionHandler()).thenReturn(mockConnectionHandler); + + // Check idle - should NOT trigger reconnect (same broker) + CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); + result.get(2, TimeUnit.SECONDS); + + // Verify reconnection was NOT triggered + verify(mockConnectionHandler, never()) + .reconnectLater(any(PulsarClientException.class)); + } + + @Test + public void testLookupFailureDoesNotTriggerReconnect() throws Exception { + // Wait for idle timeout + Thread.sleep(1200); + + // Mock connection + ClientCnx mockCnx = mock(ClientCnx.class); + ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class); + Channel mockChannel = mock(Channel.class); + InetSocketAddress address = new InetSocketAddress("localhost", 6650); + + when(consumer.getClientCnx()).thenReturn(mockCnx); + when(mockCnx.ctx()).thenReturn(mockCtx); + when(mockCtx.channel()).thenReturn(mockChannel); + when(mockChannel.remoteAddress()).thenReturn(address); + + // Mock lookup service to fail + LookupService mockLookup = mock(LookupService.class); + when(consumer.getClient()).thenReturn(client); + when(client.getLookup()).thenReturn(mockLookup); + + when(mockLookup.getBroker(any(TopicName.class))) + .thenReturn(CompletableFuture.failedFuture( + new PulsarClientException("Lookup failed"))); + + // Mock connection handler + ConnectionHandler mockConnectionHandler = mock(ConnectionHandler.class); + when(consumer.getConnectionHandler()).thenReturn(mockConnectionHandler); + + // Check idle - should handle lookup failure gracefully + CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); + result.get(2, TimeUnit.SECONDS); + + // Verify reconnection was NOT triggered on lookup failure + verify(mockConnectionHandler, never()) + .reconnectLater(any(PulsarClientException.class)); + } + + @Test + public void testDisabledWhenConfiguredWithZeroTimeout() { + // Create consumer with idle detection disabled + consumerConf.setConsumerIdleTimeoutMs(0); + + CompletableFuture> subscribeFuture = new CompletableFuture<>(); + ConsumerImpl consumerWithDisabledDetector = ConsumerImpl.newConsumerImpl( + client, topic, consumerConf, + executorProvider, -1, false, subscribeFuture, null, null, null, + true); + + // Idle detector should be null when disabled + // This is validated by the fact that consumer creation succeeds without idle detector + assertNotNull(consumerWithDisabledDetector); + } + + @Test + public void testDisabledWhenOwnershipCheckDisabled() { + // Create consumer with ownership check disabled + consumerConf.setConsumerIdleTimeoutMs(1000); + consumerConf.setEnablePartitionOwnershipCheck(false); + + CompletableFuture> subscribeFuture = new CompletableFuture<>(); + ConsumerImpl consumerWithDisabledDetector = ConsumerImpl.newConsumerImpl( + client, topic, consumerConf, + executorProvider, -1, false, subscribeFuture, null, null, null, + true); + + // Idle detector should be null when ownership check is disabled + assertNotNull(consumerWithDisabledDetector); + } +} From a8d60be69fbf634f4405f10c9e9dfffa522e11e4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 15 Jan 2026 03:47:15 +0000 Subject: [PATCH 4/9] Address code review feedback: fix NPE, ClassCastException, and extract magic numbers Co-authored-by: nodece <16235121+nodece@users.noreply.github.com> --- .../apache/pulsar/client/impl/ConsumerImpl.java | 5 +++-- .../impl/PartitionConsumerIdleDetector.java | 17 ++++++++++++++++- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 62c4ddc3f42c0..e86ee9c0c1356 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -144,6 +144,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandler.Connection { private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000; + private static final long IDLE_CHECK_INTERVAL_MS = 10_000; // 10 seconds final long consumerId; @@ -452,14 +453,14 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat if (idleTimeoutMs > 0 && conf.isEnablePartitionOwnershipCheck()) { this.idleDetector = new PartitionConsumerIdleDetector(this, idleTimeoutMs); - // Schedule periodic check every 10 seconds + // Schedule periodic check this.idleCheckTask = client.timer().scheduleAtFixedRate(() -> { idleDetector.checkIdleAndReconnectIfNeeded() .exceptionally(ex -> { log.warn("[{}][{}] Idle check failed", topic, subscription, ex); return null; }); - }, idleTimeoutMs, 10_000, TimeUnit.MILLISECONDS); + }, idleTimeoutMs, IDLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS); } else { this.idleDetector = null; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java index ef12153ea60c0..b63bb64619046 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java @@ -99,12 +99,27 @@ private CompletableFuture verifyTopicOwnership() { return CompletableFuture.completedFuture(false); } - InetSocketAddress currentBroker = currentCnx.ctx().channel().remoteAddress(); + // Safely get the remote address and verify it's an InetSocketAddress + if (!(currentCnx.ctx().channel().remoteAddress() instanceof InetSocketAddress)) { + log.warn("[{}][{}] Remote address is not an InetSocketAddress, skipping ownership check", + consumer.getTopic(), consumer.getSubscription()); + return CompletableFuture.completedFuture(false); + } + + InetSocketAddress currentBroker = (InetSocketAddress) currentCnx.ctx().channel().remoteAddress(); // Perform topic lookup to find the current owner return consumer.getClient().getLookup().getBroker(consumer.getTopicName()) .thenApply(lookupResult -> { InetSocketAddress newBroker = lookupResult.getLogicalAddress(); + + // Check for null to prevent NPE + if (newBroker == null) { + log.warn("[{}][{}] Lookup returned null logical address", + consumer.getTopic(), consumer.getSubscription()); + return false; + } + boolean changed = !currentBroker.equals(newBroker); if (changed) { From 28e684823a631f7be199556aa1a7d2c9caf17a9f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 15 Jan 2026 03:49:01 +0000 Subject: [PATCH 5/9] Fix LookupTopicResult constructor calls and improve initial delay for idle detection Co-authored-by: nodece <16235121+nodece@users.noreply.github.com> --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 4 ++-- .../pulsar/client/impl/PartitionConsumerIdleDetectorTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e86ee9c0c1356..252ca635b331f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -453,14 +453,14 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat if (idleTimeoutMs > 0 && conf.isEnablePartitionOwnershipCheck()) { this.idleDetector = new PartitionConsumerIdleDetector(this, idleTimeoutMs); - // Schedule periodic check + // Schedule periodic check with smaller initial delay for earlier detection this.idleCheckTask = client.timer().scheduleAtFixedRate(() -> { idleDetector.checkIdleAndReconnectIfNeeded() .exceptionally(ex -> { log.warn("[{}][{}] Idle check failed", topic, subscription, ex); return null; }); - }, idleTimeoutMs, IDLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS); + }, IDLE_CHECK_INTERVAL_MS, IDLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS); } else { this.idleDetector = null; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java index c0b7dc40fbb0a..6b45ebf73ca8f 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java @@ -162,7 +162,7 @@ public void testOwnershipChangeDetection() throws Exception { when(consumer.getClient()).thenReturn(client); when(client.getLookup()).thenReturn(mockLookup); - LookupTopicResult lookupResult = new LookupTopicResult(newAddress, newAddress, false, false); + LookupTopicResult lookupResult = new LookupTopicResult(newAddress, newAddress, false); when(mockLookup.getBroker(any(TopicName.class))) .thenReturn(CompletableFuture.completedFuture(lookupResult)); @@ -200,7 +200,7 @@ public void testNoReconnectWhenOwnershipUnchanged() throws Exception { when(consumer.getClient()).thenReturn(client); when(client.getLookup()).thenReturn(mockLookup); - LookupTopicResult lookupResult = new LookupTopicResult(sameAddress, sameAddress, false, false); + LookupTopicResult lookupResult = new LookupTopicResult(sameAddress, sameAddress, false); when(mockLookup.getBroker(any(TopicName.class))) .thenReturn(CompletableFuture.completedFuture(lookupResult)); From 0a53bfaa6303b36f5bd2f9d9370879797e2b75c2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 15 Jan 2026 03:52:19 +0000 Subject: [PATCH 6/9] Improve code quality: extract cleanup method, add exception handling, improve test reliability Co-authored-by: nodece <16235121+nodece@users.noreply.github.com> --- .../pulsar/client/impl/ConsumerImpl.java | 15 ++++-- .../impl/PartitionConsumerIdleDetector.java | 47 +++++++++++-------- .../PartitionConsumerIdleDetectorTest.java | 16 ++++--- 3 files changed, 47 insertions(+), 31 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 252ca635b331f..062ae360f5100 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -455,11 +455,16 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat // Schedule periodic check with smaller initial delay for earlier detection this.idleCheckTask = client.timer().scheduleAtFixedRate(() -> { - idleDetector.checkIdleAndReconnectIfNeeded() - .exceptionally(ex -> { - log.warn("[{}][{}] Idle check failed", topic, subscription, ex); - return null; - }); + try { + idleDetector.checkIdleAndReconnectIfNeeded() + .exceptionally(ex -> { + log.warn("[{}][{}] Idle check failed", topic, subscription, ex); + return null; + }); + } catch (Throwable t) { + // Catch all exceptions to prevent thread pool corruption + log.error("[{}][{}] Unexpected error in idle check task", topic, subscription, t); + } }, IDLE_CHECK_INTERVAL_MS, IDLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS); } else { this.idleDetector = null; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java index b63bb64619046..3d86dc9b1c8b9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java @@ -145,26 +145,7 @@ private CompletableFuture verifyTopicOwnership() { */ private CompletableFuture reconnectWithCleanup() { return CompletableFuture.runAsync(() -> { - try { - // 1. Clear unacked message tracker - consumer.getUnAckedMessageTracker().clear(); - - // 2. Clear pending ack queues in acknowledgment tracker - consumer.getAcknowledgmentsGroupingTracker().flush(); - - // 3. Clear batch message ack tracker (if exists) - // The batch ack tracker is internal to acknowledgment tracker - - // 4. Increment consumer epoch to reject old acks - consumer.incrementConsumerEpoch(); - - log.info("[{}][{}] Cleaned up consumer state before reconnection", - consumer.getTopic(), consumer.getSubscription()); - - } catch (Exception e) { - log.error("[{}][{}] Error during cleanup before reconnection", - consumer.getTopic(), consumer.getSubscription(), e); - } + cleanupConsumerState(); }, consumer.getInternalPinnedExecutor()) .thenCompose(__ -> { // Trigger reconnection by calling reconnectLater on connection handler @@ -173,4 +154,30 @@ private CompletableFuture reconnectWithCleanup() { return CompletableFuture.completedFuture(null); }); } + + /** + * Clean up consumer state before reconnection. + */ + private void cleanupConsumerState() { + try { + // 1. Clear unacked message tracker + consumer.getUnAckedMessageTracker().clear(); + + // 2. Clear pending ack queues in acknowledgment tracker + consumer.getAcknowledgmentsGroupingTracker().flush(); + + // 3. Clear batch message ack tracker (if exists) + // The batch ack tracker is internal to acknowledgment tracker + + // 4. Increment consumer epoch to reject old acks + consumer.incrementConsumerEpoch(); + + log.info("[{}][{}] Cleaned up consumer state before reconnection", + consumer.getTopic(), consumer.getSubscription()); + + } catch (Exception e) { + log.error("[{}][{}] Error during cleanup before reconnection", + consumer.getTopic(), consumer.getSubscription(), e); + } + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java index 6b45ebf73ca8f..c0554598c2751 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java @@ -126,7 +126,7 @@ public void testIdleDetectionDoesNotTriggerWhenNotIdle() throws Exception { @Test public void testIdleDetectionWhenNoConnection() throws Exception { - // Wait for idle timeout + // Wait for idle timeout - necessary to reach idle state Thread.sleep(1200); // Set no connection @@ -142,7 +142,7 @@ public void testIdleDetectionWhenNoConnection() throws Exception { @Test public void testOwnershipChangeDetection() throws Exception { - // Wait for idle timeout + // Wait for idle timeout - necessary to reach idle state Thread.sleep(1200); // Mock connection and lookup @@ -174,14 +174,18 @@ public void testOwnershipChangeDetection() throws Exception { CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); result.get(2, TimeUnit.SECONDS); - // Verify reconnection was triggered - verify(mockConnectionHandler, times(1)) - .reconnectLater(any(PulsarClientException.class)); + // Verify reconnection was triggered using Awaitility + Awaitility.await() + .atMost(3, TimeUnit.SECONDS) + .untilAsserted(() -> + verify(mockConnectionHandler, times(1)) + .reconnectLater(any(PulsarClientException.class)) + ); } @Test public void testNoReconnectWhenOwnershipUnchanged() throws Exception { - // Wait for idle timeout + // Wait for idle timeout - necessary to reach idle state Thread.sleep(1200); // Mock connection and lookup with same broker From 33e09e75af2cc470d50a2a746bfd7c44ed0c4464 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 15 Jan 2026 04:14:47 +0000 Subject: [PATCH 7/9] Remove enablePartitionOwnershipCheck and eliminate null checks with no-op detector pattern Co-authored-by: nodece <16235121+nodece@users.noreply.github.com> --- .../pulsar/client/impl/ConsumerImpl.java | 46 ++++++------------- .../impl/PartitionConsumerIdleDetector.java | 29 +++++++++++- .../impl/conf/ConsumerConfigurationData.java | 15 ------ .../PartitionConsumerIdleDetectorTest.java | 3 +- 4 files changed, 42 insertions(+), 51 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 062ae360f5100..6839636f2585e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -448,12 +448,12 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages, "The number of messages sent to DLQ", topic, attrs); - // Initialize idle detector if enabled + // Initialize idle detector (either enabled or no-op based on configuration) long idleTimeoutMs = conf.getConsumerIdleTimeoutMs(); - if (idleTimeoutMs > 0 && conf.isEnablePartitionOwnershipCheck()) { - this.idleDetector = new PartitionConsumerIdleDetector(this, idleTimeoutMs); - - // Schedule periodic check with smaller initial delay for earlier detection + this.idleDetector = PartitionConsumerIdleDetector.create(this, idleTimeoutMs); + + // Schedule periodic check if idle detection is enabled + if (idleDetector.isEnabled()) { this.idleCheckTask = client.timer().scheduleAtFixedRate(() -> { try { idleDetector.checkIdleAndReconnectIfNeeded() @@ -467,7 +467,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat } }, IDLE_CHECK_INTERVAL_MS, IDLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS); } else { - this.idleDetector = null; + this.idleCheckTask = null; } grabCnx(); @@ -570,9 +570,7 @@ protected Message internalReceive() throws PulsarClientException { } message = incomingMessages.take(); messageProcessed(message); - if (idleDetector != null) { - idleDetector.markActive(); - } + idleDetector.markActive(); return beforeConsume(message); } catch (InterruptedException e) { ExceptionHandler.handleInterruptedException(e); @@ -593,9 +591,7 @@ protected CompletableFuture> internalReceiveAsync() { cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { messageProcessed(message); - if (idleDetector != null) { - idleDetector.markActive(); - } + idleDetector.markActive(); result.complete(beforeConsume(message)); } }); @@ -615,9 +611,7 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC return null; } messageProcessed(message); - if (idleDetector != null) { - idleDetector.markActive(); - } + idleDetector.markActive(); message = listener == null ? beforeConsume(message) : message; return message; } catch (InterruptedException e) { @@ -683,9 +677,7 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack return FutureUtil.failedFuture(exception); } - if (idleDetector != null) { - idleDetector.markActive(); - } + idleDetector.markActive(); if (txn != null) { return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties, @@ -710,9 +702,7 @@ protected CompletableFuture doAcknowledge(List messageIdList, A return FutureUtil.failedFuture(exception); } - if (idleDetector != null) { - idleDetector.markActive(); - } + idleDetector.markActive(); if (txn != null) { return doTransactionAcknowledgeForResponse(messageIdList, ackType, @@ -909,10 +899,7 @@ private static void copyMessageEventTime(Message message, public void negativeAcknowledge(MessageId messageId) { consumerNacksCounter.increment(); negativeAcksTracker.add(messageId); - - if (idleDetector != null) { - idleDetector.markActive(); - } + idleDetector.markActive(); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId)); @@ -922,10 +909,7 @@ public void negativeAcknowledge(MessageId messageId) { public void negativeAcknowledge(Message message) { consumerNacksCounter.increment(); negativeAcksTracker.add(message); - - if (idleDetector != null) { - idleDetector.markActive(); - } + idleDetector.markActive(); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId())); @@ -2283,9 +2267,7 @@ public void redeliverUnacknowledgedMessages() { // Second : we should synchronized `ClientCnx cnx = cnx()` to // prevent use old cnx to send redeliverUnacknowledgedMessages to a old broker synchronized (ConsumerImpl.this) { - if (idleDetector != null) { - idleDetector.markActive(); - } + idleDetector.markActive(); ClientCnx cnx = cnx(); // V1 don't support redeliverUnacknowledgedMessages diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java index 3d86dc9b1c8b9..74311ddcaa1f0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java @@ -38,26 +38,51 @@ class PartitionConsumerIdleDetector { private final ConsumerImpl consumer; private final long idleTimeoutMs; private final AtomicLong lastActivityTimestamp; + private final boolean enabled; - PartitionConsumerIdleDetector(ConsumerImpl consumer, long idleTimeoutMs) { + // No-op detector when idle detection is disabled + static PartitionConsumerIdleDetector disabled(ConsumerImpl consumer) { + return new PartitionConsumerIdleDetector(consumer, 0, false); + } + + static PartitionConsumerIdleDetector create(ConsumerImpl consumer, long idleTimeoutMs) { + if (idleTimeoutMs <= 0) { + return disabled(consumer); + } + return new PartitionConsumerIdleDetector(consumer, idleTimeoutMs, true); + } + + private PartitionConsumerIdleDetector(ConsumerImpl consumer, long idleTimeoutMs, boolean enabled) { this.consumer = consumer; this.idleTimeoutMs = idleTimeoutMs; - this.lastActivityTimestamp = new AtomicLong(System.currentTimeMillis()); + this.enabled = enabled; + this.lastActivityTimestamp = enabled ? new AtomicLong(System.currentTimeMillis()) : null; } /** * Mark the consumer as active (message received or acknowledged). */ void markActive() { + if (!enabled) { + return; + } lastActivityTimestamp.set(System.currentTimeMillis()); } + boolean isEnabled() { + return enabled; + } + /** * Check if the consumer is idle and reconnect if topic ownership changed. * * @return CompletableFuture that completes when the check is done */ CompletableFuture checkIdleAndReconnectIfNeeded() { + if (!enabled) { + return CompletableFuture.completedFuture(null); + } + long idleDuration = System.currentTimeMillis() - lastActivityTimestamp.get(); if (idleDuration < idleTimeoutMs) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index a4f097d255b10..3ad2f89da1f1e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -436,21 +436,6 @@ public int getMaxPendingChuckedMessage() { ) private long consumerIdleTimeoutMs = 30_000; - /** - * Enable partition-level topic ownership verification. - * When enabled, consumers will periodically check if topic ownership has changed - * and automatically reconnect to the new broker if needed. - * - * Default: true - */ - @ApiModelProperty( - name = "enablePartitionOwnershipCheck", - value = "Enable partition-level topic ownership verification. " - + "When enabled, consumers will periodically check if topic ownership has changed " - + "and automatically reconnect to the new broker if needed." - ) - private boolean enablePartitionOwnershipCheck = true; - public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) { return topicConfigurations.stream() .filter(topicConf -> topicConf.getTopicNameMatcher().matches(topicName)) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java index c0554598c2751..fb642347c331c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java @@ -61,7 +61,6 @@ public void setUp() { consumerConf = new ConsumerConfigurationData<>(); consumerConf.setSubscriptionName("test-sub"); consumerConf.setConsumerIdleTimeoutMs(1000); // 1 second for testing - consumerConf.setEnablePartitionOwnershipCheck(true); executorProvider = new ExecutorProvider(1, "PartitionConsumerIdleDetectorTest"); internalExecutor = Executors.newSingleThreadScheduledExecutor(); @@ -77,7 +76,7 @@ public void setUp() { true)); consumer.setState(HandlerState.State.Ready); - idleDetector = new PartitionConsumerIdleDetector(consumer, 1000); + idleDetector = PartitionConsumerIdleDetector.create(consumer, 1000); } @AfterMethod(alwaysRun = true) From 3891e64480586b7984b8e4fc032b40de684e35e2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 15 Jan 2026 04:16:26 +0000 Subject: [PATCH 8/9] Add race condition handling between broker notification and idle detector reconnect Co-authored-by: nodece <16235121+nodece@users.noreply.github.com> --- .../client/impl/PartitionConsumerIdleDetector.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java index 74311ddcaa1f0..75f99d42659b7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java @@ -166,14 +166,26 @@ private CompletableFuture verifyTopicOwnership() { /** * Reconnect the consumer with comprehensive cleanup. * + * This method is synchronized with broker-initiated reconnections through the ConnectionHandler. + * If the broker has already initiated a reconnection (via CommandCloseConsumer), the + * ConnectionHandler will handle the race condition and avoid duplicate reconnections. + * * @return CompletableFuture that completes when reconnection is initiated */ private CompletableFuture reconnectWithCleanup() { + // Check consumer state before triggering reconnection to avoid race with broker notifications + HandlerState.State state = consumer.getState(); + if (state == HandlerState.State.Closing || state == HandlerState.State.Closed) { + // Consumer is already closing/closed, no need to reconnect + return CompletableFuture.completedFuture(null); + } + return CompletableFuture.runAsync(() -> { cleanupConsumerState(); }, consumer.getInternalPinnedExecutor()) .thenCompose(__ -> { // Trigger reconnection by calling reconnectLater on connection handler + // The ConnectionHandler will handle synchronization with broker-initiated reconnections consumer.getConnectionHandler().reconnectLater( new PulsarClientException("Topic ownership changed, reconnecting")); return CompletableFuture.completedFuture(null); From acf0c83ad37cfe90fcbc7a510d43178b4b83cad0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 15 Jan 2026 04:18:06 +0000 Subject: [PATCH 9/9] Fix potential NPE by always initializing lastActivityTimestamp Co-authored-by: nodece <16235121+nodece@users.noreply.github.com> --- .../pulsar/client/impl/PartitionConsumerIdleDetector.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java index 75f99d42659b7..4d408f641ed93 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java @@ -56,7 +56,8 @@ private PartitionConsumerIdleDetector(ConsumerImpl consumer, long idleTimeout this.consumer = consumer; this.idleTimeoutMs = idleTimeoutMs; this.enabled = enabled; - this.lastActivityTimestamp = enabled ? new AtomicLong(System.currentTimeMillis()) : null; + // Always initialize to avoid null pointer issues, even when disabled + this.lastActivityTimestamp = new AtomicLong(System.currentTimeMillis()); } /**