Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,37 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "flaky")
public class ClientDeduplicationTest extends ProducerConsumerBase {

@DataProvider
public static Object[][] batchingTypes() {
return new Object[][] {
{ BatcherBuilder.DEFAULT },
{ BatcherBuilder.KEY_BASED }
};
}

@BeforeClass
@Override
protected void setup() throws Exception {
Expand All @@ -46,7 +64,7 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
@Test(priority = -1)
public void testNamespaceDeduplicationApi() throws Exception {
final String namespace = "my-property/my-ns";
assertNull(admin.namespaces().getDeduplicationStatus(namespace));
Expand Down Expand Up @@ -174,9 +192,10 @@ public void testProducerDeduplication() throws Exception {
producer.close();
}

@Test(timeOut = 30000)
public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Exception {
String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId";
@Test(timeOut = 30000, dataProvider = "batchingTypes")
public void testProducerDeduplicationWithDiscontinuousSequenceId(BatcherBuilder batcherBuilder) throws Exception {
String topic = "persistent://my-property/my-ns/testProducerDeduplicationWithDiscontinuousSequenceId-"
+ System.currentTimeMillis();
admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);

// Set infinite timeout
Expand All @@ -185,7 +204,9 @@ public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Except
.topic(topic)
.producerName("my-producer-name")
.enableBatching(true)
.batcherBuilder(batcherBuilder)
.batchingMaxMessages(10)
.batchingMaxPublishDelay(1L, TimeUnit.HOURS)
.sendTimeout(0, TimeUnit.SECONDS);

Producer<byte[]> producer = producerBuilder.create();
Expand All @@ -208,7 +229,8 @@ public void testProducerDeduplicationWithDiscontinuousSequenceId() throws Except
producer.flush();

for (int i = 0; i < 4; i++) {
Message<byte[]> msg = consumer.receive();
Message<byte[]> msg = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(msg);
assertEquals(new String(msg.getData()), "my-message-" + i);
consumer.acknowledge(msg);
}
Expand Down Expand Up @@ -284,4 +306,68 @@ public void testProducerDeduplicationNonBatchAsync() throws Exception {

producer.close();
}

@Test(timeOut = 30000)
public void testKeyBasedBatchingOrder() throws Exception {
final String topic = "persistent://my-property/my-ns/test-key-based-batching-order";
admin.namespaces().setDeduplicationStatus("my-property/my-ns", true);

final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub")
.subscribe();
final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.batcherBuilder(BatcherBuilder.KEY_BASED)
.batchingMaxMessages(100)
.batchingMaxBytes(1024 * 1024 * 5)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.create();
// | key | sequence id list |
// | :-- | :--------------- |
// | A | 0, 3, 4 |
// | B | 1, 2 |
final List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
sendFutures.add(producer.newMessage().key("A").value("msg-0").sequenceId(0L).sendAsync());
sendFutures.add(producer.newMessage().key("B").value("msg-1").sequenceId(1L).sendAsync());
sendFutures.add(producer.newMessage().key("B").value("msg-2").sequenceId(2L).sendAsync());
sendFutures.add(producer.newMessage().key("A").value("msg-3").sequenceId(3L).sendAsync());
sendFutures.add(producer.newMessage().key("A").value("msg-4").sequenceId(4L).sendAsync());
// The message order is expected to be [1, 2, 0, 3, 4]. The sequence ids are not ordered strictly, but:
// 1. The sequence ids for a given key are ordered.
// 2. The highest sequence ids of batches are ordered.
producer.flush();

FutureUtil.waitForAll(sendFutures);
final List<MessageId> sendMessageIds = sendFutures.stream().map(CompletableFuture::join)
.collect(Collectors.toList());
for (int i = 0; i < sendMessageIds.size(); i++) {
log.info("Send msg-{} to {}", i, sendMessageIds.get(i));
}

final List<Long> sequenceIdList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
if (msg == null) {
break;
}
log.info("Received {}, key: {}, seq id: {}, msg id: {}",
msg.getValue(), msg.getKey(), msg.getSequenceId(), msg.getMessageId());
assertNotNull(msg);
sequenceIdList.add(msg.getSequenceId());
}
assertEquals(sequenceIdList, Arrays.asList(1L, 2L, 0L, 3L, 4L));

for (int i = 0; i < 5; i++) {
// Currently sending a duplicated message won't throw an exception. Instead, an invalid result is returned.
final MessageId messageId = producer.newMessage().value("msg").sequenceId(i).send();
assertTrue(messageId instanceof BatchMessageIdImpl);
final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) messageId;
assertEquals(messageIdImpl.getLedgerId(), -1L);
assertEquals(messageIdImpl.getEntryId(), -1L);
}

consumer.close();
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
Expand All @@ -46,14 +49,26 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {

private MessageMetadata messageMetadata = new MessageMetadata();
// sequence id for this batch which will be persisted as a single entry by broker
@Getter
@Setter
private long lowestSequenceId = -1L;
@Getter
@Setter
private long highestSequenceId = -1L;
private ByteBuf batchedMessageMetadataAndPayload;
private List<MessageImpl<?>> messages = new ArrayList<>();
protected SendCallback previousCallback = null;
// keep track of callbacks for individual messages being published in a batch
protected SendCallback firstCallback;

public BatchMessageContainerImpl() {
}

public BatchMessageContainerImpl(ProducerImpl<?> producer) {
this();
setProducer(producer);
}

@Override
public boolean add(MessageImpl<?> msg, SendCallback callback) {

Expand All @@ -79,10 +94,6 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
}
} catch (Throwable e) {
log.error("construct first message failed, exception is ", e);
if (batchedMessageMetadataAndPayload != null) {
// if payload has been allocated release it
batchedMessageMetadataAndPayload.release();
}
discard(new PulsarClientException(e));
return false;
}
Expand All @@ -101,7 +112,6 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
}
highestSequenceId = msg.getSequenceId();
ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));

return isBatchFull();
}

Expand Down Expand Up @@ -169,6 +179,10 @@ public void discard(Exception ex) {
if (firstCallback != null) {
firstCallback.sendComplete(ex);
}
if (batchedMessageMetadataAndPayload != null) {
ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
batchedMessageMetadataAndPayload = null;
}
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
lowestSequenceId, t);
Expand All @@ -190,6 +204,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
return null;
}
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
messageMetadata.setSequenceId(lowestSequenceId);
messageMetadata.setHighestSequenceId(highestSequenceId);
if (currentTxnidMostBits != -1) {
messageMetadata.setTxnidMostBits(currentTxnidMostBits);
Expand Down
Loading