From bdfae38f003de320722d8507bf71466f2d84f375 Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Wed, 24 Apr 2019 13:56:34 -0400 Subject: [PATCH] Enhancing Publisher.OutstandingBatch - Moving responsibility for creating the SettableFuture into OutstandingBatch - Moving responsibility for tracking message size into OutstandingBatch --- .../com/google/cloud/pubsub/v1/Publisher.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 100eba65579a..febbce0ecbf0 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -197,23 +197,22 @@ public ApiFuture publish(PubsubMessage message) { } message = messageTransform.apply(message); - final int messageSize = message.getSerializedSize(); OutstandingBatch batchToSend = null; - SettableApiFuture publishResult = SettableApiFuture.create(); - final OutstandingPublish outstandingPublish = new OutstandingPublish(publishResult, message); + final OutstandingPublish outstandingPublish = new OutstandingPublish(message); messagesBatchLock.lock(); try { // Check if the next message makes the current batch exceed the max batch byte size. if (!messagesBatch.isEmpty() && hasBatchingBytes() - && messagesBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) { + && messagesBatch.getBatchedBytes() + outstandingPublish.messageSize + >= getMaxBatchBytes()) { batchToSend = messagesBatch.popOutstandingBatch(); } // Border case if the message to send is greater or equals to the max batch size then can't // be included in the current batch and instead sent immediately. - if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) { - messagesBatch.addMessage(outstandingPublish, messageSize); + if (!hasBatchingBytes() || outstandingPublish.messageSize < getMaxBatchBytes()) { + messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize); // If after adding the message we have reached the batch max messages then we have a batch // to send. @@ -250,7 +249,7 @@ public void run() { // If the message is over the size limit, it was not added to the pending messages and it will // be sent in its own batch immediately. - if (hasBatchingBytes() && messageSize >= getMaxBatchBytes()) { + if (hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) { logger.log( Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send."); executor.execute( @@ -258,12 +257,13 @@ public void run() { @Override public void run() { publishOutstandingBatch( - new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize)); + new OutstandingBatch( + ImmutableList.of(outstandingPublish), outstandingPublish.messageSize)); } }); } - return publishResult; + return outstandingPublish.publishResult; } private void setupDurationBasedPublishAlarm() { @@ -382,12 +382,14 @@ public int size() { } private static final class OutstandingPublish { - SettableApiFuture publishResult; - PubsubMessage message; + final SettableApiFuture publishResult; + final PubsubMessage message; + final int messageSize; - OutstandingPublish(SettableApiFuture publishResult, PubsubMessage message) { - this.publishResult = publishResult; + OutstandingPublish(PubsubMessage message) { + this.publishResult = SettableApiFuture.create(); this.message = message; + this.messageSize = message.getSerializedSize(); } }