diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index ea2475cb3fdf..5666d23abf5f 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -84,8 +84,7 @@ public class Publisher { private final BatchingSettings batchingSettings; private final Lock messagesBatchLock; - private List messagesBatch; - private int batchedBytes; + private MessagesBatch messagesBatch; private final AtomicBoolean activeAlarm; @@ -114,7 +113,7 @@ private Publisher(Builder builder) throws IOException { this.batchingSettings = builder.batchingSettings; this.messageTransform = builder.messageTransform; - messagesBatch = new LinkedList<>(); + messagesBatch = new MessagesBatch(); messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); @@ -205,24 +204,19 @@ public ApiFuture publish(PubsubMessage message) { // Check if the next message makes the current batch exceed the max batch byte size. if (!messagesBatch.isEmpty() && hasBatchingBytes() - && batchedBytes + messageSize >= getMaxBatchBytes()) { - batchToSend = new OutstandingBatch(messagesBatch, batchedBytes); - messagesBatch = new LinkedList<>(); - batchedBytes = 0; + && messagesBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) { + batchToSend = messagesBatch.popOutstandingBatch(); } // Border case if the message to send is greater or equals to the max batch size then can't // be included in the current batch and instead sent immediately. if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) { - batchedBytes += messageSize; - messagesBatch.add(outstandingPublish); + messagesBatch.addMessage(outstandingPublish, messageSize); // If after adding the message we have reached the batch max messages then we have a batch // to send. - if (messagesBatch.size() == getBatchingSettings().getElementCountThreshold()) { - batchToSend = new OutstandingBatch(messagesBatch, batchedBytes); - messagesBatch = new LinkedList<>(); - batchedBytes = 0; + if (messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) { + batchToSend = messagesBatch.popOutstandingBatch(); } } // Setup the next duration based delivery alarm if there are messages batched. @@ -301,9 +295,7 @@ public void publishAllOutstanding() { if (messagesBatch.isEmpty()) { return; } - batchToSend = new OutstandingBatch(messagesBatch, batchedBytes); - messagesBatch = new LinkedList<>(); - batchedBytes = 0; + batchToSend = messagesBatch.popOutstandingBatch(); } finally { messagesBatchLock.unlock(); } @@ -637,4 +629,37 @@ public Publisher build() throws IOException { return new Publisher(this); } } + + private static class MessagesBatch { + private List messages = new LinkedList<>(); + private int batchedBytes; + + private OutstandingBatch popOutstandingBatch() { + OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes); + reset(); + return batch; + } + + private void reset() { + messages = new LinkedList<>(); + batchedBytes = 0; + } + + private boolean isEmpty() { + return messages.isEmpty(); + } + + private int getBatchedBytes() { + return batchedBytes; + } + + private void addMessage(OutstandingPublish message, int messageSize) { + messages.add(message); + batchedBytes += messageSize; + } + + private int getMessagesCount() { + return messages.size(); + } + } }