From dfbaf388bc6b9c29ec3f8e5b96c14ce2d3fdbf99 Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Sun, 28 Apr 2019 16:03:28 -0400 Subject: [PATCH 1/3] Pub/Sub: add MessagesBatch.add method Adding a new `MessagesBatch` method: `private List add(OutstandingPublish outstandingPublish)`. This method is responsible for adding a new user request in the form of a `OutstandingPublish`, and returning 0-2 `OutstandingBatch`, depending on the state of the current messages in flight, and the size of the new `OutstandingPublish`. --- .../com/google/cloud/pubsub/v1/Publisher.java | 71 +++++++++++-------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 352822a71b35..4ce02c2ab15c 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -115,7 +115,7 @@ private Publisher(Builder builder) throws IOException { this.batchingSettings = builder.batchingSettings; this.messageTransform = builder.messageTransform; - messagesBatch = new MessagesBatch(); + messagesBatch = new MessagesBatch(batchingSettings); messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); @@ -196,29 +196,11 @@ public ApiFuture publish(PubsubMessage message) { throw new IllegalStateException("Cannot publish on a shut-down publisher."); } - message = messageTransform.apply(message); - List batchesToSend = new ArrayList<>(); - final OutstandingPublish outstandingPublish = new OutstandingPublish(message); + final OutstandingPublish outstandingPublish = new OutstandingPublish(messageTransform.apply(message)); + List batchesToSend; messagesBatchLock.lock(); try { - // Check if the next message makes the current batch exceed the max batch byte size. - if (!messagesBatch.isEmpty() - && hasBatchingBytes() - && messagesBatch.getBatchedBytes() + outstandingPublish.messageSize - >= getMaxBatchBytes()) { - batchesToSend.add(messagesBatch.popOutstandingBatch()); - } - - messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize); - - // Border case: If the message to send is greater or equals to the max batch size then send it - // immediately. - // Alternatively if after adding the message we have reached the batch max messages then we - // have a batch to send. - if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) - || messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) { - batchesToSend.add(messagesBatch.popOutstandingBatch()); - } + batchesToSend = messagesBatch.add(outstandingPublish); // Setup the next duration based delivery alarm if there are messages batched. setupAlarm(); } finally { @@ -620,8 +602,14 @@ public Publisher build() throws IOException { } private static class MessagesBatch { - private List messages = new LinkedList<>(); + private List messages; private int batchedBytes; + private final BatchingSettings batchingSettings; + + public MessagesBatch(BatchingSettings batchingSettings) { + this.batchingSettings = batchingSettings; + reset(); + } private OutstandingBatch popOutstandingBatch() { OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes); @@ -642,13 +630,40 @@ private int getBatchedBytes() { return batchedBytes; } - private void addMessage(OutstandingPublish message, int messageSize) { - messages.add(message); - batchedBytes += messageSize; - } - private int getMessagesCount() { return messages.size(); } + + private boolean hasBatchingBytes() { + return getMaxBatchBytes() > 0; + } + + private long getMaxBatchBytes() { + return batchingSettings.getRequestByteThreshold(); + } + + private List add(OutstandingPublish outstandingPublish) { + List batchesToSend = new ArrayList<>(); + // Check if the next message makes the current batch exceed the max batch byte size. + if (!isEmpty() + && hasBatchingBytes() + && getBatchedBytes() + outstandingPublish.messageSize >= getMaxBatchBytes()) { + batchesToSend.add(popOutstandingBatch()); + } + + messages.add(outstandingPublish); + batchedBytes += outstandingPublish.messageSize; + + // Border case: If the message to send is greater or equals to the max batch size then send it + // immediately. + // Alternatively if after adding the message we have reached the batch max messages then we + // have a batch to send. + if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) + || getMessagesCount() == batchingSettings.getElementCountThreshold()) { + batchesToSend.add(popOutstandingBatch()); + } + + return batchesToSend; + } } } From 38a75f96c07d68e1e3ff44f18c562658723a1356 Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Mon, 29 Apr 2019 08:50:14 -0400 Subject: [PATCH 2/3] Fixing formatting. --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 4ce02c2ab15c..a1d69b9090aa 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -196,7 +196,8 @@ public ApiFuture publish(PubsubMessage message) { throw new IllegalStateException("Cannot publish on a shut-down publisher."); } - final OutstandingPublish outstandingPublish = new OutstandingPublish(messageTransform.apply(message)); + final OutstandingPublish outstandingPublish = + new OutstandingPublish(messageTransform.apply(message)); List batchesToSend; messagesBatchLock.lock(); try { From bb2af62672a0435529e50b4bb133ff19d59935ad Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Mon, 29 Apr 2019 08:53:49 -0400 Subject: [PATCH 3/3] Removing methods that are no longer used. --- .../main/java/com/google/cloud/pubsub/v1/Publisher.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index a1d69b9090aa..5ff2b7ac5e13 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -361,10 +361,6 @@ public BatchingSettings getBatchingSettings() { return batchingSettings; } - private long getMaxBatchBytes() { - return getBatchingSettings().getRequestByteThreshold(); - } - /** * Schedules immediate publishing of any outstanding messages and waits until all are processed. * @@ -397,10 +393,6 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted return publisherStub.awaitTermination(duration, unit); } - private boolean hasBatchingBytes() { - return getMaxBatchBytes() > 0; - } - /** * Constructs a new {@link Builder} using the given topic. *