Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,23 +197,22 @@ public ApiFuture<String> publish(PubsubMessage message) {
}

message = messageTransform.apply(message);
final int messageSize = message.getSerializedSize();
OutstandingBatch batchToSend = null;
SettableApiFuture<String> publishResult = SettableApiFuture.<String>create();
final OutstandingPublish outstandingPublish = new OutstandingPublish(publishResult, message);
final OutstandingPublish outstandingPublish = new OutstandingPublish(message);
messagesBatchLock.lock();
try {
// Check if the next message makes the current batch exceed the max batch byte size.
if (!messagesBatch.isEmpty()
&& hasBatchingBytes()
&& messagesBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
&& messagesBatch.getBatchedBytes() + outstandingPublish.messageSize
>= getMaxBatchBytes()) {
batchToSend = messagesBatch.popOutstandingBatch();
}

// Border case if the message to send is greater or equals to the max batch size then can't
// be included in the current batch and instead sent immediately.
if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) {
messagesBatch.addMessage(outstandingPublish, messageSize);
if (!hasBatchingBytes() || outstandingPublish.messageSize < getMaxBatchBytes()) {
messagesBatch.addMessage(outstandingPublish, outstandingPublish.messageSize);

// If after adding the message we have reached the batch max messages then we have a batch
// to send.
Expand Down Expand Up @@ -243,20 +242,21 @@ public void run() {

// If the message is over the size limit, it was not added to the pending messages and it will
// be sent in its own batch immediately.
if (hasBatchingBytes() && messageSize >= getMaxBatchBytes()) {
if (hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes()) {
logger.log(
Level.FINER, "Message exceeds the max batch bytes, scheduling it for immediate send.");
executor.execute(
new Runnable() {
@Override
public void run() {
publishOutstandingBatch(
new OutstandingBatch(ImmutableList.of(outstandingPublish), messageSize));
new OutstandingBatch(
ImmutableList.of(outstandingPublish), outstandingPublish.messageSize));
}
});
}

return publishResult;
return outstandingPublish.publishResult;
}

private void setupAlarm() {
Expand Down Expand Up @@ -382,12 +382,14 @@ public int size() {
}

private static final class OutstandingPublish {
SettableApiFuture<String> publishResult;
PubsubMessage message;
final SettableApiFuture<String> publishResult;
final PubsubMessage message;
final int messageSize;

OutstandingPublish(SettableApiFuture<String> publishResult, PubsubMessage message) {
this.publishResult = publishResult;
OutstandingPublish(PubsubMessage message) {
this.publishResult = SettableApiFuture.create();
this.message = message;
this.messageSize = message.getSerializedSize();
}
}

Expand Down