Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.core.FlowController;
import com.google.api.core.ApiClock;
import com.google.api.gax.core.FlowController;
import com.google.api.gax.core.FlowController.FlowControlException;
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
import com.google.api.stats.Distribution;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
Expand All @@ -29,8 +32,9 @@
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
Expand All @@ -42,6 +46,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Interval;
Expand All @@ -57,7 +62,11 @@ class MessageDispatcher {
@VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100);
private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m

private static final ScheduledExecutorService SHARED_ALARMS_EXECUTOR =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(2).build().getExecutor();

private final ScheduledExecutorService executor;
private final ScheduledExecutorService alarmsExecutor;
private final ApiClock clock;

private final Duration ackExpirationPadding;
Expand All @@ -78,6 +87,8 @@ class MessageDispatcher {
private Instant nextAckDeadlineExtensionAlarmTime;
private ScheduledFuture<?> pendingAcksAlarm;

private final Deque<OutstandingMessagesBatch> outstandingMessageBatches;

// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;

Expand Down Expand Up @@ -190,6 +201,7 @@ public void onFailure(Throwable t) {
setupPendingAcksAlarm();
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
}

@Override
Expand All @@ -200,25 +212,23 @@ public void onSuccess(AckReply reply) {
synchronized (pendingAcks) {
pendingAcks.add(ackId);
}
setupPendingAcksAlarm();
flowController.release(1, outstandingBytes);
// Record the latency rounded to the next closest integer.
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTime.getMillis()) / 1000D)));
messagesWaiter.incrementPendingMessages(-1);
return;
break;
case NACK:
synchronized (pendingNacks) {
pendingNacks.add(ackId);
}
setupPendingAcksAlarm();
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
return;
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
}
setupPendingAcksAlarm();
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
}
}

Expand All @@ -235,13 +245,16 @@ void sendAckOperations(
Distribution ackLatencyDistribution,
FlowController flowController,
ScheduledExecutorService executor,
@Nullable ScheduledExecutorService alarmsExecutor,
ApiClock clock) {
this.executor = executor;
this.alarmsExecutor = alarmsExecutor == null ? SHARED_ALARMS_EXECUTOR : alarmsExecutor;
this.ackExpirationPadding = ackExpirationPadding;
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
this.receiver = receiver;
this.ackProcessor = ackProcessor;
this.flowController = flowController;
outstandingMessageBatches = new LinkedList<>();
outstandingAckHandlers = new PriorityQueue<>();
pendingAcks = new HashSet<>();
pendingNacks = new HashSet<>();
Expand Down Expand Up @@ -275,28 +288,109 @@ public int getMessageDeadlineSeconds() {
return messageDeadlineSeconds;
}

public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> responseMessages) {
int receivedMessagesCount = responseMessages.size();
if (receivedMessagesCount == 0) {
static class OutstandingMessagesBatch {
private final Deque<OutstandingMessage> messages;
private final Runnable doneCallback;

static class OutstandingMessage {
private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;

public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.receivedMessage = receivedMessage;
this.ackHandler = ackHandler;
}

public ReceivedMessage receivedMessage() {
return receivedMessage;
}

public AckHandler ackHandler() {
return ackHandler;
}
}

public OutstandingMessagesBatch(Runnable doneCallback) {
this.messages = new LinkedList<>();
this.doneCallback = doneCallback;
}

public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.messages.add(new OutstandingMessage(receivedMessage, ackHandler));
}

public Deque<OutstandingMessage> messages() {
return messages;
}
}

public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {

This comment was marked as spam.

This comment was marked as spam.

if (messages.isEmpty()) {
doneCallback.run();
return;
}
Instant now = new Instant(clock.millisTime());
int totalByteCount = 0;
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
for (ReceivedMessage pubsubMessage : responseMessages) {
int messageSize = pubsubMessage.getMessage().getSerializedSize();
totalByteCount += messageSize;
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize));
messagesWaiter.incrementPendingMessages(messages.size());

OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch(doneCallback);
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(messages.size());
for (ReceivedMessage message : messages) {
AckHandler ackHandler =
new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
ackHandlers.add(ackHandler);
outstandingBatch.addMessage(message, ackHandler);
}

Instant expiration = new Instant(clock.millisTime()).plus(messageDeadlineSeconds * 1000);
synchronized (outstandingAckHandlers) {
outstandingAckHandlers.add(
new ExtensionJob(
new Instant(clock.millisTime()),
expiration,
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS,
ackHandlers));
}
setupNextAckDeadlineExtensionAlarm(expiration);

synchronized (outstandingMessageBatches) {
outstandingMessageBatches.add(outstandingBatch);
}
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
logger.log(
Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now});

messagesWaiter.incrementPendingMessages(responseMessages.size());
Iterator<AckHandler> acksIterator = ackHandlers.iterator();
for (ReceivedMessage userMessage : responseMessages) {
final PubsubMessage message = userMessage.getMessage();
final AckHandler ackHandler = acksIterator.next();
processOutstandingBatches();
}

public void processOutstandingBatches() {
while (true) {
boolean batchDone = false;
Runnable batchCallback = null;
OutstandingMessage outstandingMessage;
synchronized (outstandingMessageBatches) {
OutstandingMessagesBatch nextBatch = outstandingMessageBatches.peek();
if (nextBatch == null) {
return;
}
outstandingMessage = nextBatch.messages.peek();
if (outstandingMessage == null) {
return;
}
try {
// This is a non-blocking flow controller.
flowController.reserve(
1, outstandingMessage.receivedMessage().getMessage().getSerializedSize());
} catch (FlowController.MaxOutstandingElementCountReachedException
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
return;
} catch (FlowControlException unexpectedException) {
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
nextBatch.messages.poll(); // We got a hold to the message already.
batchDone = nextBatch.messages.isEmpty();
if (batchDone) {
outstandingMessageBatches.poll();
batchCallback = nextBatch.doneCallback;
}
}

final PubsubMessage message = outstandingMessage.receivedMessage().getMessage();
final AckHandler ackHandler = outstandingMessage.ackHandler();
final SettableFuture<AckReply> response = SettableFuture.create();
final AckReplyConsumer consumer =
new AckReplyConsumer() {
Expand All @@ -322,22 +416,9 @@ public void run() {
}
}
});
}

synchronized (outstandingAckHandlers) {
outstandingAckHandlers.add(
new ExtensionJob(
new Instant(clock.millisTime()),
expiration,
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS,
ackHandlers));
}
setupNextAckDeadlineExtensionAlarm(expiration);

try {
flowController.reserve(receivedMessagesCount, totalByteCount);
} catch (FlowController.FlowControlException unexpectedException) {
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
if (batchDone) {
batchCallback.run();
}
}
}

Expand All @@ -346,7 +427,7 @@ private void setupPendingAcksAlarm() {
try {
if (pendingAcksAlarm == null) {
pendingAcksAlarm =
executor.schedule(
alarmsExecutor.schedule(
new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -411,7 +492,7 @@ public void run() {
// drop it.
continue;
}

// If a message has already been acked, remove it, nothing to do.
for (int i = 0; i < job.ackHandlers.size(); ) {
if (job.ackHandlers.get(i).acked.get()) {
Expand Down Expand Up @@ -475,7 +556,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) {
nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime;

ackDeadlineExtensionAlarm =
executor.schedule(
alarmsExecutor.schedule(
new AckDeadlineAlarm(),
nextAckDeadlineExtensionAlarmTime.getMillis() - clock.millisTime(),
TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.joda.time.Duration;

/**
Expand All @@ -59,6 +60,7 @@ final class PollingSubscriberConnection extends AbstractApiService implements Ac
private final ScheduledExecutorService executor;
private final SubscriberFutureStub stub;
private final MessageDispatcher messageDispatcher;
private final int maxDesiredPulledMessages;

public PollingSubscriberConnection(
String subscription,
Expand All @@ -68,7 +70,9 @@ public PollingSubscriberConnection(
Distribution ackLatencyDistribution,
Channel channel,
FlowController flowController,
@Nullable Integer maxDesiredPulledMessages,
ScheduledExecutorService executor,
@Nullable ScheduledExecutorService alarmsExecutor,
ApiClock clock) {
this.subscription = subscription;
this.executor = executor;
Expand All @@ -82,8 +86,11 @@ public PollingSubscriberConnection(
ackLatencyDistribution,
flowController,
executor,
alarmsExecutor,
clock);
messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS);
this.maxDesiredPulledMessages =
maxDesiredPulledMessages != null ? maxDesiredPulledMessages : DEFAULT_MAX_MESSAGES;
}

@Override
Expand Down Expand Up @@ -112,7 +119,8 @@ public void onSuccess(Subscription result) {
public void onFailure(Throwable cause) {
notifyFailed(cause);
}
});
},
executor);
}

@Override
Expand All @@ -127,7 +135,7 @@ private void pullMessages(final Duration backoff) {
.pull(
PullRequest.newBuilder()
.setSubscription(subscription)
.setMaxMessages(DEFAULT_MAX_MESSAGES)
.setMaxMessages(maxDesiredPulledMessages)
.setReturnImmediately(true)
.build());

Expand All @@ -136,7 +144,6 @@ private void pullMessages(final Duration backoff) {
new FutureCallback<PullResponse>() {
@Override
public void onSuccess(PullResponse pullResponse) {
messageDispatcher.processReceivedMessages(pullResponse.getReceivedMessagesList());
if (pullResponse.getReceivedMessagesCount() == 0) {
// No messages in response, possibly caught up in backlog, we backoff to avoid
// slamming the server.
Expand All @@ -155,7 +162,14 @@ public void run() {
TimeUnit.MILLISECONDS);
return;
}
pullMessages(INITIAL_BACKOFF);
messageDispatcher.processReceivedMessages(
pullResponse.getReceivedMessagesList(),
new Runnable() {
@Override
public void run() {
pullMessages(INITIAL_BACKOFF);
}
});
}

@Override
Expand Down Expand Up @@ -185,7 +199,8 @@ public void run() {
notifyFailed(cause);
}
}
});
},
executor);
}

private boolean isAlive() {
Expand Down
Loading