diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java index 304bb6eaaa02d..52017444a2b76 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java @@ -20,19 +20,37 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j @Test(groups = "flaky") public class ClientDeduplicationTest extends ProducerConsumerBase { + @DataProvider + public static Object[][] batchingTypes() { + return new Object[][] { + { BatcherBuilder.DEFAULT }, + { BatcherBuilder.KEY_BASED } + }; + } + @BeforeClass @Override protected void setup() throws Exception { @@ -46,7 +64,7 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @Test + @Test(priority = -1) public void testNamespaceDeduplicationApi() throws Exception { final String namespace = "my-property/my-ns"; assertNull(admin.namespaces().getDeduplicationStatus(namespace)); @@ -174,9 +192,10 @@ public void testProducerDeduplication() throws Exception { producer.close(); } - @Test(timeOut = 30000) - public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Exception { - String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId"; + @Test(timeOut = 30000, dataProvider = "batchingTypes") + public void testProducerDeduplicationWithDiscontinuousSequenceId(BatcherBuilder batcherBuilder) throws Exception { + String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId-" + + System.currentTimeMillis(); admin.namespaces().setDeduplicationStatus("my-property/my-ns", true); // Set infinite timeout @@ -185,7 +204,9 @@ public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Except .topic(topic) .producerName("my-producer-name") .enableBatching(true) + .batcherBuilder(batcherBuilder) .batchingMaxMessages(10) + .batchingMaxPublishDelay(1L, TimeUnit.HOURS) .sendTimeout(0, TimeUnit.SECONDS); Producer producer = producerBuilder.create(); @@ -208,7 +229,8 @@ public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Except producer.flush(); for (int i = 0; i < 4; i++) { - Message msg = consumer.receive(); + Message msg = consumer.receive(3, TimeUnit.SECONDS); + assertNotNull(msg); assertEquals(new String(msg.getData()), "my-message-" + i); consumer.acknowledge(msg); } @@ -284,4 +306,68 @@ public void testProducerDeduplicationNonBatchAsync() throws Exception { producer.close(); } + + @Test(timeOut = 30000) + public void testKeyBasedBatchingOrder() throws Exception { + final String topic = "persistent://my-property/my-ns/test-key-based-batching-order"; + admin.namespaces().setDeduplicationStatus("my-property/my-ns", true); + + final Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .subscribe(); + final Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .batcherBuilder(BatcherBuilder.KEY_BASED) + .batchingMaxMessages(100) + .batchingMaxBytes(1024 * 1024 * 5) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .create(); + // | key | sequence id list | + // | :-- | :--------------- | + // | A | 0, 3, 4 | + // | B | 1, 2 | + final List> sendFutures = new ArrayList<>(); + sendFutures.add(producer.newMessage().key("A").value("msg-0").sequenceId(0L).sendAsync()); + sendFutures.add(producer.newMessage().key("B").value("msg-1").sequenceId(1L).sendAsync()); + sendFutures.add(producer.newMessage().key("B").value("msg-2").sequenceId(2L).sendAsync()); + sendFutures.add(producer.newMessage().key("A").value("msg-3").sequenceId(3L).sendAsync()); + sendFutures.add(producer.newMessage().key("A").value("msg-4").sequenceId(4L).sendAsync()); + // The message order is expected to be [1, 2, 0, 3, 4]. The sequence ids are not ordered strictly, but: + // 1. The sequence ids for a given key are ordered. + // 2. The highest sequence ids of batches are ordered. + producer.flush(); + + FutureUtil.waitForAll(sendFutures); + final List sendMessageIds = sendFutures.stream().map(CompletableFuture::join) + .collect(Collectors.toList()); + for (int i = 0; i < sendMessageIds.size(); i++) { + log.info("Send msg-{} to {}", i, sendMessageIds.get(i)); + } + + final List sequenceIdList = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + final Message msg = consumer.receive(3, TimeUnit.SECONDS); + if (msg == null) { + break; + } + log.info("Received {}, key: {}, seq id: {}, msg id: {}", + msg.getValue(), msg.getKey(), msg.getSequenceId(), msg.getMessageId()); + assertNotNull(msg); + sequenceIdList.add(msg.getSequenceId()); + } + assertEquals(sequenceIdList, Arrays.asList(1L, 2L, 0L, 3L, 4L)); + + for (int i = 0; i < 5; i++) { + // Currently sending a duplicated message won't throw an exception. Instead, an invalid result is returned. + final MessageId messageId = producer.newMessage().value("msg").sequenceId(i).send(); + assertTrue(messageId instanceof BatchMessageIdImpl); + final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) messageId; + assertEquals(messageIdImpl.getLedgerId(), -1L); + assertEquals(messageIdImpl.getEntryId(), -1L); + } + + consumer.close(); + producer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index afd3f808f8e55..02fb491d09dec 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -19,10 +19,13 @@ package org.apache.pulsar.client.impl; import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import lombok.Getter; +import lombok.Setter; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; @@ -46,7 +49,11 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { private MessageMetadata messageMetadata = new MessageMetadata(); // sequence id for this batch which will be persisted as a single entry by broker + @Getter + @Setter private long lowestSequenceId = -1L; + @Getter + @Setter private long highestSequenceId = -1L; private ByteBuf batchedMessageMetadataAndPayload; private List> messages = new ArrayList<>(); @@ -54,6 +61,14 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { // keep track of callbacks for individual messages being published in a batch protected SendCallback firstCallback; + public BatchMessageContainerImpl() { + } + + public BatchMessageContainerImpl(ProducerImpl producer) { + this(); + setProducer(producer); + } + @Override public boolean add(MessageImpl msg, SendCallback callback) { @@ -79,10 +94,6 @@ public boolean add(MessageImpl msg, SendCallback callback) { } } catch (Throwable e) { log.error("construct first message failed, exception is ", e); - if (batchedMessageMetadataAndPayload != null) { - // if payload has been allocated release it - batchedMessageMetadataAndPayload.release(); - } discard(new PulsarClientException(e)); return false; } @@ -101,7 +112,6 @@ public boolean add(MessageImpl msg, SendCallback callback) { } highestSequenceId = msg.getSequenceId(); ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId())); - return isBatchFull(); } @@ -169,6 +179,10 @@ public void discard(Exception ex) { if (firstCallback != null) { firstCallback.sendComplete(ex); } + if (batchedMessageMetadataAndPayload != null) { + ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload); + batchedMessageMetadataAndPayload = null; + } } catch (Throwable t) { log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName, lowestSequenceId, t); @@ -190,6 +204,7 @@ public OpSendMsg createOpSendMsg() throws IOException { return null; } messageMetadata.setNumMessagesInBatch(numMessagesInBatch); + messageMetadata.setSequenceId(lowestSequenceId); messageMetadata.setHighestSequenceId(highestSequenceId); if (currentTxnidMostBits != -1) { messageMetadata.setTxnidMostBits(currentTxnidMostBits); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java index 7614728353f89..f0d4601d568a9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java @@ -18,23 +18,12 @@ */ package org.apache.pulsar.client.impl; -import com.google.common.collect.ComparisonChain; -import io.netty.buffer.ByteBuf; -import io.netty.util.ReferenceCountUtil; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; -import org.apache.pulsar.common.api.proto.CompressionType; -import org.apache.pulsar.common.api.proto.MessageMetadata; -import org.apache.pulsar.common.compression.CompressionCodec; -import org.apache.pulsar.common.protocol.ByteBufPair; -import org.apache.pulsar.common.protocol.Commands; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +38,7 @@ */ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer { - private Map batches = new HashMap<>(); + private final Map batches = new HashMap<>(); @Override public boolean add(MessageImpl msg, SendCallback callback) { @@ -57,29 +46,16 @@ public boolean add(MessageImpl msg, SendCallback callback) { log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, producerName, numMessagesInBatch); } - numMessagesInBatch++; - currentBatchSizeBytes += msg.getDataBuffer().readableBytes(); String key = getKey(msg); - KeyedBatch part = batches.get(key); - if (part == null) { - part = new KeyedBatch(); - part.addMsg(msg, callback); - part.compressionType = compressionType; - part.compressor = compressor; - part.maxBatchSize = maxBatchSize; - part.topicName = topicName; - part.producerName = producerName; - batches.putIfAbsent(key, part); - - if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) { - currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits(); - } - if (msg.getMessageBuilder().hasTxnidLeastBits() && currentTxnidLeastBits == -1) { - currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits(); - } - - } else { - part.addMsg(msg, callback); + final BatchMessageContainerImpl batchMessageContainer = batches.computeIfAbsent(key, + __ -> new BatchMessageContainerImpl(producer)); + batchMessageContainer.add(msg, callback); + // The `add` method fails iff the container is empty, i.e. the `msg` is the first message to add, while `msg` + // was failed to add. In this case, `clear` method will be called and the batch container is empty and there is + // no need to update the stats. + if (!batchMessageContainer.isEmpty()) { + numMessagesInBatch++; + currentBatchSizeBytes += msg.getDataBuffer().readableBytes(); } return isBatchFull(); } @@ -88,7 +64,7 @@ public boolean add(MessageImpl msg, SendCallback callback) { public void clear() { numMessagesInBatch = 0; currentBatchSizeBytes = 0; - batches = new HashMap<>(); + batches.clear(); currentTxnidMostBits = -1L; currentTxnidLeastBits = -1L; } @@ -100,13 +76,7 @@ public boolean isEmpty() { @Override public void discard(Exception ex) { - try { - // Need to protect ourselves from any exception being thrown in the future handler from the application - batches.forEach((k, v) -> v.firstCallback.sendComplete(ex)); - } catch (Throwable t) { - log.warn("[{}] [{}] Got exception while completing the callback", topicName, producerName, t); - } - batches.forEach((k, v) -> ReferenceCountUtil.safeRelease(v.batchedMessageMetadataAndPayload)); + batches.forEach((k, v) -> v.discard(ex)); clear(); } @@ -115,66 +85,45 @@ public boolean isMultiBatches() { return true; } - private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) throws IOException { - ByteBuf encryptedPayload = producer.encryptMessage(keyedBatch.messageMetadata, - keyedBatch.getCompressedBatchMetadataAndPayload()); - if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { - keyedBatch.discard(new PulsarClientException.InvalidMessageException( - "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes")); - return null; - } - - final int numMessagesInBatch = keyedBatch.messages.size(); - long currentBatchSizeBytes = 0; - for (MessageImpl message : keyedBatch.messages) { - currentBatchSizeBytes += message.getDataBuffer().readableBytes(); - } - keyedBatch.messageMetadata.setNumMessagesInBatch(numMessagesInBatch); - if (currentTxnidMostBits != -1) { - keyedBatch.messageMetadata.setTxnidMostBits(currentTxnidMostBits); - } - if (currentTxnidLeastBits != -1) { - keyedBatch.messageMetadata.setTxnidLeastBits(currentTxnidLeastBits); - } - ByteBufPair cmd = producer.sendMessage(producer.producerId, keyedBatch.sequenceId, numMessagesInBatch, - keyedBatch.messageMetadata, encryptedPayload); - - ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create( - keyedBatch.messages, cmd, keyedBatch.sequenceId, keyedBatch.firstCallback); - - op.setNumMessagesInBatch(numMessagesInBatch); - op.setBatchSizeByte(currentBatchSizeBytes); - return op; - } - @Override public List createOpSendMsgs() throws IOException { - List result = new ArrayList<>(); - List list = new ArrayList<>(batches.values()); - list.sort(((o1, o2) -> ComparisonChain.start() - .compare(o1.sequenceId, o2.sequenceId) - .result())); - for (KeyedBatch keyedBatch : list) { - ProducerImpl.OpSendMsg op = createOpSendMsg(keyedBatch); - if (op != null) { - result.add(op); + try { + // In key based batching, the sequence ids might not be ordered, for example, + // | key | sequence id list | + // | :-- | :--------------- | + // | A | 0, 3, 4 | + // | B | 1, 2 | + // The message order should be 1, 2, 0, 3, 4 so that a message with a sequence id <= 4 should be dropped. + // However, for a MessageMetadata with both `sequence_id` and `highest_sequence_id` fields, the broker will + // expect a strict order so that the batch of key "A" (0, 3, 4) will be dropped. + // Therefore, we should update the `sequence_id` field to the highest sequence id and remove the + // `highest_sequence_id` field to allow the weak order. + batches.values().forEach(batchMessageContainer -> { + batchMessageContainer.setLowestSequenceId(batchMessageContainer.getHighestSequenceId()); + }); + return batches.values().stream().sorted((o1, o2) -> + (int) (o1.getLowestSequenceId() - o2.getLowestSequenceId()) + ).map(batchMessageContainer -> { + try { + return batchMessageContainer.createOpSendMsg(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + }).collect(Collectors.toList()); + } catch (IllegalStateException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } else { + throw e; } } - return result; } @Override public boolean hasSameSchema(MessageImpl msg) { String key = getKey(msg); - KeyedBatch part = batches.get(key); - if (part == null || part.messages.isEmpty()) { - return true; - } - if (!part.messageMetadata.hasSchemaVersion()) { - return msg.getSchemaVersion() == null; - } - return Arrays.equals(msg.getSchemaVersion(), - part.messageMetadata.getSchemaVersion()); + BatchMessageContainerImpl batchMessageContainer = batches.get(key); + return batchMessageContainer == null || batchMessageContainer.hasSameSchema(msg); } private String getKey(MessageImpl msg) { @@ -184,78 +133,6 @@ private String getKey(MessageImpl msg) { return msg.getKey(); } - private static class KeyedBatch { - private final MessageMetadata messageMetadata = new MessageMetadata(); - // sequence id for this batch which will be persisted as a single entry by broker - private long sequenceId = -1; - private ByteBuf batchedMessageMetadataAndPayload; - private List> messages = new ArrayList<>(); - private SendCallback previousCallback = null; - private CompressionType compressionType; - private CompressionCodec compressor; - private int maxBatchSize; - private String topicName; - private String producerName; - - // keep track of callbacks for individual messages being published in a batch - private SendCallback firstCallback; - - private ByteBuf getCompressedBatchMetadataAndPayload() { - for (MessageImpl msg : messages) { - batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload( - msg.getMessageBuilder(), msg.getDataBuffer(), batchedMessageMetadataAndPayload); - } - int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); - ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload); - batchedMessageMetadataAndPayload.release(); - if (compressionType != CompressionType.NONE) { - messageMetadata.setCompression(compressionType); - messageMetadata.setUncompressedSize(uncompressedSize); - } - - // Update the current max batch size using the uncompressed size, which is what we need in any case to - // accumulate the batch content - maxBatchSize = Math.max(maxBatchSize, uncompressedSize); - return compressedPayload; - } - - private void addMsg(MessageImpl msg, SendCallback callback) { - if (messages.size() == 0) { - sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder()); - batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT - .buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize())); - firstCallback = callback; - } - if (previousCallback != null) { - previousCallback.addCallback(msg, callback); - } - previousCallback = callback; - messages.add(msg); - } - - public void discard(Exception ex) { - try { - // Need to protect ourselves from any exception being thrown in the future handler from the application - if (firstCallback != null) { - firstCallback.sendComplete(ex); - } - } catch (Throwable t) { - log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName, - sequenceId, t); - } - clear(); - } - - public void clear() { - messages = new ArrayList<>(); - firstCallback = null; - previousCallback = null; - messageMetadata.clear(); - sequenceId = -1; - batchedMessageMetadataAndPayload = null; - } - } - private static final Logger log = LoggerFactory.getLogger(BatchMessageKeyBasedContainer.class); }