Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class PersistentReplicator implements ReadEntriesCallback, DeleteCallback
private final Rate msgExpired = new Rate();

private static final ProducerConfiguration producerConfiguration = new ProducerConfiguration().setSendTimeout(0,
TimeUnit.SECONDS);
TimeUnit.SECONDS).setBlockIfQueueFull(true);

private final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES);
private int messageTTLInSeconds = 0;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct ProducerConfigurationImpl {
compressionType(CompressionNone),
maxPendingMessages(30000),
routingMode(ProducerConfiguration::UseSinglePartition),
blockIfQueueFull(true),
blockIfQueueFull(false),
batchingEnabled(false),
batchingMaxMessages(1000),
batchingMaxAllowedSizeInBytes(128 * 1024), // 128 KB
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/perf/PerfProducer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ int main(int argc, char** argv) {
producerConf.setBatchingMaxAllowedSizeInBytes(args.batchingMaxAllowedSizeInBytes);
producerConf.setBatchingMaxPublishDelayMs(args.batchingMaxPublishDelayMs);
}

// Block if queue is full else we will start seeing errors in sendAsync
producerConf.setBlockIfQueueFull(true);
pulsar::ClientConfiguration conf;
conf.setUseTls(args.isUseTls);
conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public interface Producer extends Closeable {
/**
* Send a message asynchronously
* <p>
* When the producer queue is full, by default this method will <b>block</b> until there will be space available in
* the queue.
* When the producer queue is full, by default this method will complete the future with an exception {@link PulsarClientException#ProducerQueueIsFullError}
* <p>
* See {@link ProducerConfiguration#setMaxPendingMessages} to configure the producer queue size and
* {@link ProducerConfiguration#setBlockIfQueueFull(boolean)} to change the blocking behavior.
Expand Down Expand Up @@ -90,8 +89,7 @@ public interface Producer extends Closeable {
* });</code>
* </pre>
* <p>
* When the producer queue is full, by default this method will <b>block</b> until there will be space available in
* the queue.
* When the producer queue is full, by default this method will complete the future with an exception {@link PulsarClientException#ProducerQueueIsFullError}
* <p>
* See {@link ProducerConfiguration#setMaxPendingMessages} to configure the producer queue size and
* {@link ProducerConfiguration#setBlockIfQueueFull(boolean)} to change the blocking behavior.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public class ProducerConfiguration implements Serializable {
*/
private static final long serialVersionUID = 1L;
private long sendTimeoutMs = 30000;
private boolean blockIfQueueFull = false;
private int maxPendingMessages = 30000;
private boolean blockIfQueueFull = true;
private MessageRoutingMode messageRouteMode = MessageRoutingMode.SinglePartition;
private MessageRouter customMessageRouter = null;
private long batchingMaxPublishDelayMs = 10;
Expand Down Expand Up @@ -83,8 +83,8 @@ public int getMaxPendingMessages() {
/**
* Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
* <p>
* When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} and send
* will block the calling thread. Use {@link #setBlockIfQueueFull} to change the blocking behavior.
* When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync}
* will fail unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior.
*
* @param maxPendingMessages
* @return
Expand All @@ -108,7 +108,7 @@ public boolean getBlockIfQueueFull() {
* Set whether the {@link Producer#send} and {@link Producer#sendAsync} operations should block when the outgoing
* message queue is full.
* <p>
* Default is <code>true</code>. If set to <code>false</code>, send operations will immediately fail with
* Default is <code>false</code>. If set to <code>false</code>, send operations will immediately fail with
* {@link ProducerQueueIsFullError} when there is no space left in pending queue.
*
* @param blockIfQueueFull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ public static void main(String[] args) throws Exception {
producerConf.setMaxPendingMessages(arguments.msgRate);
}

// Block if queue is full else we will start seeing errors in sendAsync
producerConf.setBlockIfQueueFull(true);

for (int i = 0; i < arguments.numTopics; i++) {
String topic = (arguments.numTopics == 1) ? prefixTopicName : String.format("%s-%d", prefixTopicName, i);
log.info("Adding {} publishers on destination {}", arguments.numProducers, topic);
Expand Down