Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@
import com.google.cloud.pubsub.v1.stub.PublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.TopicNames;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
Expand Down Expand Up @@ -197,7 +197,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
}

message = messageTransform.apply(message);
OutstandingBatch batchToSend = null;
List<OutstandingBatch> batchesToSend = new ArrayList<>();
final OutstandingPublish outstandingPublish = new OutstandingPublish(message);
messagesBatchLock.lock();
try {
Expand All @@ -206,19 +206,18 @@ public ApiFuture<String> publish(PubsubMessage message) {
&& hasBatchingBytes()
&& messagesBatch.getBatchedBytes() + outstandingPublish.messageSize
>= getMaxBatchBytes()) {
batchToSend = messagesBatch.popOutstandingBatch();
batchesToSend.add(messagesBatch.popOutstandingBatch());
}

// Border case if the message to send is greater or equals to the max batch size then can't
// be included in the current batch and instead sent immediately.
if (!hasBatchingBytes() || outstandingPublish.messageSize < getMaxBatchBytes()) {
messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize);
messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize);

// If after adding the message we have reached the batch max messages then we have a batch
// to send.
if (messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
batchToSend = messagesBatch.popOutstandingBatch();
}
// Border case: If the message to send is greater or equals to the max batch size then send it
// immediately.
// Alternatively if after adding the message we have reached the batch max messages then we
// have a batch to send.
if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes())
|| messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
batchesToSend.add(messagesBatch.popOutstandingBatch());
}
// Setup the next duration based delivery alarm if there are messages batched.
setupAlarm();
Expand All @@ -228,32 +227,17 @@ && hasBatchingBytes()

messagesWaiter.incrementPendingMessages(1);

if (batchToSend != null) {
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
final OutstandingBatch finalBatchToSend = batchToSend;
executor.execute(
new Runnable() {
@Override
public void run() {
publishOutstandingBatch(finalBatchToSend);
}
});
}

// If the message is over the size limit, it was not added to the pending messages and it will
// be sent in its own batch immediately.
if (hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) {
logger.log(
Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send.");
executor.execute(
new Runnable() {
@Override
public void run() {
publishOutstandingBatch(
new OutstandingBatch(
ImmutableList.of(outstandingPublish), outstandingPublish.messageSize));
}
});
if (!batchesToSend.isEmpty()) {
for (final OutstandingBatch batch : batchesToSend) {
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
executor.execute(
new Runnable() {
@Override
public void run() {
publishOutstandingBatch(batch);
}
});
}
}

return outstandingPublish.publishResult;
Expand Down