Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public class Publisher {
private final BatchingSettings batchingSettings;

private final Lock messagesBatchLock;
private List<OutstandingPublish> messagesBatch;
private int batchedBytes;
private MessagesBatch messagesBatch;

private final AtomicBoolean activeAlarm;

Expand Down Expand Up @@ -114,7 +113,7 @@ private Publisher(Builder builder) throws IOException {
this.batchingSettings = builder.batchingSettings;
this.messageTransform = builder.messageTransform;

messagesBatch = new LinkedList<>();
messagesBatch = new MessagesBatch();
messagesBatchLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
executor = builder.executorProvider.getExecutor();
Expand Down Expand Up @@ -205,24 +204,19 @@ public ApiFuture<String> publish(PubsubMessage message) {
// Check if the next message makes the current batch exceed the max batch byte size.
if (!messagesBatch.isEmpty()
&& hasBatchingBytes()
&& batchedBytes + messageSize >= getMaxBatchBytes()) {
batchToSend = new OutstandingBatch(messagesBatch, batchedBytes);
messagesBatch = new LinkedList<>();
batchedBytes = 0;
&& messagesBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
batchToSend = messagesBatch.popOutstandingBatch();
}

// Border case if the message to send is greater or equals to the max batch size then can't
// be included in the current batch and instead sent immediately.
if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) {
batchedBytes += messageSize;
messagesBatch.add(outstandingPublish);
messagesBatch.addMessage(outstandingPublish, messageSize);

// If after adding the message we have reached the batch max messages then we have a batch
// to send.
if (messagesBatch.size() == getBatchingSettings().getElementCountThreshold()) {
batchToSend = new OutstandingBatch(messagesBatch, batchedBytes);
messagesBatch = new LinkedList<>();
batchedBytes = 0;
if (messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
batchToSend = messagesBatch.popOutstandingBatch();
}
}
// Setup the next duration based delivery alarm if there are messages batched.
Expand Down Expand Up @@ -301,9 +295,7 @@ public void publishAllOutstanding() {
if (messagesBatch.isEmpty()) {
return;
}
batchToSend = new OutstandingBatch(messagesBatch, batchedBytes);
messagesBatch = new LinkedList<>();
batchedBytes = 0;
batchToSend = messagesBatch.popOutstandingBatch();
} finally {
messagesBatchLock.unlock();
}
Expand Down Expand Up @@ -637,4 +629,37 @@ public Publisher build() throws IOException {
return new Publisher(this);
}
}

private static class MessagesBatch {
private List<OutstandingPublish> messages = new LinkedList<>();
private int batchedBytes;

private OutstandingBatch popOutstandingBatch() {
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
reset();
return batch;
}

private void reset() {
messages = new LinkedList<>();
batchedBytes = 0;
}

private boolean isEmpty() {
return messages.isEmpty();
}

private int getBatchedBytes() {
return batchedBytes;
}

private void addMessage(OutstandingPublish message, int messageSize) {
messages.add(message);
batchedBytes += messageSize;
}

private int getMessagesCount() {
return messages.size();
}
}
}