From 855bb248063ad7fcfbbd7c04d9795de7dee9127a Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 3 Feb 2026 14:38:04 +0800 Subject: [PATCH 1/3] replace ConcurrentBitSetRecyclable with ConcurrentBitSet --- .../pulsar/client/impl/ConsumerImpl.java | 11 ++++---- ...sistentAcknowledgmentsGroupingTracker.java | 25 ++++++++++--------- .../pulsar/common/protocol/Commands.java | 11 ++++---- .../ConcurrentBitSetRecyclable.java | 1 + 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 37e9f16fe0292..adecd97564ff7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -136,7 +136,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.SafeCollectionUtils; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; +import org.apache.pulsar.common.util.collections.ConcurrentBitSet; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -3187,7 +3187,7 @@ private CompletableFuture doTransactionAcknowledgeForResponse(MessageId me } else { if (Commands.peerSupportsMultiMessageAcknowledgment( getClientCnx().getRemoteEndpointProtocolVersion())) { - List> entriesToAck = + List> entriesToAck = new ArrayList<>(chunkMsgIds.length); for (MessageIdImpl cMsgId : chunkMsgIds) { if (cMsgId != null && chunkMsgIds.length > 1) { @@ -3225,7 +3225,7 @@ private CompletableFuture doTransactionAcknowledgeForResponse(MessageId me } private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID, - List> entries, + List> entries, long requestID) { BaseCommand cmd = newMultiMessageAckCommon(entries); cmd.getAck() @@ -3244,7 +3244,7 @@ protected BaseCommand initialValue() throws Exception { } }; - private static BaseCommand newMultiMessageAckCommon(List> entries) { + private static BaseCommand newMultiMessageAckCommon(List> entries) { BaseCommand cmd = LOCAL_BASE_COMMAND.get() .clear() .setType(BaseCommand.Type.ACK); @@ -3253,7 +3253,7 @@ private static BaseCommand newMultiMessageAckCommon(List pendingIndividualAcks; @VisibleForTesting - final ConcurrentSkipListMap pendingIndividualBatchIndexAcks; + final ConcurrentSkipListMap pendingIndividualBatchIndexAcks; private final ScheduledFuture scheduledTask; private final boolean batchIndexAckEnabled; @@ -133,7 +133,7 @@ public boolean isDuplicate(MessageId messageId) { return true; } if (messageIdAdv.getBatchIndex() >= 0) { - ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.get(key); + ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.get(key); return bitSet != null && !bitSet.get(messageIdAdv.getBatchIndex()); } return false; @@ -327,21 +327,22 @@ private CompletableFuture doCumulativeAck(MessageIdAdv messageId, Map doIndividualBatchAckAsync(MessageIdAdv msgId) { - ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( + ConcurrentBitSet bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( MessageIdAdvUtils.discardBatch(msgId), __ -> { final BitSet ackSet = msgId.getAckSet(); - final ConcurrentBitSetRecyclable value; + final ConcurrentBitSet value; if (ackSet != null) { synchronized (ackSet) { if (!ackSet.isEmpty()) { - value = ConcurrentBitSetRecyclable.create(ackSet); + value = new ConcurrentBitSet(); + value.or(ackSet); } else { - value = ConcurrentBitSetRecyclable.create(); + value = new ConcurrentBitSet(); value.set(0, msgId.getBatchSize()); } } } else { - value = ConcurrentBitSetRecyclable.create(); + value = new ConcurrentBitSet(); value.set(0, msgId.getBatchSize()); } return value; @@ -445,7 +446,7 @@ private void flushAsync(ClientCnx cnx) { } // Flush all individual acks - List> entriesToAck = + List> entriesToAck = new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size()); if (!pendingIndividualAcks.isEmpty()) { if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { @@ -487,7 +488,7 @@ private void flushAsync(ClientCnx cnx) { } while (true) { - Map.Entry entry = + Map.Entry entry = pendingIndividualBatchIndexAcks.pollFirstEntry(); if (entry == null) { // The entry has been removed in a different thread @@ -539,7 +540,7 @@ private CompletableFuture newImmediateAckAndFlush(long consumerId, Message // cumulative ack chunk by the last messageId if (chunkMsgIds != null && ackType != AckType.Cumulative) { if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { - List> entriesToAck = new ArrayList<>(chunkMsgIds.length); + List> entriesToAck = new ArrayList<>(chunkMsgIds.length); for (MessageIdImpl cMsgId : chunkMsgIds) { if (cMsgId != null && chunkMsgIds.length > 1) { entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null)); @@ -568,7 +569,7 @@ private CompletableFuture newMessageAckCommandAndWrite( long entryId, BitSetRecyclable ackSet, AckType ackType, Map properties, boolean flush, TimedCompletableFuture timedCompletableFuture, - List> entriesToAck) { + List> entriesToAck) { if (consumer.isAckReceiptEnabled()) { final long requestId = consumer.getClient().newRequestId(); final ByteBuf cmd; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index a6796387f9147..deca71e9361ea 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -112,7 +112,7 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; +import org.apache.pulsar.common.util.collections.ConcurrentBitSet; @UtilityClass @Slf4j @@ -1035,7 +1035,7 @@ public static ByteBuf newLookupErrorResponse(ServerError error, String errorMsg, } public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID, - List> entries) { + List> entries) { BaseCommand cmd = newMultiMessageAckCommon(entries); cmd.getAck() .setConsumerId(consumerId) @@ -1045,14 +1045,14 @@ public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID return serializeWithSize(cmd); } - private static BaseCommand newMultiMessageAckCommon(List> entries) { + private static BaseCommand newMultiMessageAckCommon(List> entries) { BaseCommand cmd = localCmd(Type.ACK); CommandAck ack = cmd.setAck(); int entriesCount = entries.size(); for (int i = 0; i < entriesCount; i++) { long ledgerId = entries.get(i).getLeft(); long entryId = entries.get(i).getMiddle(); - ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight(); + ConcurrentBitSet bitSet = entries.get(i).getRight(); MessageIdData msgId = ack.addMessageId() .setLedgerId(ledgerId) .setEntryId(entryId); @@ -1061,7 +1061,6 @@ private static BaseCommand newMultiMessageAckCommon(List> entries, + List> entries, long requestId) { BaseCommand cmd = newMultiMessageAckCommon(entries); cmd.getAck() diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java index 0ba409b2d7d17..d29e4b8240fde 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java @@ -27,6 +27,7 @@ /** * Safe multithreaded version of {@code BitSet} and leverage netty recycler. */ +@Deprecated @EqualsAndHashCode(callSuper = true) public class ConcurrentBitSetRecyclable extends ConcurrentBitSet { From 44af9400f1f0a1f20bbd457d5adbe03550f77c6f Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 3 Feb 2026 16:48:37 +0800 Subject: [PATCH 2/3] add test --- .../AcknowledgementsGroupingTrackerTest.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index 7a8222473a30a..a8638613d922d 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -405,6 +405,45 @@ public void testDoIndividualBatchAckAsync() { tracker.close(); } + @Test + public void testAddAcknowledgmentNeverAffectIsDuplicate() throws Exception { + ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + conf.setMaxAcknowledgmentGroupSize(5); + PersistentAcknowledgmentsGroupingTracker tracker = + new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); + + BatchMessageIdImpl batchMessageId0 = new BatchMessageIdImpl(5, 1, 0, 0, 10, null); + BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(5, 1, 0, 2, 10, null); + BatchMessageIdImpl batchMessageId4 = new BatchMessageIdImpl(5, 1, 0, 4, 10, null); + BatchMessageIdImpl batchMessageId6 = new BatchMessageIdImpl(5, 1, 0, 6, 10, null); + BatchMessageIdImpl batchMessageId8 = new BatchMessageIdImpl(5, 1, 0, 8, 10, null); + Thread addAcknowledgmentThread = new Thread(() -> { + tracker.addAcknowledgment(batchMessageId0, AckType.Individual, Collections.emptyMap()); + tracker.addAcknowledgment(batchMessageId2, AckType.Individual, Collections.emptyMap()); + tracker.addAcknowledgment(batchMessageId4, AckType.Individual, Collections.emptyMap()); + tracker.addAcknowledgment(batchMessageId6, AckType.Individual, Collections.emptyMap()); + tracker.addAcknowledgment(batchMessageId8, AckType.Individual, Collections.emptyMap()); + }, ""); + addAcknowledgmentThread.start(); + + BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(5, 1, 0, 1, 10, null); + BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(5, 1, 0, 3, 10, null); + BatchMessageIdImpl batchMessageId5 = new BatchMessageIdImpl(5, 1, 0, 5, 10, null); + BatchMessageIdImpl batchMessageId7 = new BatchMessageIdImpl(5, 1, 0, 7, 10, null); + BatchMessageIdImpl batchMessageId9 = new BatchMessageIdImpl(5, 1, 0, 9, 10, null); + Thread idDuplicateThread = new Thread(() -> { + assertFalse(tracker.isDuplicate(batchMessageId1)); + assertFalse(tracker.isDuplicate(batchMessageId3)); + assertFalse(tracker.isDuplicate(batchMessageId5)); + assertFalse(tracker.isDuplicate(batchMessageId7)); + assertFalse(tracker.isDuplicate(batchMessageId9)); + }, ""); + idDuplicateThread.start(); + + addAcknowledgmentThread.join(); + idDuplicateThread.join(); + } + public class ClientCnxTest extends ClientCnx { public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { From ec59767e7f600a52586e070a59622788744993bc Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 3 Feb 2026 23:41:55 +0800 Subject: [PATCH 3/3] fix test --- .../AcknowledgementsGroupingTrackerTest.java | 71 +++++++++++-------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index a8638613d922d..1f46f9da3a408 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -31,8 +31,10 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -406,42 +408,51 @@ public void testDoIndividualBatchAckAsync() { } @Test - public void testAddAcknowledgmentNeverAffectIsDuplicate() throws Exception { + public void testDoIndividualBatchAckNeverAffectIsDuplicate() throws Exception { ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); - conf.setMaxAcknowledgmentGroupSize(5); + conf.setMaxAcknowledgmentGroupSize(1); PersistentAcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup); BatchMessageIdImpl batchMessageId0 = new BatchMessageIdImpl(5, 1, 0, 0, 10, null); - BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(5, 1, 0, 2, 10, null); - BatchMessageIdImpl batchMessageId4 = new BatchMessageIdImpl(5, 1, 0, 4, 10, null); - BatchMessageIdImpl batchMessageId6 = new BatchMessageIdImpl(5, 1, 0, 6, 10, null); - BatchMessageIdImpl batchMessageId8 = new BatchMessageIdImpl(5, 1, 0, 8, 10, null); - Thread addAcknowledgmentThread = new Thread(() -> { - tracker.addAcknowledgment(batchMessageId0, AckType.Individual, Collections.emptyMap()); - tracker.addAcknowledgment(batchMessageId2, AckType.Individual, Collections.emptyMap()); - tracker.addAcknowledgment(batchMessageId4, AckType.Individual, Collections.emptyMap()); - tracker.addAcknowledgment(batchMessageId6, AckType.Individual, Collections.emptyMap()); - tracker.addAcknowledgment(batchMessageId8, AckType.Individual, Collections.emptyMap()); - }, ""); - addAcknowledgmentThread.start(); - BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(5, 1, 0, 1, 10, null); - BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(5, 1, 0, 3, 10, null); - BatchMessageIdImpl batchMessageId5 = new BatchMessageIdImpl(5, 1, 0, 5, 10, null); - BatchMessageIdImpl batchMessageId7 = new BatchMessageIdImpl(5, 1, 0, 7, 10, null); - BatchMessageIdImpl batchMessageId9 = new BatchMessageIdImpl(5, 1, 0, 9, 10, null); - Thread idDuplicateThread = new Thread(() -> { - assertFalse(tracker.isDuplicate(batchMessageId1)); - assertFalse(tracker.isDuplicate(batchMessageId3)); - assertFalse(tracker.isDuplicate(batchMessageId5)); - assertFalse(tracker.isDuplicate(batchMessageId7)); - assertFalse(tracker.isDuplicate(batchMessageId9)); - }, ""); - idDuplicateThread.start(); - - addAcknowledgmentThread.join(); - idDuplicateThread.join(); + + int loops = 10000; + int addAcknowledgmentThreadCount = 10; + List addAcknowledgmentThreads = new ArrayList<>(addAcknowledgmentThreadCount); + for (int i = 0; i < addAcknowledgmentThreadCount; i++) { + Thread addAcknowledgmentThread = new Thread(() -> { + for (int j = 0; j < loops; j++) { + tracker.addAcknowledgment(batchMessageId0, AckType.Individual, Collections.emptyMap()); + } + }, "doIndividualBatchAck-thread-" + i); + addAcknowledgmentThread.start(); + addAcknowledgmentThreads.add(addAcknowledgmentThread); + } + + int isDuplicateThreadCount = 10; + AtomicBoolean assertResult = new AtomicBoolean(); + List isDuplicateThreads = new ArrayList<>(isDuplicateThreadCount); + for (int i = 0; i < isDuplicateThreadCount; i++) { + Thread isDuplicateThread = new Thread(() -> { + for (int j = 0; j < loops; j++) { + boolean duplicate = tracker.isDuplicate(batchMessageId1); + assertResult.set(assertResult.get() || duplicate); + } + }, "isDuplicate-thread-" + i); + isDuplicateThread.start(); + isDuplicateThreads.add(isDuplicateThread); + } + + for (Thread addAcknowledgmentThread : addAcknowledgmentThreads) { + addAcknowledgmentThread.join(); + } + + for (Thread isDuplicateThread : isDuplicateThreads) { + isDuplicateThread.join(); + } + + assertFalse(assertResult.get()); } public class ClientCnxTest extends ClientCnx {