diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 352822a71b35..5ff2b7ac5e13 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -115,7 +115,7 @@ private Publisher(Builder builder) throws IOException { this.batchingSettings = builder.batchingSettings; this.messageTransform = builder.messageTransform; - messagesBatch = new MessagesBatch(); + messagesBatch = new MessagesBatch(batchingSettings); messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); @@ -196,29 +196,12 @@ public ApiFuture publish(PubsubMessage message) { throw new IllegalStateException("Cannot publish on a shut-down publisher."); } - message = messageTransform.apply(message); - List batchesToSend = new ArrayList<>(); - final OutstandingPublish outstandingPublish = new OutstandingPublish(message); + final OutstandingPublish outstandingPublish = + new OutstandingPublish(messageTransform.apply(message)); + List batchesToSend; messagesBatchLock.lock(); try { - // Check if the next message makes the current batch exceed the max batch byte size. - if (!messagesBatch.isEmpty() - && hasBatchingBytes() - && messagesBatch.getBatchedBytes() + outstandingPublish.messageSize - >= getMaxBatchBytes()) { - batchesToSend.add(messagesBatch.popOutstandingBatch()); - } - - messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize); - - // Border case: If the message to send is greater or equals to the max batch size then send it - // immediately. - // Alternatively if after adding the message we have reached the batch max messages then we - // have a batch to send. - if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) - || messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) { - batchesToSend.add(messagesBatch.popOutstandingBatch()); - } + batchesToSend = messagesBatch.add(outstandingPublish); // Setup the next duration based delivery alarm if there are messages batched. setupAlarm(); } finally { @@ -378,10 +361,6 @@ public BatchingSettings getBatchingSettings() { return batchingSettings; } - private long getMaxBatchBytes() { - return getBatchingSettings().getRequestByteThreshold(); - } - /** * Schedules immediate publishing of any outstanding messages and waits until all are processed. * @@ -414,10 +393,6 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted return publisherStub.awaitTermination(duration, unit); } - private boolean hasBatchingBytes() { - return getMaxBatchBytes() > 0; - } - /** * Constructs a new {@link Builder} using the given topic. * @@ -620,8 +595,14 @@ public Publisher build() throws IOException { } private static class MessagesBatch { - private List messages = new LinkedList<>(); + private List messages; private int batchedBytes; + private final BatchingSettings batchingSettings; + + public MessagesBatch(BatchingSettings batchingSettings) { + this.batchingSettings = batchingSettings; + reset(); + } private OutstandingBatch popOutstandingBatch() { OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes); @@ -642,13 +623,40 @@ private int getBatchedBytes() { return batchedBytes; } - private void addMessage(OutstandingPublish message, int messageSize) { - messages.add(message); - batchedBytes += messageSize; - } - private int getMessagesCount() { return messages.size(); } + + private boolean hasBatchingBytes() { + return getMaxBatchBytes() > 0; + } + + private long getMaxBatchBytes() { + return batchingSettings.getRequestByteThreshold(); + } + + private List add(OutstandingPublish outstandingPublish) { + List batchesToSend = new ArrayList<>(); + // Check if the next message makes the current batch exceed the max batch byte size. + if (!isEmpty() + && hasBatchingBytes() + && getBatchedBytes() + outstandingPublish.messageSize >= getMaxBatchBytes()) { + batchesToSend.add(popOutstandingBatch()); + } + + messages.add(outstandingPublish); + batchedBytes += outstandingPublish.messageSize; + + // Border case: If the message to send is greater or equals to the max batch size then send it + // immediately. + // Alternatively if after adding the message we have reached the batch max messages then we + // have a batch to send. + if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) + || getMessagesCount() == batchingSettings.getElementCountThreshold()) { + batchesToSend.add(popOutstandingBatch()); + } + + return batchesToSend; + } } }