[Bug][Client] Consumer lost message ack due to race condition in acknowledge with batch message #22352

@Shawyeok

Description

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

pulsar-client: 3.0.3

(This issue may affect pulsar-client version 3.0.0 or later. #19414)

Minimal reproduce step

  1. Create a partitioned topic with 3 partitions, e.g. persistent://public/default/clientBlockTest.
  2. Produce messages with batching enabled:
pulsar-perf produce -r 1000 -b 10 persistent://public/default/clientBlockTest
  3. Start a consumer with the code below:
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

@Slf4j
public class App {

    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        String serviceUrl = args[0];
        String topicName = "persistent://public/default/clientBlockTest";
        String subscriptionName = "sub0";
        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        Consumer<byte[]> consumer = client.newConsumer()
                .topic(topicName)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(subscriptionName)
                .ackTimeout(1, TimeUnit.MINUTES)
                .subscribe();
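        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("pulsar-consumer-%d").build();
        // 16 user threads share one consumer and acknowledge concurrently, so messages
        // from the same batch can be acked from different threads at the same time.
        for (int i = 0; i < 16; i++) {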
            threadFactory.newThread(() -> {
                while (true) {
                    try {
                        Message<byte[]> message = consumer.receive();
                        consumer.acknowledge(message);
                    } catch (PulsarClientException.AlreadyClosedException e) {
                        log.info("Consumer closed");
                        break;
                    } catch (PulsarClientException e) {
                        log.error("Failed to receive message", e);
                    }
                }
            }).start();
        }
        new CountDownLatch(1).await();
    }
}
  4. Some messages are never acked, and they are not redelivered.
  5. The consumer eventually gets blocked on unacked messages.
Topic stats:
{
  "msgRateIn": 333.33173299657216,
  "msgThroughputIn": 347088.33361734066,
  "msgRateOut": 0.0,
  "msgThroughputOut": 0.0,
  "bytesInCounter": 552318240,
  "msgInCounter": 530504,
  "bytesOutCounter": 373741143,
  "msgOutCounter": 359001,
  "averageMsgSize": 1041.27,
  "msgChunkPublished": false,
  "storageSize": 552281787,
  "backlogSize": 307736567,
  "publishRateLimitedTimes": 0,
  "offloadedStorageSize": 0,
  "lastOffloadLedgerId": 0,
  "lastOffloadSuccessTimeStamp": 0,
  "lastOffloadFailureTimeStamp": 0,
  "publishers": [
    {
      "accessMode": "Shared",
      "msgRateIn": 333.33173299657216,
      "msgThroughputIn": 347088.33361734066,
      "averageMsgSize": 1041.27,
      "chunkedMessageRate": 0.0,
      "producerId": 0,
      "metadata": { },
      "address": "/192.168.214.214:63055",
      "connectedSince": "2024-03-26T10:29:04.869+08:00",
      "clientVersion": "2.8.1.27",
      "producerName": "pulsar-cluster-qa-391-23"
    }
  ],
  "waitingPublishers": 0,
  "subscriptions": {
    "sub0": {
      "msgRateOut": 0.0,
      "msgThroughputOut": 0.0,
      "bytesOutCounter": 373741143,
      "msgOutCounter": 359001,
      "msgRateRedeliver": 0.0,
      "chunkedMessageRate": 0,
      "msgBacklog": 18729,
      "backlogSize": 0,
      "msgBacklogNoDelayed": 18729,
      "blockedSubscriptionOnUnackedMsgs": false,
      "msgDelayed": 0,
      "unackedMessages": 502,
      "type": "Shared",
      "msgRateExpired": 0.0,
      "totalMsgExpired": 0,
      "lastExpireTimestamp": 1711421636224,
      "lastConsumedFlowTimestamp": 1711421261835,
      "lastConsumedTimestamp": 1711421261864,
      "lastAckedTimestamp": 1711421262238,
      "lastMarkDeleteAdvancedTimestamp": 1711421162509,
      "consumers": [
        {
          "msgRateOut": 0.0,
          "msgThroughputOut": 0.0,
          "bytesOutCounter": 131805765,
          "msgOutCounter": 126615,
          "msgRateRedeliver": 0.0,
          "chunkedMessageRate": 0.0,
          "consumerName": "0563b",
          "availablePermits": 385,
          "unackedMessages": 502,
          "avgMessagesPerEntry": 9,
          "blockedConsumerOnUnackedMsgs": true,
          "lastAckedTimestamp": 1711421262238,
          "lastConsumedTimestamp": 1711421261864,
          "metadata": { },
          "address": "/192.168.214.214:64708",
          "connectedSince": "2024-03-26T10:46:02.2+08:00",
          "clientVersion": "Pulsar-Java-v3.0.3"
        }
      ],
      "isDurable": true,
      "isReplicated": false,
      "consumersAfterMarkDeletePosition": { },
      "nonContiguousDeletedMessagesRanges": 46,
      "nonContiguousDeletedMessagesRangesSerializedSize": 1012,
      "durable": true,
      "replicated": false
    }
  },
  "replication": { },
  "deduplicationStatus": "Disabled",
  "nonContiguousDeletedMessagesRanges": 46,
  "nonContiguousDeletedMessagesRangesSerializedSize": 1012,
  "compaction": {
    "lastCompactionRemovedEventCount": 0,
    "lastCompactionSucceedTimestamp": 0,
    "lastCompactionFailedTimestamp": 0,
    "lastCompactionDurationTimeInMills": 0
  }
}

What did you expect to see?

The consumer keeps consuming normally.

What did you see instead?

Symptoms

  • For a partitioned topic, a few messages are never acked on the broker side, although from the consumer's point of view they appear to be acked already.
    • The consumer eventually blocks on unacked messages.
  • For a non-partitioned topic, ConsumerImpl redelivers these messages because the ConsumerImpl#unAckedMessageTracker mechanism still takes effect.

Anything else?

After investigation, I believe the cause is that operations on the BitSet of a batch message are not synchronized. Consumer#acknowledge is called from user threads, possibly several at once, so operations on the BitSet should be protected.
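The lost update can be replayed deterministically. `java.util.BitSet` is documented as not safe for multithreaded use without external synchronization, and `BitSet.clear(int)` is a read-modify-write on a 64-bit word. The sketch below is a hypothetical model, not Pulsar code: it uses a plain `long` in place of the batch's ack set and hand-orders the loads and stores of two ack threads.

```java
/**
 * Hypothetical, deterministic replay of one bad interleaving of two
 * unsynchronized clear operations on the same 64-bit word. Not Pulsar code.
 */
public class LostAckInterleaving {

    /** Returns the final word after the racy interleaving. */
    static long racyInterleaving() {
        long word = 0b11L;                // batch of 2 messages, both still unacked
        long readByA = word;              // thread A (acks index 0) loads the word
        long readByB = word;              // thread B (acks index 1) loads it before A stores
        word = readByA & ~(1L << 0);      // A stores 0b10
        word = readByB & ~(1L << 1);      // B stores 0b01, overwriting A's update
        return word;
    }

    /** The same two clears applied one after another. */
    static long serializedInterleaving() {
        long word = 0b11L;
        word &= ~(1L << 0);
        word &= ~(1L << 1);
        return word;
    }

    public static void main(String[] args) {
        long racy = racyInterleaving();
        long serial = serializedInterleaving();
        System.out.println("racy   = " + Long.toBinaryString(racy) + ", empty=" + (racy == 0));
        System.out.println("serial = " + Long.toBinaryString(serial) + ", empty=" + (serial == 0));
    }
}
```

In the racy order, bit 0 comes back, so the emptiness check is false for both threads. With batch index ack disabled (the final `else` branch in the ack path), neither thread sends the individual ack for the entry, and since both messages were already handed back as acked, no later call will clear the remaining bit. With the two clears serialized, the word reaches 0 and the second ack sees an empty set.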

The ack path sends the entry-level (individual) ack only when MessageIdAdvUtils.acknowledge reports that the whole batch is acked:

    if (batchMessageId == null || MessageIdAdvUtils.acknowledge(batchMessageId, true)) {
        consumer.getStats().incrementNumAcksSent((batchMessageId != null) ? batchMessageId.getBatchSize() : 1);
        consumer.getUnAckedMessageTracker().remove(msgId);
        if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
            consumer.getPossibleSendToDeadLetterTopicMessages().remove(msgId);
        }
        return individualAckFunction.apply(msgId);
    } else if (batchIndexAckEnabled) {
        return batchAckFunction.apply(batchMessageId);
    } else {
        return CompletableFuture.completedFuture(null);
    }

and MessageIdAdvUtils.acknowledge updates the ack set without any locking:

    int batchIndex = msgId.getBatchIndex();
    if (individual) {
        ackSet.clear(batchIndex);
    } else {
        ackSet.clear(0, batchIndex + 1);
    }
    return ackSet.isEmpty();
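One way to protect the ack set is to make the clear and the emptiness check a single critical section by locking on the shared `BitSet`. The following is a minimal sketch under that assumption, not necessarily the fix adopted upstream: `SafeBatchAck.acknowledge(BitSet, int, boolean)` and `concurrentAckAll` are hypothetical helpers that mirror the body of `MessageIdAdvUtils.acknowledge`, and the approach only works if every thread touching the same batch locks the same object.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SafeBatchAck {

    // Hypothetical thread-safe variant of the ack-set update: clear + isEmpty
    // run atomically because every caller locks the same shared BitSet.
    static boolean acknowledge(BitSet ackSet, int batchIndex, boolean individual) {
        synchronized (ackSet) {
            if (individual) {
                ackSet.clear(batchIndex);          // individual ack: one message
            } else {
                ackSet.clear(0, batchIndex + 1);   // cumulative ack: indexes 0..batchIndex
            }
            return ackSet.isEmpty();               // true once the whole batch is acked
        }
    }

    // Hypothetical test driver: acks every index of one batch from a thread pool
    // and returns how many calls reported "whole batch acked".
    static int concurrentAckAll(int batchSize, int threads) throws Exception {
        BitSet ackSet = new BitSet(batchSize);
        ackSet.set(0, batchSize);                  // all messages start unacked
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        List<Future<Boolean>> results = new ArrayList<>();
        try {
            for (int i = 0; i < batchSize; i++) {
                final int index = i;
                results.add(pool.submit(() -> {
                    start.await();                 // hold workers until all acks are queued
                    return acknowledge(ackSet, index, true);
                }));
            }
            start.countDown();
            int wholeBatchAcked = 0;
            for (Future<Boolean> result : results) {
                if (result.get()) {
                    wholeBatchAcked++;
                }
            }
            return wholeBatchAcked;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // With the lock, exactly one ack observes the empty set, so the
        // entry-level ack would be sent exactly once.
        System.out.println("whole-batch acks: " + concurrentAckAll(1000, 16));
    }
}
```

Using `ackSet` itself as the monitor keeps the lock per batch, so acks for different batches do not contend; the trade-off is that any other code reading or mutating the same `BitSet` must also synchronize on it.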

Are you willing to submit a PR?

Metadata

Assignees

No one assigned

    Labels

    type/bug

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
