diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index d7f668ab660b..f577e95806ca 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -40,13 +40,13 @@ import com.google.cloud.pubsub.v1.stub.PublisherStub; import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import com.google.pubsub.v1.TopicNames; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; @@ -197,7 +197,7 @@ public ApiFuture publish(PubsubMessage message) { } message = messageTransform.apply(message); - OutstandingBatch batchToSend = null; + List batchesToSend = new ArrayList<>(); final OutstandingPublish outstandingPublish = new OutstandingPublish(message); messagesBatchLock.lock(); try { @@ -206,19 +206,18 @@ public ApiFuture publish(PubsubMessage message) { && hasBatchingBytes() && messagesBatch.getBatchedBytes() + outstandingPublish.messageSize >= getMaxBatchBytes()) { - batchToSend = messagesBatch.popOutstandingBatch(); + batchesToSend.add(messagesBatch.popOutstandingBatch()); } - // Border case if the message to send is greater or equals to the max batch size then can't - // be included in the current batch and instead sent immediately. - if (!hasBatchingBytes() || outstandingPublish.messageSize < getMaxBatchBytes()) { - messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize); + messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize); - // If after adding the message we have reached the batch max messages then we have a batch - // to send. - if (messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) { - batchToSend = messagesBatch.popOutstandingBatch(); - } + // Border case: If the message to send is greater or equals to the max batch size then send it + // immediately. + // Alternatively if after adding the message we have reached the batch max messages then we + // have a batch to send. + if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) + || messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) { + batchesToSend.add(messagesBatch.popOutstandingBatch()); } // Setup the next duration based delivery alarm if there are messages batched. setupAlarm(); @@ -228,32 +227,17 @@ && hasBatchingBytes() messagesWaiter.incrementPendingMessages(1); - if (batchToSend != null) { - logger.log(Level.FINER, "Scheduling a batch for immediate sending."); - final OutstandingBatch finalBatchToSend = batchToSend; - executor.execute( - new Runnable() { - @Override - public void run() { - publishOutstandingBatch(finalBatchToSend); - } - }); - } - - // If the message is over the size limit, it was not added to the pending messages and it will - // be sent in its own batch immediately. - if (hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) { - logger.log( - Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send."); - executor.execute( - new Runnable() { - @Override - public void run() { - publishOutstandingBatch( - new OutstandingBatch( - ImmutableList.of(outstandingPublish), outstandingPublish.messageSize)); - } - }); + if (!batchesToSend.isEmpty()) { + for (final OutstandingBatch batch : batchesToSend) { + logger.log(Level.FINER, "Scheduling a batch for immediate sending."); + executor.execute( + new Runnable() { + @Override + public void run() { + publishOutstandingBatch(batch); + } + }); + } } return outstandingPublish.publishResult;