diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 37e9f16fe0292..6839636f2585e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -57,6 +57,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -143,6 +144,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandler.Connection { private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000; + private static final long IDLE_CHECK_INTERVAL_MS = 10_000; // 10 seconds final long consumerId; @@ -245,6 +247,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle // This field will be set after the state becomes Failed, then the following operations will fail immediately private volatile Throwable failReason = null; + private final PartitionConsumerIdleDetector idleDetector; + private volatile ScheduledFuture idleCheckTask; + static ConsumerImpl newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, @@ -442,6 +447,29 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat "The number of negatively acknowledged messages", topic, attrs); consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages, "The number of messages sent to DLQ", topic, attrs); + + // Initialize idle detector (either enabled or no-op based on configuration) + long idleTimeoutMs = conf.getConsumerIdleTimeoutMs(); + this.idleDetector = PartitionConsumerIdleDetector.create(this, idleTimeoutMs); + + // Schedule periodic check if idle detection is enabled + if (idleDetector.isEnabled()) { + this.idleCheckTask = client.timer().scheduleAtFixedRate(() -> { + try { + idleDetector.checkIdleAndReconnectIfNeeded() + .exceptionally(ex -> { + log.warn("[{}][{}] Idle check failed", topic, subscription, ex); + return null; + }); + } catch (Throwable t) { + // Catch all exceptions to prevent thread pool corruption + log.error("[{}][{}] Unexpected error in idle check task", topic, subscription, t); + } + }, IDLE_CHECK_INTERVAL_MS, IDLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS); + } else { + this.idleCheckTask = null; + } + grabCnx(); consumersOpenedCounter.increment(); @@ -461,6 +489,31 @@ NegativeAcksTracker getNegativeAcksTracker() { return negativeAcksTracker; } + // Package-private methods for PartitionConsumerIdleDetector + AcknowledgmentsGroupingTracker getAcknowledgmentsGroupingTracker() { + return acknowledgmentsGroupingTracker; + } + + void incrementConsumerEpoch() { + CONSUMER_EPOCH.incrementAndGet(this); + } + + ExecutorService getInternalPinnedExecutor() { + return internalPinnedExecutor; + } + + PulsarClientImpl getClient() { + return client; + } + + TopicName getTopicName() { + return topicName; + } + + ClientCnx getClientCnx() { + return cnx(); + } + @Override public CompletableFuture unsubscribeAsync(boolean force) { if (getState() == State.Closing || getState() == State.Closed) { @@ -517,6 +570,7 @@ protected Message internalReceive() throws PulsarClientException { } message = incomingMessages.take(); messageProcessed(message); + idleDetector.markActive(); return beforeConsume(message); } catch (InterruptedException e) { ExceptionHandler.handleInterruptedException(e); @@ -537,6 +591,7 @@ protected CompletableFuture> internalReceiveAsync() { cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { messageProcessed(message); + idleDetector.markActive(); result.complete(beforeConsume(message)); } }); @@ -556,6 +611,7 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC return null; } messageProcessed(message); + idleDetector.markActive(); message = listener == null ? beforeConsume(message) : message; return message; } catch (InterruptedException e) { @@ -621,6 +677,8 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack return FutureUtil.failedFuture(exception); } + idleDetector.markActive(); + if (txn != null) { return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())); @@ -643,6 +701,9 @@ protected CompletableFuture doAcknowledge(List messageIdList, A } return FutureUtil.failedFuture(exception); } + + idleDetector.markActive(); + if (txn != null) { return doTransactionAcknowledgeForResponse(messageIdList, ackType, properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())); @@ -838,6 +899,7 @@ private static void copyMessageEventTime(Message message, public void negativeAcknowledge(MessageId messageId) { consumerNacksCounter.increment(); negativeAcksTracker.add(messageId); + idleDetector.markActive(); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId)); @@ -847,6 +909,7 @@ public void negativeAcknowledge(MessageId messageId) { public void negativeAcknowledge(Message message) { consumerNacksCounter.increment(); negativeAcksTracker.add(message); + idleDetector.markActive(); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId())); @@ -1253,6 +1316,9 @@ private void closeConsumerTasks() { } negativeAcksTracker.close(); stats.getStatTimeout().ifPresent(Timeout::cancel); + if (idleCheckTask != null) { + idleCheckTask.cancel(false); + } //terminate incomingMessages queue, stop accept the new messages and waking up blocked thread. incomingMessages.terminate(Message::release); clearIncomingMessages(); @@ -2201,6 +2267,8 @@ public void redeliverUnacknowledgedMessages() { // Second : we should synchronized `ClientCnx cnx = cnx()` to // prevent use old cnx to send redeliverUnacknowledgedMessages to a old broker synchronized (ConsumerImpl.this) { + idleDetector.markActive(); + ClientCnx cnx = cnx(); // V1 don't support redeliverUnacknowledgedMessages if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java new file mode 100644 index 0000000000000..4d408f641ed93 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetector.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.net.InetSocketAddress; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.PulsarClientException; + +/** + * Detects when a partition consumer becomes idle and triggers reconnection if topic ownership changed. + * + * This helps prevent permanent message loss when: + * - Topic ownership transfers to a different broker + * - The old broker fails to send CommandCloseConsumer notification + * - The consumer remains connected to a broker that no longer owns the topic + */ +@Slf4j +class PartitionConsumerIdleDetector { + private final ConsumerImpl consumer; + private final long idleTimeoutMs; + private final AtomicLong lastActivityTimestamp; + private final boolean enabled; + + // No-op detector when idle detection is disabled + static PartitionConsumerIdleDetector disabled(ConsumerImpl consumer) { + return new PartitionConsumerIdleDetector(consumer, 0, false); + } + + static PartitionConsumerIdleDetector create(ConsumerImpl consumer, long idleTimeoutMs) { + if (idleTimeoutMs <= 0) { + return disabled(consumer); + } + return new PartitionConsumerIdleDetector(consumer, idleTimeoutMs, true); + } + + private PartitionConsumerIdleDetector(ConsumerImpl consumer, long idleTimeoutMs, boolean enabled) { + this.consumer = consumer; + this.idleTimeoutMs = idleTimeoutMs; + this.enabled = enabled; + // Always initialize to avoid null pointer issues, even when disabled + this.lastActivityTimestamp = new AtomicLong(System.currentTimeMillis()); + } + + /** + * Mark the consumer as active (message received or acknowledged). + */ + void markActive() { + if (!enabled) { + return; + } + lastActivityTimestamp.set(System.currentTimeMillis()); + } + + boolean isEnabled() { + return enabled; + } + + /** + * Check if the consumer is idle and reconnect if topic ownership changed. + * + * @return CompletableFuture that completes when the check is done + */ + CompletableFuture checkIdleAndReconnectIfNeeded() { + if (!enabled) { + return CompletableFuture.completedFuture(null); + } + + long idleDuration = System.currentTimeMillis() - lastActivityTimestamp.get(); + + if (idleDuration < idleTimeoutMs) { + // Not idle yet + return CompletableFuture.completedFuture(null); + } + + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Consumer idle for {}ms, verifying topic ownership", + consumer.getTopic(), consumer.getSubscription(), idleDuration); + } + + return verifyTopicOwnership() + .thenCompose(ownershipChanged -> { + if (ownershipChanged) { + log.info("[{}][{}] Topic ownership changed, reconnecting consumer", + consumer.getTopic(), consumer.getSubscription()); + return reconnectWithCleanup(); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Topic ownership unchanged, no reconnection needed", + consumer.getTopic(), consumer.getSubscription()); + } + return CompletableFuture.completedFuture(null); + } + }); + } + + /** + * Verify if topic ownership has changed by performing a lookup. + * + * @return CompletableFuture true if ownership changed, false otherwise + */ + private CompletableFuture verifyTopicOwnership() { + // Get current broker address + ClientCnx currentCnx = consumer.getClientCnx(); + if (currentCnx == null) { + // No current connection, no need to check + return CompletableFuture.completedFuture(false); + } + + // Safely get the remote address and verify it's an InetSocketAddress + if (!(currentCnx.ctx().channel().remoteAddress() instanceof InetSocketAddress)) { + log.warn("[{}][{}] Remote address is not an InetSocketAddress, skipping ownership check", + consumer.getTopic(), consumer.getSubscription()); + return CompletableFuture.completedFuture(false); + } + + InetSocketAddress currentBroker = (InetSocketAddress) currentCnx.ctx().channel().remoteAddress(); + + // Perform topic lookup to find the current owner + return consumer.getClient().getLookup().getBroker(consumer.getTopicName()) + .thenApply(lookupResult -> { + InetSocketAddress newBroker = lookupResult.getLogicalAddress(); + + // Check for null to prevent NPE + if (newBroker == null) { + log.warn("[{}][{}] Lookup returned null logical address", + consumer.getTopic(), consumer.getSubscription()); + return false; + } + + boolean changed = !currentBroker.equals(newBroker); + + if (changed) { + log.info("[{}][{}] Topic ownership changed: {} -> {}", + consumer.getTopic(), consumer.getSubscription(), + currentBroker, newBroker); + } + + return changed; + }) + .exceptionally(ex -> { + log.warn("[{}][{}] Failed to verify topic ownership", + consumer.getTopic(), consumer.getSubscription(), ex); + // On lookup failure, don't trigger reconnection + return false; + }); + } + + /** + * Reconnect the consumer with comprehensive cleanup. + * + * This method is synchronized with broker-initiated reconnections through the ConnectionHandler. + * If the broker has already initiated a reconnection (via CommandCloseConsumer), the + * ConnectionHandler will handle the race condition and avoid duplicate reconnections. + * + * @return CompletableFuture that completes when reconnection is initiated + */ + private CompletableFuture reconnectWithCleanup() { + // Check consumer state before triggering reconnection to avoid race with broker notifications + HandlerState.State state = consumer.getState(); + if (state == HandlerState.State.Closing || state == HandlerState.State.Closed) { + // Consumer is already closing/closed, no need to reconnect + return CompletableFuture.completedFuture(null); + } + + return CompletableFuture.runAsync(() -> { + cleanupConsumerState(); + }, consumer.getInternalPinnedExecutor()) + .thenCompose(__ -> { + // Trigger reconnection by calling reconnectLater on connection handler + // The ConnectionHandler will handle synchronization with broker-initiated reconnections + consumer.getConnectionHandler().reconnectLater( + new PulsarClientException("Topic ownership changed, reconnecting")); + return CompletableFuture.completedFuture(null); + }); + } + + /** + * Clean up consumer state before reconnection. + */ + private void cleanupConsumerState() { + try { + // 1. Clear unacked message tracker + consumer.getUnAckedMessageTracker().clear(); + + // 2. Clear pending ack queues in acknowledgment tracker + consumer.getAcknowledgmentsGroupingTracker().flush(); + + // 3. Clear batch message ack tracker (if exists) + // The batch ack tracker is internal to acknowledgment tracker + + // 4. Increment consumer epoch to reject old acks + consumer.incrementConsumerEpoch(); + + log.info("[{}][{}] Cleaned up consumer state before reconnection", + consumer.getTopic(), consumer.getSubscription()); + + } catch (Exception e) { + log.error("[{}][{}] Error during cleanup before reconnection", + consumer.getTopic(), consumer.getSubscription(), e); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 42fc2666573fe..3ad2f89da1f1e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -419,6 +419,23 @@ public int getMaxPendingChuckedMessage() { private List topicConfigurations = new ArrayList<>(); + /** + * Consumer idle timeout in milliseconds. + * If the consumer doesn't receive messages or perform acks for this duration, + * it will trigger a lookup to verify topic ownership and reconnect if changed. + * + * Set to 0 to disable idle detection. + * Default: 30000 (30 seconds) + */ + @ApiModelProperty( + name = "consumerIdleTimeoutMs", + value = "Consumer idle timeout in milliseconds. " + + "If the consumer doesn't receive messages or perform acks for this duration, " + + "it will trigger a lookup to verify topic ownership and reconnect if changed. " + + "Set to 0 to disable idle detection." + ) + private long consumerIdleTimeoutMs = 30_000; + public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) { return topicConfigurations.stream() .filter(topicConf -> topicConf.getTopicNameMatcher().matches(topicName)) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java new file mode 100644 index 0000000000000..fb642347c331c --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionConsumerIdleDetectorTest.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class PartitionConsumerIdleDetectorTest { + private final String topic = "persistent://tenant/ns1/my-topic"; + private ExecutorProvider executorProvider; + private ExecutorService internalExecutor; + private ConsumerImpl consumer; + private ConsumerConfigurationData consumerConf; + private PulsarClientImpl client; + private PartitionConsumerIdleDetector idleDetector; + + @BeforeMethod(alwaysRun = true) + public void setUp() { + consumerConf = new ConsumerConfigurationData<>(); + consumerConf.setSubscriptionName("test-sub"); + consumerConf.setConsumerIdleTimeoutMs(1000); // 1 second for testing + + executorProvider = new ExecutorProvider(1, "PartitionConsumerIdleDetectorTest"); + internalExecutor = Executors.newSingleThreadScheduledExecutor(); + + client = ClientTestFixtures.createPulsarClientMock(executorProvider, internalExecutor); + ClientConfigurationData clientConf = client.getConfiguration(); + clientConf.setOperationTimeoutMs(100); + clientConf.setStatsIntervalSeconds(0); + + CompletableFuture> subscribeFuture = new CompletableFuture<>(); + consumer = spy(ConsumerImpl.newConsumerImpl(client, topic, consumerConf, + executorProvider, -1, false, subscribeFuture, null, null, null, + true)); + consumer.setState(HandlerState.State.Ready); + + idleDetector = PartitionConsumerIdleDetector.create(consumer, 1000); + } + + @AfterMethod(alwaysRun = true) + public void cleanup() { + if (executorProvider != null) { + executorProvider.shutdownNow(); + executorProvider = null; + } + if (internalExecutor != null) { + internalExecutor.shutdownNow(); + internalExecutor = null; + } + } + + @Test + public void testMarkActiveResetsTimer() throws Exception { + // Verify initial timestamp + long initialTimestamp = System.currentTimeMillis(); + + // Wait a bit to ensure timestamp would change + Thread.sleep(100); + + // Mark as active + idleDetector.markActive(); + + // Check that the consumer is not considered idle yet + CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); + result.get(1, TimeUnit.SECONDS); + + // Verify no reconnection was triggered (would be verified by no calls to connection handler) + verify(consumer, never()).getConnectionHandler(); + } + + @Test + public void testIdleDetectionDoesNotTriggerWhenNotIdle() throws Exception { + // Mark as active to reset the timer + idleDetector.markActive(); + + // Immediately check (consumer is not idle) + CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); + result.get(1, TimeUnit.SECONDS); + + // Verify no ownership verification was attempted + verify(consumer, never()).getClient(); + } + + @Test + public void testIdleDetectionWhenNoConnection() throws Exception { + // Wait for idle timeout - necessary to reach idle state + Thread.sleep(1200); + + // Set no connection + when(consumer.getClientCnx()).thenReturn(null); + + // Check idle - should not trigger reconnect when no connection + CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); + result.get(1, TimeUnit.SECONDS); + + // Verify that it checked for connection but didn't proceed + verify(consumer, times(1)).getClientCnx(); + } + + @Test + public void testOwnershipChangeDetection() throws Exception { + // Wait for idle timeout - necessary to reach idle state + Thread.sleep(1200); + + // Mock connection and lookup + ClientCnx mockCnx = mock(ClientCnx.class); + ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class); + Channel mockChannel = mock(Channel.class); + InetSocketAddress oldAddress = new InetSocketAddress("localhost", 6650); + InetSocketAddress newAddress = new InetSocketAddress("localhost", 6651); + + when(consumer.getClientCnx()).thenReturn(mockCnx); + when(mockCnx.ctx()).thenReturn(mockCtx); + when(mockCtx.channel()).thenReturn(mockChannel); + when(mockChannel.remoteAddress()).thenReturn(oldAddress); + + // Mock lookup service to return different broker + LookupService mockLookup = mock(LookupService.class); + when(consumer.getClient()).thenReturn(client); + when(client.getLookup()).thenReturn(mockLookup); + + LookupTopicResult lookupResult = new LookupTopicResult(newAddress, newAddress, false); + when(mockLookup.getBroker(any(TopicName.class))) + .thenReturn(CompletableFuture.completedFuture(lookupResult)); + + // Mock connection handler + ConnectionHandler mockConnectionHandler = mock(ConnectionHandler.class); + when(consumer.getConnectionHandler()).thenReturn(mockConnectionHandler); + + // Check idle - should trigger reconnect due to ownership change + CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); + result.get(2, TimeUnit.SECONDS); + + // Verify reconnection was triggered using Awaitility + Awaitility.await() + .atMost(3, TimeUnit.SECONDS) + .untilAsserted(() -> + verify(mockConnectionHandler, times(1)) + .reconnectLater(any(PulsarClientException.class)) + ); + } + + @Test + public void testNoReconnectWhenOwnershipUnchanged() throws Exception { + // Wait for idle timeout - necessary to reach idle state + Thread.sleep(1200); + + // Mock connection and lookup with same broker + ClientCnx mockCnx = mock(ClientCnx.class); + ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class); + Channel mockChannel = mock(Channel.class); + InetSocketAddress sameAddress = new InetSocketAddress("localhost", 6650); + + when(consumer.getClientCnx()).thenReturn(mockCnx); + when(mockCnx.ctx()).thenReturn(mockCtx); + when(mockCtx.channel()).thenReturn(mockChannel); + when(mockChannel.remoteAddress()).thenReturn(sameAddress); + + // Mock lookup service to return same broker + LookupService mockLookup = mock(LookupService.class); + when(consumer.getClient()).thenReturn(client); + when(client.getLookup()).thenReturn(mockLookup); + + LookupTopicResult lookupResult = new LookupTopicResult(sameAddress, sameAddress, false); + when(mockLookup.getBroker(any(TopicName.class))) + .thenReturn(CompletableFuture.completedFuture(lookupResult)); + + // Mock connection handler + ConnectionHandler mockConnectionHandler = mock(ConnectionHandler.class); + when(consumer.getConnectionHandler()).thenReturn(mockConnectionHandler); + + // Check idle - should NOT trigger reconnect (same broker) + CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); + result.get(2, TimeUnit.SECONDS); + + // Verify reconnection was NOT triggered + verify(mockConnectionHandler, never()) + .reconnectLater(any(PulsarClientException.class)); + } + + @Test + public void testLookupFailureDoesNotTriggerReconnect() throws Exception { + // Wait for idle timeout + Thread.sleep(1200); + + // Mock connection + ClientCnx mockCnx = mock(ClientCnx.class); + ChannelHandlerContext mockCtx = mock(ChannelHandlerContext.class); + Channel mockChannel = mock(Channel.class); + InetSocketAddress address = new InetSocketAddress("localhost", 6650); + + when(consumer.getClientCnx()).thenReturn(mockCnx); + when(mockCnx.ctx()).thenReturn(mockCtx); + when(mockCtx.channel()).thenReturn(mockChannel); + when(mockChannel.remoteAddress()).thenReturn(address); + + // Mock lookup service to fail + LookupService mockLookup = mock(LookupService.class); + when(consumer.getClient()).thenReturn(client); + when(client.getLookup()).thenReturn(mockLookup); + + when(mockLookup.getBroker(any(TopicName.class))) + .thenReturn(CompletableFuture.failedFuture( + new PulsarClientException("Lookup failed"))); + + // Mock connection handler + ConnectionHandler mockConnectionHandler = mock(ConnectionHandler.class); + when(consumer.getConnectionHandler()).thenReturn(mockConnectionHandler); + + // Check idle - should handle lookup failure gracefully + CompletableFuture result = idleDetector.checkIdleAndReconnectIfNeeded(); + result.get(2, TimeUnit.SECONDS); + + // Verify reconnection was NOT triggered on lookup failure + verify(mockConnectionHandler, never()) + .reconnectLater(any(PulsarClientException.class)); + } + + @Test + public void testDisabledWhenConfiguredWithZeroTimeout() { + // Create consumer with idle detection disabled + consumerConf.setConsumerIdleTimeoutMs(0); + + CompletableFuture> subscribeFuture = new CompletableFuture<>(); + ConsumerImpl consumerWithDisabledDetector = ConsumerImpl.newConsumerImpl( + client, topic, consumerConf, + executorProvider, -1, false, subscribeFuture, null, null, null, + true); + + // Idle detector should be null when disabled + // This is validated by the fact that consumer creation succeeds without idle detector + assertNotNull(consumerWithDisabledDetector); + } + + @Test + public void testDisabledWhenOwnershipCheckDisabled() { + // Create consumer with ownership check disabled + consumerConf.setConsumerIdleTimeoutMs(1000); + consumerConf.setEnablePartitionOwnershipCheck(false); + + CompletableFuture> subscribeFuture = new CompletableFuture<>(); + ConsumerImpl consumerWithDisabledDetector = ConsumerImpl.newConsumerImpl( + client, topic, consumerConf, + executorProvider, -1, false, subscribeFuture, null, null, null, + true); + + // Idle detector should be null when ownership check is disabled + assertNotNull(consumerWithDisabledDetector); + } +}