From 982ea19d985621cf77913e5cc3b8c62a6696cbb1 Mon Sep 17 00:00:00 2001 From: coderzc Date: Fri, 11 Aug 2023 20:37:34 +0800 Subject: [PATCH 1/4] Fix data lost during topic compact --- .../pulsar/compaction/TwoPhaseCompactor.java | 10 ++-- .../pulsar/compaction/CompactionTest.java | 56 +++++++++++++++++++ 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 7d3b5863cb6f6..2d576a8c70216 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -122,7 +122,7 @@ private void phaseOneLoop(RawReader reader, () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)")); future.thenAcceptAsync(m -> { - try { + try (m) { MessageId id = m.getMessageId(); boolean deletedMessage = false; boolean replaceMessage = false; @@ -162,20 +162,18 @@ private void phaseOneLoop(RawReader reader, mxBean.addCompactionRemovedEvent(reader.getTopic()); } } - MessageId first = firstMessageId.orElse(deletedMessage ? null : id); + MessageId first = firstMessageId.orElse(id); MessageId to = deletedMessage ? toMessageId.orElse(null) : id; if (id.compareTo(lastMessageId) == 0) { - loopPromise.complete(new PhaseOneResult(first == null ? id : first, to == null ? id : to, + loopPromise.complete(new PhaseOneResult(first, to == null ? id : to, lastMessageId, latestForKey)); } else { phaseOneLoop(reader, - Optional.ofNullable(first), + Optional.of(first), Optional.ofNullable(to), lastMessageId, latestForKey, loopPromise); } - } finally { - m.close(); } }, scheduler).exceptionally(ex -> { loopPromise.completeExceptionally(ex); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index d11a2f87192ff..6f7c1303045cb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.api.OpenBuilder; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -92,6 +93,7 @@ import org.testng.annotations.Test; @Test(groups = "broker-impl") +@Slf4j public class CompactionTest extends MockedPulsarServiceBaseTest { protected ScheduledExecutorService compactionScheduler; protected BookKeeper bk; @@ -549,6 +551,60 @@ public void testBatchMessageIdsDontChange() throws Exception { } } + @Test + public void testBatchMessageWithNullValue() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .receiverQueueSize(1).readCompacted(true).subscribe().close(); + + try (Producer producer = pulsarClient.newProducer().topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create() + ) { + // batch 1 + producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync(); + producer.newMessage().key("key1").value(null).sendAsync(); + producer.newMessage().key("key2").value("my-message-3".getBytes()).send(); + + // batch 2 + producer.newMessage().key("key3").value("my-message-4".getBytes()).sendAsync(); + producer.newMessage().key("key3").value("my-message-5".getBytes()).sendAsync(); + producer.newMessage().key("key3").value("my-message-6".getBytes()).send(); + + // batch 3 + producer.newMessage().key("key4").value("my-message-7".getBytes()).sendAsync(); + producer.newMessage().key("key4").value(null).sendAsync(); + producer.newMessage().key("key5").value("my-message-9".getBytes()).send(); + } + + + // compact the topic + compact(topic); + + // Read messages before compaction to get ids + List> messages = new ArrayList<>(); + try (Consumer consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe()) { + while (true) { + Message message = consumer.receive(5, TimeUnit.SECONDS); + if (message == null) { + break; + } + messages.add(message); + } + } + + assertEquals(messages.size(), 3); + assertEquals(messages.get(0).getKey(), "key2"); + assertEquals(messages.get(1).getKey(), "key3"); + assertEquals(messages.get(2).getKey(), "key5"); + } + @Test public void testWholeBatchCompactedOut() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; From 696a6e524828d400fd2dbbaea8c82c30c4fef942 Mon Sep 17 00:00:00 2001 From: coderzc Date: Sat, 12 Aug 2023 15:37:19 +0800 Subject: [PATCH 2/4] Recompute deletedMessage --- .../pulsar/client/impl/RawBatchConverter.java | 4 ++++ .../pulsar/compaction/TwoPhaseCompactor.java | 23 +++++++++++++------ 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index 54d2ff867a629..f2d8cb1bae702 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -44,6 +44,10 @@ public class RawBatchConverter { public static boolean isReadableBatch(RawMessage msg) { ByteBuf payload = msg.getHeadersAndPayload(); MessageMetadata metadata = Commands.parseMessageMetadata(payload); + return isReadableBatch(metadata); + } + + public static boolean isReadableBatch(MessageMetadata metadata) { return metadata.hasNumMessagesInBatch() && metadata.getEncryptionKeysCount() == 0; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 2d576a8c70216..7bb13073a14fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -127,22 +127,31 @@ private void phaseOneLoop(RawReader reader, boolean deletedMessage = false; boolean replaceMessage = false; mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); - if (RawBatchConverter.isReadableBatch(m)) { + MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload()); + if (RawBatchConverter.isReadableBatch(metadata)) { try { + int numMessagesInBatch = metadata.getNumMessagesInBatch(); + int deleteCnt = 0; for (ImmutableTriple e : extractIdsAndKeysAndSizeFromBatch(m)) { + boolean singleDeletedMessage = false; + boolean singleReplaceMessage = false; if (e != null) { if (e.getRight() > 0) { MessageId old = latestForKey.put(e.getMiddle(), e.getLeft()); - replaceMessage = old != null; + singleReplaceMessage = old != null; } else { - deletedMessage = true; + singleDeletedMessage = true; latestForKey.remove(e.getMiddle()); + deleteCnt++; } } - if (replaceMessage || deletedMessage) { + if (singleDeletedMessage || singleReplaceMessage) { mxBean.addCompactionRemovedEvent(reader.getTopic()); } } + if (deleteCnt == numMessagesInBatch) { + deletedMessage = true; + } } catch (IOException ioe) { log.info("Error decoding batch for message {}. Whole batch will be included in output", id, ioe); @@ -162,14 +171,14 @@ private void phaseOneLoop(RawReader reader, mxBean.addCompactionRemovedEvent(reader.getTopic()); } } - MessageId first = firstMessageId.orElse(id); + MessageId first = firstMessageId.orElse(deletedMessage ? null : id); MessageId to = deletedMessage ? toMessageId.orElse(null) : id; if (id.compareTo(lastMessageId) == 0) { - loopPromise.complete(new PhaseOneResult(first, to == null ? id : to, + loopPromise.complete(new PhaseOneResult(first == null ? id : first, to == null ? id : to, lastMessageId, latestForKey)); } else { phaseOneLoop(reader, - Optional.of(first), + Optional.ofNullable(first), Optional.ofNullable(to), lastMessageId, latestForKey, loopPromise); From b55e6b90b2dccfcadb6e1215ad2a383de34e9733 Mon Sep 17 00:00:00 2001 From: coderzc Date: Sat, 12 Aug 2023 16:54:29 +0800 Subject: [PATCH 3/4] Don't extract message with null key in phase one --- .../java/org/apache/pulsar/client/impl/RawBatchConverter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index f2d8cb1bae702..5f476df68af4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -75,9 +75,9 @@ public static List> extractIdsAndKey msg.getMessageIdData().getEntryId(), msg.getMessageIdData().getPartition(), i); - if (!smm.isCompactedOut()) { + if (!smm.isCompactedOut() && smm.hasPartitionKey()) { idsAndKeysAndSize.add(ImmutableTriple.of(id, - smm.hasPartitionKey() ? smm.getPartitionKey() : null, + smm.getPartitionKey(), smm.hasPayloadSize() ? smm.getPayloadSize() : 0)); } singleMessagePayload.release(); From df8b903a2c25a17434f3092d221446c06f4500b0 Mon Sep 17 00:00:00 2001 From: coderzc Date: Mon, 14 Aug 2023 17:59:07 +0800 Subject: [PATCH 4/4] Improve code --- .../apache/pulsar/compaction/TwoPhaseCompactor.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 7bb13073a14fb..29225253da197 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -133,21 +133,18 @@ private void phaseOneLoop(RawReader reader, int numMessagesInBatch = metadata.getNumMessagesInBatch(); int deleteCnt = 0; for (ImmutableTriple e : extractIdsAndKeysAndSizeFromBatch(m)) { - boolean singleDeletedMessage = false; - boolean singleReplaceMessage = false; if (e != null) { if (e.getRight() > 0) { MessageId old = latestForKey.put(e.getMiddle(), e.getLeft()); - singleReplaceMessage = old != null; + if (old != null) { + mxBean.addCompactionRemovedEvent(reader.getTopic()); + } } else { - singleDeletedMessage = true; latestForKey.remove(e.getMiddle()); deleteCnt++; + mxBean.addCompactionRemovedEvent(reader.getTopic()); } } - if (singleDeletedMessage || singleReplaceMessage) { - mxBean.addCompactionRemovedEvent(reader.getTopic()); - } } if (deleteCnt == numMessagesInBatch) { deletedMessage = true;