Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public class RawBatchConverter {
public static boolean isReadableBatch(RawMessage msg) {
ByteBuf payload = msg.getHeadersAndPayload();
MessageMetadata metadata = Commands.parseMessageMetadata(payload);
return isReadableBatch(metadata);
}

public static boolean isReadableBatch(MessageMetadata metadata) {
return metadata.hasNumMessagesInBatch() && metadata.getEncryptionKeysCount() == 0;
}

Expand Down Expand Up @@ -71,9 +75,9 @@ public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKey
msg.getMessageIdData().getEntryId(),
msg.getMessageIdData().getPartition(),
i);
if (!smm.isCompactedOut()) {
if (!smm.isCompactedOut() && smm.hasPartitionKey()) {
idsAndKeysAndSize.add(ImmutableTriple.of(id,
smm.hasPartitionKey() ? smm.getPartitionKey() : null,
smm.getPartitionKey(),
smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
}
singleMessagePayload.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,26 +122,32 @@ private void phaseOneLoop(RawReader reader,
() -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));

future.thenAcceptAsync(m -> {
try {
try (m) {
MessageId id = m.getMessageId();
boolean deletedMessage = false;
boolean replaceMessage = false;
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
if (RawBatchConverter.isReadableBatch(metadata)) {
try {
int numMessagesInBatch = metadata.getNumMessagesInBatch();
int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
if (e != null) {
if (e.getRight() > 0) {
MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
replaceMessage = old != null;
if (old != null) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
} else {
deletedMessage = true;
latestForKey.remove(e.getMiddle());
deleteCnt++;
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
}
if (deleteCnt == numMessagesInBatch) {
deletedMessage = true;
}
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
Expand Down Expand Up @@ -174,8 +180,6 @@ private void phaseOneLoop(RawReader reader,
lastMessageId,
latestForKey, loopPromise);
}
} finally {
m.close();
}
}, scheduler).exceptionally(ex -> {
loopPromise.completeExceptionally(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -92,6 +93,7 @@
import org.testng.annotations.Test;

@Test(groups = "broker-impl")
@Slf4j
public class CompactionTest extends MockedPulsarServiceBaseTest {
protected ScheduledExecutorService compactionScheduler;
protected BookKeeper bk;
Expand Down Expand Up @@ -549,6 +551,60 @@ public void testBatchMessageIdsDontChange() throws Exception {
}
}

@Test
public void testBatchMessageWithNullValue() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";

pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.receiverQueueSize(1).readCompacted(true).subscribe().close();

try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
.maxPendingMessages(3)
.enableBatching(true)
.batchingMaxMessages(3)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create()
) {
// batch 1
producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
producer.newMessage().key("key1").value(null).sendAsync();
producer.newMessage().key("key2").value("my-message-3".getBytes()).send();

// batch 2
producer.newMessage().key("key3").value("my-message-4".getBytes()).sendAsync();
producer.newMessage().key("key3").value("my-message-5".getBytes()).sendAsync();
producer.newMessage().key("key3").value("my-message-6".getBytes()).send();

// batch 3
producer.newMessage().key("key4").value("my-message-7".getBytes()).sendAsync();
producer.newMessage().key("key4").value(null).sendAsync();
producer.newMessage().key("key5").value("my-message-9".getBytes()).send();
}


// compact the topic
compact(topic);

// Read messages before compaction to get ids
List<Message<byte[]>> messages = new ArrayList<>();
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe()) {
while (true) {
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
if (message == null) {
break;
}
messages.add(message);
}
}

assertEquals(messages.size(), 3);
assertEquals(messages.get(0).getKey(), "key2");
assertEquals(messages.get(1).getKey(), "key3");
assertEquals(messages.get(2).getKey(), "key5");
}

@Test
public void testWholeBatchCompactedOut() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
Expand Down