Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,8 @@ private void verifyRGMetrics(int sentNumBytes, int sentNumMsgs,
Assert.assertNotEquals(ninthPercentileValue, 0);
}

// Empirically, there appears to be a 42-byte overhead for metadata, imposed by Pulsar runtime.
private static final int PER_MESSAGE_METADATA_OHEAD = 42;
// Empirically, there appears to be a 31-byte overhead for metadata, imposed by Pulsar runtime.
private static final int PER_MESSAGE_METADATA_OHEAD = 31;

private static final int PUBLISH_INTERVAL_SECS = 10;
private static final int NUM_PRODUCERS = 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -865,6 +866,37 @@ public void testOrderingOfKeyBasedBatchMessageContainer() throws PulsarClientExc
producer.close();
}

@Test(dataProvider = "containerBuilder")
public void testBatchSendOneMessage(BatcherBuilder builder) throws Exception {
final String topicName = "persistent://prop/ns-abc/testBatchSendOneMessage-" + UUID.randomUUID();
final String subscriptionName = "sub-1";

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared).subscribe();

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS).batchingMaxMessages(10).enableBatching(true)
.batcherBuilder(builder)
.create();
String msg = "my-message";
MessageId messageId = producer.newMessage().value(msg.getBytes()).property("key1", "value1").send();

Assert.assertTrue(messageId instanceof MessageIdImpl);
Assert.assertFalse(messageId instanceof BatchMessageIdImpl);

Message<byte[]> received = consumer.receive();
assertEquals(received.getSequenceId(), 0);
consumer.acknowledge(received);

Assert.assertEquals(new String(received.getData()), msg);
Assert.assertFalse(received.getProperties().isEmpty());
Assert.assertEquals(received.getProperties().get("key1"), "value1");
Assert.assertFalse(received.getMessageId() instanceof BatchMessageIdImpl);

producer.close();
consumer.close();
}

@Test(dataProvider = "containerBuilder")
public void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws Exception {

Expand Down Expand Up @@ -1034,7 +1066,10 @@ private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subTyp
if (enableBatch) {
// only ack messages which batch index < 2, which means we will not to ack the
// whole batch for the batch that with more than 2 messages
if (((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) {
if ((message.getMessageId() instanceof BatchMessageIdImpl)
&& ((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) {
consumer.acknowledgeAsync(message).get();
} else if (!(message.getMessageId() instanceof BatchMessageIdImpl)){
consumer.acknowledgeAsync(message).get();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import static org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -36,6 +38,7 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.assertj.core.util.Sets;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand Down Expand Up @@ -211,57 +214,75 @@ public void testBatchMessage() throws Exception {
final String topic = newTopicName();
final String subscription = "my-sub";
final long eventTime= 200;
final int msgNum = 2;

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
// make sure 2 messages in one batch, because if only one message in batch,
// producer will not send batched messages
.batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
.batchingMaxMessages(msgNum)
.batchingMaxBytes(Integer.MAX_VALUE)
.enableBatching(true)
.create();

long sendTime = System.currentTimeMillis();
// send message which is batch message and only contains one message, so do not set the deliverAtTime
MessageIdImpl messageId = (MessageIdImpl) producer.newMessage()
// send message which is batch message, so do not set the deliverAtTime
List<CompletableFuture<MessageId>> messageIdsFuture = new ArrayList<>(msgNum);
for (int i = 0; i < msgNum; ++i) {
CompletableFuture<MessageId> messageId = producer.newMessage()
.eventTime(eventTime)
.value(("hello").getBytes())
.send();
.value(("hello" + i).getBytes())
.sendAsync();
messageIdsFuture.add(messageId);
}
FutureUtil.waitForAll(messageIdsFuture);

// 1. test for peekMessages
admin.topics().createSubscription(topic, subscription, MessageId.earliest);
final List<Message<byte[]>> messages = admin.topics().peekMessages(topic, subscription, 1);
Assert.assertEquals(messages.size(), 1);

MessageImpl message = (MessageImpl) messages.get(0);
Assert.assertEquals(message.getData(), ("hello").getBytes());
Assert.assertTrue(message.getPublishTime() >= sendTime);
BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
Assert.assertEquals(entryMetadata.getIndex(), 0);
System.out.println(message.getProperties());
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
final List<Message<byte[]>> messages = admin.topics().peekMessages(topic, subscription, msgNum);
Assert.assertEquals(messages.size(), msgNum);

MessageImpl message;
BrokerEntryMetadata entryMetadata;
for (int i = 0; i < msgNum; ++i) {
message = (MessageImpl) messages.get(i);
Assert.assertEquals(message.getData(), ("hello" + i).getBytes());
Assert.assertTrue(message.getPublishTime() >= sendTime);
entryMetadata = message.getBrokerEntryMetadata();
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
System.out.println(message.getProperties());
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), msgNum);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
}

// getMessagesById and examineMessage only return the first messages in the batch
// 2. test for getMessagesById
MessageIdImpl messageId = (MessageIdImpl) messageIdsFuture.get(0).get();
message = (MessageImpl) admin.topics().getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId());
Assert.assertEquals(message.getData(), ("hello").getBytes());
// getMessagesById return the first message in the batch
Assert.assertEquals(message.getData(), ("hello" + 0).getBytes());
Assert.assertTrue(message.getPublishTime() >= sendTime);
entryMetadata = message.getBrokerEntryMetadata();
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
Assert.assertEquals(entryMetadata.getIndex(), 0);
Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
System.out.println(message.getProperties());
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), msgNum);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);

// 3. test for examineMessage
message = (MessageImpl) admin.topics().examineMessage(topic, "earliest", 1);
Assert.assertEquals(message.getData(), ("hello").getBytes());
Assert.assertEquals(message.getData(), ("hello" + 0).getBytes());
Assert.assertTrue(message.getPublishTime() >= sendTime);
entryMetadata = message.getBrokerEntryMetadata();
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
Assert.assertEquals(entryMetadata.getIndex(), 0);
Assert.assertEquals(entryMetadata.getIndex(), msgNum - 1);
System.out.println(message.getProperties());
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), msgNum);
// make sure BATCH_SIZE_HEADER > 0
Assert.assertTrue(Integer.parseInt(message.getProperty(BATCH_SIZE_HEADER)) > 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -361,8 +362,10 @@ public void testKeyBasedBatchingOrder() throws Exception {
for (int i = 0; i < 5; i++) {
// Currently sending a duplicated message won't throw an exception. Instead, an invalid result is returned.
final MessageId messageId = producer.newMessage().value("msg").sequenceId(i).send();
assertTrue(messageId instanceof BatchMessageIdImpl);
final BatchMessageIdImpl messageIdImpl = (BatchMessageIdImpl) messageId;
// a duplicated message will send in a single batch, that will perform as a non-batched sending
assertTrue(messageId instanceof MessageIdImpl);
assertFalse(messageId instanceof BatchMessageIdImpl);
final MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
assertEquals(messageIdImpl.getLedgerId(), -1L);
assertEquals(messageIdImpl.getEntryId(), -1L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ public void testHasMessageAvailable(boolean enableBatch, boolean startInclusive)
}

@Test(timeOut = 20000)
public void testHasMessageAvailableWithBatch() throws Exception {
public void testHasMessageAvailable() throws Exception {
final String topicName = "persistent://my-property/my-ns/testHasMessageAvailableWithBatch";
final int numOfMessage = 10;

Expand All @@ -1092,11 +1092,11 @@ public void testHasMessageAvailableWithBatch() throws Exception {

//For batch-messages with single message, the type of client messageId should be the same as that of broker
MessageIdImpl messageId = (MessageIdImpl) producer.send("msg".getBytes());
assertTrue(messageId instanceof MessageIdImpl);
assertFalse(messageId instanceof BatchMessageIdImpl);
ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
.startMessageId(messageId).startMessageIdInclusive().create();
MessageIdImpl lastMsgId = (MessageIdImpl) reader.getConsumer().getLastMessageId();
assertTrue(messageId instanceof BatchMessageIdImpl);
assertFalse(lastMsgId instanceof BatchMessageIdImpl);
assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
reader.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.cli;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.mockito.stubbing.Answer;
import org.testng.Assert;

/**
* An implement of {@link PulsarClientTool} for test, which will publish messages iff there is enough messages
* in the batch.
*/
public class PulsarClientToolForceBatchNum extends PulsarClientTool{
private final String topic;
private final int batchNum;

/**
*
* @param properties properties
* @param topic topic
* @param batchNum iff there is batchNum messages in the batch, the producer will flush and send.
*/
public PulsarClientToolForceBatchNum(Properties properties, String topic, int batchNum) {
super(properties);
this.topic = topic;
this.batchNum = batchNum;
}

@Override
protected void initJCommander() {
super.initJCommander();
produceCommand = new CmdProduce() {
@Override
public void updateConfig(ClientBuilder newBuilder, Authentication authentication, String serviceURL) {
try {
super.updateConfig(mockClientBuilder(newBuilder), authentication, serviceURL);
} catch (Exception e) {
Assert.fail("update config fail " + e.getMessage());
}
}
};
jcommander.addCommand("produce", produceCommand);
}

private ClientBuilder mockClientBuilder(ClientBuilder newBuilder) throws Exception {
PulsarClientImpl client = (PulsarClientImpl) newBuilder.build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer()
.batchingMaxBytes(Integer.MAX_VALUE)
.batchingMaxMessages(batchNum)
.batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
.topic(topic);
Producer<byte[]> producer = producerBuilder.create();

PulsarClientImpl mockClient = spy(client);
ProducerBuilder<byte[]> mockProducerBuilder = spy(producerBuilder);
Producer<byte[]> mockProducer = spy(producer);
ClientBuilder mockClientBuilder = spy(newBuilder);

doAnswer((Answer<TypedMessageBuilder>) invocation -> {
TypedMessageBuilder typedMessageBuilder = spy((TypedMessageBuilder) invocation.callRealMethod());
doAnswer((Answer<MessageId>) invocation1 -> {
TypedMessageBuilder mock = ((TypedMessageBuilder) invocation1.getMock());
// using sendAsync() to replace send()
mock.sendAsync();
return null;
}).when(typedMessageBuilder).send();
return typedMessageBuilder;
}).when(mockProducer).newMessage();

doReturn(mockProducer).when(mockProducerBuilder).create();
doReturn(mockProducerBuilder).when(mockClient).newProducer(any(Schema.class));
doReturn(mockClient).when(mockClientBuilder).build();
return mockClientBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,19 @@ public void testDisableBatching() throws Exception {
properties.setProperty("useTls", "false");

final String topicName = getTopicWithRandomSuffix("disable-batching");
final int numberOfMessages = 5;
// `numberOfMessages` should be an even number, because we set `batchNum` as 2, make sure batch and non batch
// messages in the same batch
final int numberOfMessages = 6;
final int batchNum = 2;

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();

PulsarClientTool pulsarClientTool1 = new PulsarClientTool(properties);
PulsarClientTool pulsarClientTool1 = new PulsarClientToolForceBatchNum(properties, topicName, batchNum);
String[] args1 = {"produce", "-m", "batched", "-n", Integer.toString(numberOfMessages), topicName};
Assert.assertEquals(pulsarClientTool1.run(args1), 0);

PulsarClientTool pulsarClientTool2 = new PulsarClientTool(properties);
PulsarClientTool pulsarClientTool2 = new PulsarClientToolForceBatchNum(properties, topicName, batchNum);
String[] args2 = {"produce", "-m", "non-batched", "-n", Integer.toString(numberOfMessages), "-db", topicName};
Assert.assertEquals(pulsarClientTool2.run(args2), 0);

Expand Down
Loading