From f172471ddfc56dbdafecebccc3a31f908e4980c7 Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Fri, 19 May 2017 10:19:32 -0700 Subject: [PATCH 1/9] Changed ProducerConfiguration blockIfQueueFull to false by default --- .../java/com/yahoo/pulsar/client/api/ProducerConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java index ab905ac1dfc7c..ec7750eabd82b 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java @@ -37,7 +37,7 @@ public class ProducerConfiguration implements Serializable { private static final long serialVersionUID = 1L; private long sendTimeoutMs = 30000; private int maxPendingMessages = 1000; - private boolean blockIfQueueFull = true; + private boolean blockIfQueueFull = false; private MessageRoutingMode messageRouteMode = MessageRoutingMode.SinglePartition; private MessageRouter customMessageRouter = null; private long batchingMaxPublishDelayMs = 10; From ffd2f93c004d633deb244c0a21f7311cba105d71 Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Mon, 22 May 2017 00:50:02 -0700 Subject: [PATCH 2/9] Fixed test cases --- .../com/yahoo/pulsar/broker/service/BatchMessageTest.java | 1 + .../yahoo/pulsar/broker/service/PersistentQueueE2ETest.java | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BatchMessageTest.java index f5ae5f6fca216..e8b72e4701e5f 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BatchMessageTest.java @@ -350,6 +350,7 @@ public void testSimpleBatchProducerConsumer1kMessages() throws Exception { consumer.close(); ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setMaxPendingMessages(numMsgs + 1); producerConf.setBatchingMaxPublishDelay(30000, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(numMsgsInBatch); producerConf.setBatchingEnabled(true); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentQueueE2ETest.java index 371eefa185e6c..c11a1ff25d5f4 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentQueueE2ETest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentQueueE2ETest.java @@ -44,6 +44,7 @@ import com.yahoo.pulsar.client.api.Message; import com.yahoo.pulsar.client.api.MessageId; import com.yahoo.pulsar.client.api.Producer; +import com.yahoo.pulsar.client.api.ProducerConfiguration; import com.yahoo.pulsar.client.api.PulsarClientException; import com.yahoo.pulsar.client.api.SubscriptionType; import com.yahoo.pulsar.client.util.FutureUtil; @@ -261,7 +262,9 @@ public void testConsumersWithDifferentPermits() throws Exception { Consumer consumer2 = pulsarClient.subscribe(topicName, subName, conf2); List> futures = Lists.newArrayListWithCapacity(numMsgs); - Producer producer = pulsarClient.createProducer(topicName); + ProducerConfiguration conf = new ProducerConfiguration(); + conf.setMaxPendingMessages(numMsgs + 1); + Producer producer = pulsarClient.createProducer(topicName, conf); for (int i = 0; i < numMsgs; i++) { String message = "msg-" + i; futures.add(producer.sendAsync(message.getBytes())); From 36bcc6e14dae55ce76ca811ac48d0936fd9b7dd4 Mon Sep 17 00:00:00 2001 From: jai1 Date: Tue, 30 May 2017 20:03:52 +0000 Subject: [PATCH 3/9] Increased Queue size to 30K --- .../java/com/yahoo/pulsar/client/api/ProducerConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java index ec7750eabd82b..17c2f45be5bbb 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java @@ -36,7 +36,7 @@ public class ProducerConfiguration implements Serializable { */ private static final long serialVersionUID = 1L; private long sendTimeoutMs = 30000; - private int maxPendingMessages = 1000; + private int maxPendingMessages = 30000; private boolean blockIfQueueFull = false; private MessageRoutingMode messageRouteMode = MessageRoutingMode.SinglePartition; private MessageRouter customMessageRouter = null; From 17a8cd2efbd1f1e439c0d67cd3d6bdb726f9f901 Mon Sep 17 00:00:00 2001 From: jai1 Date: Tue, 30 May 2017 20:59:36 +0000 Subject: [PATCH 4/9] Made changes in C++ Client --- pulsar-client-cpp/lib/ProducerConfigurationImpl.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h index 7c0b76dfb1dd3..37c2056d0928f 100644 --- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h @@ -36,9 +36,9 @@ struct ProducerConfigurationImpl { ProducerConfigurationImpl() : sendTimeoutMs(30000), compressionType(CompressionNone), - maxPendingMessages(1000), + maxPendingMessages(30000), routingMode(ProducerConfiguration::UseSinglePartition), - blockIfQueueFull(true), + blockIfQueueFull(false), batchingEnabled(false), batchingMaxMessages(1000), batchingMaxAllowedSizeInBytes(128 * 1024), // 128 KB From 7d6193208a26c1f35f3d622a3412b11c0db69471 Mon Sep 17 00:00:00 2001 From: jai1 Date: Wed, 31 May 2017 17:00:19 +0000 Subject: [PATCH 5/9] Fixed Performance Producers to block when queue is full --- pulsar-client-cpp/perf/PerfProducer.cc | 3 +++ .../java/com/yahoo/pulsar/testclient/PerformanceProducer.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/pulsar-client-cpp/perf/PerfProducer.cc b/pulsar-client-cpp/perf/PerfProducer.cc index bd65f4f185d8d..96fdd3bbfa00c 100644 --- a/pulsar-client-cpp/perf/PerfProducer.cc +++ b/pulsar-client-cpp/perf/PerfProducer.cc @@ -286,6 +286,9 @@ int main(int argc, char** argv) { producerConf.setBatchingMaxAllowedSizeInBytes(args.batchingMaxAllowedSizeInBytes); producerConf.setBatchingMaxPublishDelayMs(args.batchingMaxPublishDelayMs); } + + // Block if queue is full else we will start seeing errors in sendAsync + producerConf.setBlockIfQueueFull(true); pulsar::ClientConfiguration conf; conf.setUseTls(args.isUseTls); conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection); diff --git a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/PerformanceProducer.java index 3679ff2e0e981..2540b171d0a27 100644 --- a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/PerformanceProducer.java @@ -233,6 +233,9 @@ public static void main(String[] args) throws Exception { producerConf.setMaxPendingMessages(arguments.msgRate); } + // Block if queue is full else we will start seeing errors in sendAsync + producerConf.setBlockIfQueueFull(true); + for (int i = 0; i < arguments.numTopics; i++) { String topic = (arguments.numTopics == 1) ? prefixTopicName : String.format("%s-%d", prefixTopicName, i); log.info("Adding {} publishers on destination {}", arguments.numProducers, topic); From b23bf760f9af2fe1aad23e04a0bc0b6d531b4317 Mon Sep 17 00:00:00 2001 From: jai1 Date: Thu, 1 Jun 2017 17:14:21 +0000 Subject: [PATCH 6/9] Added default behavior to documentation --- .../src/main/java/com/yahoo/pulsar/client/api/Producer.java | 6 ++---- .../com/yahoo/pulsar/client/api/ProducerConfiguration.java | 6 +++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/Producer.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/Producer.java index a3fa7b82fe622..c3aa004107bd4 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/Producer.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/Producer.java @@ -50,8 +50,7 @@ public interface Producer extends Closeable { /** * Send a message asynchronously *

- * When the producer queue is full, by default this method will block until there will be space available in - * the queue. + * When the producer queue is full, by default this method will throw an exception {@link PulsarClientException#ProducerQueueIsFullError} *

* See {@link ProducerConfiguration#setMaxPendingMessages} to configure the producer queue size and * {@link ProducerConfiguration#setBlockIfQueueFull(boolean)} to change the blocking behavior. @@ -90,8 +89,7 @@ public interface Producer extends Closeable { * }); * *

- * When the producer queue is full, by default this method will block until there will be space available in - * the queue. + * When the producer queue is full, by default this method will throw an exception {@link PulsarClientException#ProducerQueueIsFullError} *

* See {@link ProducerConfiguration#setMaxPendingMessages} to configure the producer queue size and * {@link ProducerConfiguration#setBlockIfQueueFull(boolean)} to change the blocking behavior. diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java index 17c2f45be5bbb..8d16fb43741b2 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java @@ -83,8 +83,8 @@ public int getMaxPendingMessages() { /** * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. *

- * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} and send - * will block the calling thread. Use {@link #setBlockIfQueueFull} to change the blocking behavior. + * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} + * will fail unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior. * * @param maxPendingMessages * @return @@ -108,7 +108,7 @@ public boolean getBlockIfQueueFull() { * Set whether the {@link Producer#send} and {@link Producer#sendAsync} operations should block when the outgoing * message queue is full. *

- * Default is true. If set to false, send operations will immediately fail with + * Default is false. If set to false, send operations will immediately fail with * {@link ProducerQueueIsFullError} when there is no space left in pending queue. * * @param blockIfQueueFull From c763b3cabf3d75e499f501c045f11dc3546d4207 Mon Sep 17 00:00:00 2001 From: jai1 Date: Thu, 1 Jun 2017 17:17:28 +0000 Subject: [PATCH 7/9] Moving pending queue size changes to a different commit --- pulsar-client-cpp/lib/ProducerConfigurationImpl.h | 2 +- .../java/com/yahoo/pulsar/client/api/ProducerConfiguration.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h index 37c2056d0928f..bc8a55a060f7d 100644 --- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h @@ -36,7 +36,7 @@ struct ProducerConfigurationImpl { ProducerConfigurationImpl() : sendTimeoutMs(30000), compressionType(CompressionNone), - maxPendingMessages(30000), + maxPendingMessages(1000), routingMode(ProducerConfiguration::UseSinglePartition), blockIfQueueFull(false), batchingEnabled(false), diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java index 8d16fb43741b2..c15dbed09d576 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ProducerConfiguration.java @@ -36,7 +36,7 @@ public class ProducerConfiguration implements Serializable { */ private static final long serialVersionUID = 1L; private long sendTimeoutMs = 30000; - private int maxPendingMessages = 30000; + private int maxPendingMessages = 1000; private boolean blockIfQueueFull = false; private MessageRoutingMode messageRouteMode = MessageRoutingMode.SinglePartition; private MessageRouter customMessageRouter = null; From 4dd7cd782df3ad51d6ae1a95c24a45406c8250cf Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Fri, 9 Jun 2017 12:55:20 -0700 Subject: [PATCH 8/9] restoring default behavior in persistent replicator --- .../pulsar/broker/service/persistent/PersistentReplicator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java index 160fe9d48b464..3da7cc1bee13b 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java @@ -85,7 +85,7 @@ public class PersistentReplicator implements ReadEntriesCallback, DeleteCallback private final Rate msgExpired = new Rate(); private static final ProducerConfiguration producerConfiguration = new ProducerConfiguration().setSendTimeout(0, - TimeUnit.SECONDS); + TimeUnit.SECONDS).setBlockIfQueueFull(true).setMaxPendingMessages(1000); private final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES); private int messageTTLInSeconds = 0; From a36d685ddfa597e0e05ab8a483103753c13a12b1 Mon Sep 17 00:00:00 2001 From: Jai ASher Date: Fri, 9 Jun 2017 14:25:11 -0700 Subject: [PATCH 9/9] Handled Matteo's PR comments --- .../pulsar/broker/service/persistent/PersistentReplicator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java index 3da7cc1bee13b..b8d8e7c6f78b9 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java @@ -85,7 +85,7 @@ public class PersistentReplicator implements ReadEntriesCallback, DeleteCallback private final Rate msgExpired = new Rate(); private static final ProducerConfiguration producerConfiguration = new ProducerConfiguration().setSendTimeout(0, - TimeUnit.SECONDS).setBlockIfQueueFull(true).setMaxPendingMessages(1000); + TimeUnit.SECONDS).setBlockIfQueueFull(true); private final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES); private int messageTTLInSeconds = 0;