Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -143,6 +144,7 @@

public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection {
private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
private static final long IDLE_CHECK_INTERVAL_MS = 10_000; // 10 seconds

final long consumerId;

Expand Down Expand Up @@ -245,6 +247,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
// This field will be set after the state becomes Failed, then the following operations will fail immediately
private volatile Throwable failReason = null;

private final PartitionConsumerIdleDetector idleDetector;
private volatile ScheduledFuture<?> idleCheckTask;

static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T> conf,
Expand Down Expand Up @@ -442,6 +447,29 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
"The number of negatively acknowledged messages", topic, attrs);
consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages,
"The number of messages sent to DLQ", topic, attrs);

// Initialize idle detector (either enabled or no-op based on configuration)
long idleTimeoutMs = conf.getConsumerIdleTimeoutMs();
this.idleDetector = PartitionConsumerIdleDetector.create(this, idleTimeoutMs);

// Schedule periodic check if idle detection is enabled
if (idleDetector.isEnabled()) {
this.idleCheckTask = client.timer().scheduleAtFixedRate(() -> {
try {
idleDetector.checkIdleAndReconnectIfNeeded()
.exceptionally(ex -> {
log.warn("[{}][{}] Idle check failed", topic, subscription, ex);
return null;
});
} catch (Throwable t) {
// Catch all exceptions to prevent thread pool corruption
log.error("[{}][{}] Unexpected error in idle check task", topic, subscription, t);
}
}, IDLE_CHECK_INTERVAL_MS, IDLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
} else {
this.idleCheckTask = null;
}

grabCnx();

consumersOpenedCounter.increment();
Expand All @@ -461,6 +489,31 @@ NegativeAcksTracker getNegativeAcksTracker() {
return negativeAcksTracker;
}

// Package-private methods for PartitionConsumerIdleDetector
AcknowledgmentsGroupingTracker getAcknowledgmentsGroupingTracker() {
return acknowledgmentsGroupingTracker;
}

void incrementConsumerEpoch() {
CONSUMER_EPOCH.incrementAndGet(this);
}

ExecutorService getInternalPinnedExecutor() {
return internalPinnedExecutor;
}

PulsarClientImpl getClient() {
return client;
}

TopicName getTopicName() {
return topicName;
}

ClientCnx getClientCnx() {
return cnx();
}

@Override
public CompletableFuture<Void> unsubscribeAsync(boolean force) {
if (getState() == State.Closing || getState() == State.Closed) {
Expand Down Expand Up @@ -517,6 +570,7 @@ protected Message<T> internalReceive() throws PulsarClientException {
}
message = incomingMessages.take();
messageProcessed(message);
idleDetector.markActive();
return beforeConsume(message);
} catch (InterruptedException e) {
ExceptionHandler.handleInterruptedException(e);
Expand All @@ -537,6 +591,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
messageProcessed(message);
idleDetector.markActive();
result.complete(beforeConsume(message));
}
});
Expand All @@ -556,6 +611,7 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
return null;
}
messageProcessed(message);
idleDetector.markActive();
message = listener == null ? beforeConsume(message) : message;
return message;
} catch (InterruptedException e) {
Expand Down Expand Up @@ -621,6 +677,8 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
return FutureUtil.failedFuture(exception);
}

idleDetector.markActive();

if (txn != null) {
return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties,
new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
Expand All @@ -643,6 +701,9 @@ protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, A
}
return FutureUtil.failedFuture(exception);
}

idleDetector.markActive();

if (txn != null) {
return doTransactionAcknowledgeForResponse(messageIdList, ackType,
properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
Expand Down Expand Up @@ -838,6 +899,7 @@ private static void copyMessageEventTime(Message<?> message,
public void negativeAcknowledge(MessageId messageId) {
consumerNacksCounter.increment();
negativeAcksTracker.add(messageId);
idleDetector.markActive();

// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId));
Expand All @@ -847,6 +909,7 @@ public void negativeAcknowledge(MessageId messageId) {
public void negativeAcknowledge(Message<?> message) {
consumerNacksCounter.increment();
negativeAcksTracker.add(message);
idleDetector.markActive();

// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
Expand Down Expand Up @@ -1253,6 +1316,9 @@ private void closeConsumerTasks() {
}
negativeAcksTracker.close();
stats.getStatTimeout().ifPresent(Timeout::cancel);
if (idleCheckTask != null) {
idleCheckTask.cancel(false);
}
//terminate incomingMessages queue, stop accept the new messages and waking up blocked thread.
incomingMessages.terminate(Message::release);
clearIncomingMessages();
Expand Down Expand Up @@ -2201,6 +2267,8 @@ public void redeliverUnacknowledgedMessages() {
// Second : we should synchronized `ClientCnx cnx = cnx()` to
// prevent use old cnx to send redeliverUnacknowledgedMessages to a old broker
synchronized (ConsumerImpl.this) {
idleDetector.markActive();

ClientCnx cnx = cnx();
// V1 don't support redeliverUnacknowledgedMessages
if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClientException;

/**
* Detects when a partition consumer becomes idle and triggers reconnection if topic ownership changed.
*
* This helps prevent permanent message loss when:
* - Topic ownership transfers to a different broker
* - The old broker fails to send CommandCloseConsumer notification
* - The consumer remains connected to a broker that no longer owns the topic
*/
@Slf4j
class PartitionConsumerIdleDetector {
private final ConsumerImpl<?> consumer;
private final long idleTimeoutMs;
private final AtomicLong lastActivityTimestamp;
private final boolean enabled;

// No-op detector when idle detection is disabled
static PartitionConsumerIdleDetector disabled(ConsumerImpl<?> consumer) {
return new PartitionConsumerIdleDetector(consumer, 0, false);
}

static PartitionConsumerIdleDetector create(ConsumerImpl<?> consumer, long idleTimeoutMs) {
if (idleTimeoutMs <= 0) {
return disabled(consumer);
}
return new PartitionConsumerIdleDetector(consumer, idleTimeoutMs, true);
}

private PartitionConsumerIdleDetector(ConsumerImpl<?> consumer, long idleTimeoutMs, boolean enabled) {
this.consumer = consumer;
this.idleTimeoutMs = idleTimeoutMs;
this.enabled = enabled;
// Always initialize to avoid null pointer issues, even when disabled
this.lastActivityTimestamp = new AtomicLong(System.currentTimeMillis());
}

/**
* Mark the consumer as active (message received or acknowledged).
*/
void markActive() {
if (!enabled) {
return;
}
lastActivityTimestamp.set(System.currentTimeMillis());
}

boolean isEnabled() {
return enabled;
}

/**
* Check if the consumer is idle and reconnect if topic ownership changed.
*
* @return CompletableFuture that completes when the check is done
*/
CompletableFuture<Void> checkIdleAndReconnectIfNeeded() {
if (!enabled) {
return CompletableFuture.completedFuture(null);
}

long idleDuration = System.currentTimeMillis() - lastActivityTimestamp.get();

if (idleDuration < idleTimeoutMs) {
// Not idle yet
return CompletableFuture.completedFuture(null);
}

if (log.isDebugEnabled()) {
log.debug("[{}][{}] Consumer idle for {}ms, verifying topic ownership",
consumer.getTopic(), consumer.getSubscription(), idleDuration);
}

return verifyTopicOwnership()
.thenCompose(ownershipChanged -> {
if (ownershipChanged) {
log.info("[{}][{}] Topic ownership changed, reconnecting consumer",
consumer.getTopic(), consumer.getSubscription());
return reconnectWithCleanup();
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Topic ownership unchanged, no reconnection needed",
consumer.getTopic(), consumer.getSubscription());
}
return CompletableFuture.completedFuture(null);
}
});
}

/**
* Verify if topic ownership has changed by performing a lookup.
*
* @return CompletableFuture<Boolean> true if ownership changed, false otherwise
*/
private CompletableFuture<Boolean> verifyTopicOwnership() {
// Get current broker address
ClientCnx currentCnx = consumer.getClientCnx();
if (currentCnx == null) {
// No current connection, no need to check
return CompletableFuture.completedFuture(false);
}

// Safely get the remote address and verify it's an InetSocketAddress
if (!(currentCnx.ctx().channel().remoteAddress() instanceof InetSocketAddress)) {
log.warn("[{}][{}] Remote address is not an InetSocketAddress, skipping ownership check",
consumer.getTopic(), consumer.getSubscription());
return CompletableFuture.completedFuture(false);
}

InetSocketAddress currentBroker = (InetSocketAddress) currentCnx.ctx().channel().remoteAddress();

// Perform topic lookup to find the current owner
return consumer.getClient().getLookup().getBroker(consumer.getTopicName())
.thenApply(lookupResult -> {
InetSocketAddress newBroker = lookupResult.getLogicalAddress();

// Check for null to prevent NPE
if (newBroker == null) {
log.warn("[{}][{}] Lookup returned null logical address",
consumer.getTopic(), consumer.getSubscription());
return false;
}

boolean changed = !currentBroker.equals(newBroker);

if (changed) {
log.info("[{}][{}] Topic ownership changed: {} -> {}",
consumer.getTopic(), consumer.getSubscription(),
currentBroker, newBroker);
}

return changed;
})
.exceptionally(ex -> {
log.warn("[{}][{}] Failed to verify topic ownership",
consumer.getTopic(), consumer.getSubscription(), ex);
// On lookup failure, don't trigger reconnection
return false;
});
}

/**
* Reconnect the consumer with comprehensive cleanup.
*
* This method is synchronized with broker-initiated reconnections through the ConnectionHandler.
* If the broker has already initiated a reconnection (via CommandCloseConsumer), the
* ConnectionHandler will handle the race condition and avoid duplicate reconnections.
*
* @return CompletableFuture that completes when reconnection is initiated
*/
private CompletableFuture<Void> reconnectWithCleanup() {
// Check consumer state before triggering reconnection to avoid race with broker notifications
HandlerState.State state = consumer.getState();
if (state == HandlerState.State.Closing || state == HandlerState.State.Closed) {
// Consumer is already closing/closed, no need to reconnect
return CompletableFuture.completedFuture(null);
}

return CompletableFuture.runAsync(() -> {
cleanupConsumerState();
}, consumer.getInternalPinnedExecutor())
.thenCompose(__ -> {
// Trigger reconnection by calling reconnectLater on connection handler
// The ConnectionHandler will handle synchronization with broker-initiated reconnections
consumer.getConnectionHandler().reconnectLater(
new PulsarClientException("Topic ownership changed, reconnecting"));
return CompletableFuture.completedFuture(null);
});
}

/**
* Clean up consumer state before reconnection.
*/
private void cleanupConsumerState() {
try {
// 1. Clear unacked message tracker
consumer.getUnAckedMessageTracker().clear();

// 2. Clear pending ack queues in acknowledgment tracker
consumer.getAcknowledgmentsGroupingTracker().flush();

// 3. Clear batch message ack tracker (if exists)
// The batch ack tracker is internal to acknowledgment tracker

// 4. Increment consumer epoch to reject old acks
consumer.incrementConsumerEpoch();

log.info("[{}][{}] Cleaned up consumer state before reconnection",
consumer.getTopic(), consumer.getSubscription());

} catch (Exception e) {
log.error("[{}][{}] Error during cleanup before reconnection",
consumer.getTopic(), consumer.getSubscription(), e);
}
}
}
Loading