Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private Publisher(Builder builder) throws IOException {
this.batchingSettings = builder.batchingSettings;
this.messageTransform = builder.messageTransform;

messagesBatch = new MessagesBatch();
messagesBatch = new MessagesBatch(batchingSettings);
messagesBatchLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
executor = builder.executorProvider.getExecutor();
Expand Down Expand Up @@ -196,29 +196,12 @@ public ApiFuture<String> publish(PubsubMessage message) {
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
}

message = messageTransform.apply(message);
List<OutstandingBatch> batchesToSend = new ArrayList<>();
final OutstandingPublish outstandingPublish = new OutstandingPublish(message);
final OutstandingPublish outstandingPublish =
new OutstandingPublish(messageTransform.apply(message));
List<OutstandingBatch> batchesToSend;
messagesBatchLock.lock();
try {
// Check if the next message makes the current batch exceed the max batch byte size.
if (!messagesBatch.isEmpty()
&& hasBatchingBytes()
&& messagesBatch.getBatchedBytes() + outstandingPublish.messageSize
>= getMaxBatchBytes()) {
batchesToSend.add(messagesBatch.popOutstandingBatch());
}

messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize);

// Border case: If the message to send is greater or equals to the max batch size then send it
// immediately.
// Alternatively if after adding the message we have reached the batch max messages then we
// have a batch to send.
if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes())
|| messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
batchesToSend.add(messagesBatch.popOutstandingBatch());
}
batchesToSend = messagesBatch.add(outstandingPublish);
// Setup the next duration based delivery alarm if there are messages batched.
setupAlarm();
} finally {
Expand Down Expand Up @@ -378,10 +361,6 @@ public BatchingSettings getBatchingSettings() {
return batchingSettings;
}

private long getMaxBatchBytes() {
return getBatchingSettings().getRequestByteThreshold();
}

/**
* Schedules immediate publishing of any outstanding messages and waits until all are processed.
*
Expand Down Expand Up @@ -414,10 +393,6 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
return publisherStub.awaitTermination(duration, unit);
}

private boolean hasBatchingBytes() {
return getMaxBatchBytes() > 0;
}

/**
* Constructs a new {@link Builder} using the given topic.
*
Expand Down Expand Up @@ -620,8 +595,14 @@ public Publisher build() throws IOException {
}

private static class MessagesBatch {
private List<OutstandingPublish> messages = new LinkedList<>();
private List<OutstandingPublish> messages;
private int batchedBytes;
private final BatchingSettings batchingSettings;

public MessagesBatch(BatchingSettings batchingSettings) {
this.batchingSettings = batchingSettings;
reset();
}

private OutstandingBatch popOutstandingBatch() {
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
Expand All @@ -642,13 +623,40 @@ private int getBatchedBytes() {
return batchedBytes;
}

private void addMessage(OutstandingPublish message, int messageSize) {
messages.add(message);
batchedBytes += messageSize;
}

private int getMessagesCount() {
return messages.size();
}

private boolean hasBatchingBytes() {
return getMaxBatchBytes() > 0;
}

private long getMaxBatchBytes() {
return batchingSettings.getRequestByteThreshold();
}

private List<OutstandingBatch> add(OutstandingPublish outstandingPublish) {
List<OutstandingBatch> batchesToSend = new ArrayList<>();
// Check if the next message makes the current batch exceed the max batch byte size.
if (!isEmpty()
&& hasBatchingBytes()
&& getBatchedBytes() + outstandingPublish.messageSize >= getMaxBatchBytes()) {
batchesToSend.add(popOutstandingBatch());
}

messages.add(outstandingPublish);
batchedBytes += outstandingPublish.messageSize;

// Border case: If the message to send is greater or equals to the max batch size then send it
// immediately.
// Alternatively if after adding the message we have reached the batch max messages then we
// have a batch to send.
if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes())
|| getMessagesCount() == batchingSettings.getElementCountThreshold()) {
batchesToSend.add(popOutstandingBatch());
}

return batchesToSend;
}
}
}