diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BatchMessageTest.java index f5ae5f6fca216..e8b72e4701e5f 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BatchMessageTest.java @@ -350,6 +350,7 @@ public void testSimpleBatchProducerConsumer1kMessages() throws Exception { consumer.close(); ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setMaxPendingMessages(numMsgs + 1); producerConf.setBatchingMaxPublishDelay(30000, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(numMsgsInBatch); producerConf.setBatchingEnabled(true); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentQueueE2ETest.java index 371eefa185e6c..c11a1ff25d5f4 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentQueueE2ETest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentQueueE2ETest.java @@ -44,6 +44,7 @@ import com.yahoo.pulsar.client.api.Message; import com.yahoo.pulsar.client.api.MessageId; import com.yahoo.pulsar.client.api.Producer; +import com.yahoo.pulsar.client.api.ProducerConfiguration; import com.yahoo.pulsar.client.api.PulsarClientException; import com.yahoo.pulsar.client.api.SubscriptionType; import com.yahoo.pulsar.client.util.FutureUtil; @@ -261,7 +262,9 @@ public void testConsumersWithDifferentPermits() throws Exception { Consumer consumer2 = pulsarClient.subscribe(topicName, subName, conf2); List> futures = Lists.newArrayListWithCapacity(numMsgs); - Producer producer = pulsarClient.createProducer(topicName); + ProducerConfiguration conf = new ProducerConfiguration(); + conf.setMaxPendingMessages(numMsgs + 1); + Producer producer = pulsarClient.createProducer(topicName, conf); for (int i = 0; i < numMsgs; i++) { String message = "msg-" + i; futures.add(producer.sendAsync(message.getBytes()));