diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index a780c071c703..5e8eb411b0ba 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -16,9 +16,12 @@ package com.google.cloud.pubsub.spi.v1; -import com.google.api.gax.core.FlowController; import com.google.api.core.ApiClock; +import com.google.api.gax.core.FlowController; +import com.google.api.gax.core.FlowController.FlowControlException; +import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.api.stats.Distribution; +import com.google.cloud.pubsub.spi.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; @@ -29,8 +32,9 @@ import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.PriorityQueue; import java.util.Set; @@ -42,6 +46,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.Interval; @@ -57,7 +62,11 @@ class MessageDispatcher { @VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100); private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m + private static final ScheduledExecutorService SHARED_ALARMS_EXECUTOR = + InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(2).build().getExecutor(); + private final ScheduledExecutorService executor; + private final ScheduledExecutorService alarmsExecutor; private final ApiClock clock; private final Duration ackExpirationPadding; @@ -78,6 +87,8 @@ class MessageDispatcher { private Instant nextAckDeadlineExtensionAlarmTime; private ScheduledFuture pendingAcksAlarm; + private final Deque outstandingMessageBatches; + // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; @@ -190,6 +201,7 @@ public void onFailure(Throwable t) { setupPendingAcksAlarm(); flowController.release(1, outstandingBytes); messagesWaiter.incrementPendingMessages(-1); + processOutstandingBatches(); } @Override @@ -200,25 +212,23 @@ public void onSuccess(AckReply reply) { synchronized (pendingAcks) { pendingAcks.add(ackId); } - setupPendingAcksAlarm(); - flowController.release(1, outstandingBytes); // Record the latency rounded to the next closest integer. ackLatencyDistribution.record( Ints.saturatedCast( (long) Math.ceil((clock.millisTime() - receivedTime.getMillis()) / 1000D))); - messagesWaiter.incrementPendingMessages(-1); - return; + break; case NACK: synchronized (pendingNacks) { pendingNacks.add(ackId); } - setupPendingAcksAlarm(); - flowController.release(1, outstandingBytes); - messagesWaiter.incrementPendingMessages(-1); - return; + break; default: throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply)); } + setupPendingAcksAlarm(); + flowController.release(1, outstandingBytes); + messagesWaiter.incrementPendingMessages(-1); + processOutstandingBatches(); } } @@ -235,13 +245,16 @@ void sendAckOperations( Distribution ackLatencyDistribution, FlowController flowController, ScheduledExecutorService executor, + @Nullable ScheduledExecutorService alarmsExecutor, ApiClock clock) { this.executor = executor; + this.alarmsExecutor = alarmsExecutor == null ? SHARED_ALARMS_EXECUTOR : alarmsExecutor; this.ackExpirationPadding = ackExpirationPadding; this.maxAckExtensionPeriod = maxAckExtensionPeriod; this.receiver = receiver; this.ackProcessor = ackProcessor; this.flowController = flowController; + outstandingMessageBatches = new LinkedList<>(); outstandingAckHandlers = new PriorityQueue<>(); pendingAcks = new HashSet<>(); pendingNacks = new HashSet<>(); @@ -275,28 +288,109 @@ public int getMessageDeadlineSeconds() { return messageDeadlineSeconds; } - public void processReceivedMessages(List responseMessages) { - int receivedMessagesCount = responseMessages.size(); - if (receivedMessagesCount == 0) { + static class OutstandingMessagesBatch { + private final Deque messages; + private final Runnable doneCallback; + + static class OutstandingMessage { + private final ReceivedMessage receivedMessage; + private final AckHandler ackHandler; + + public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { + this.receivedMessage = receivedMessage; + this.ackHandler = ackHandler; + } + + public ReceivedMessage receivedMessage() { + return receivedMessage; + } + + public AckHandler ackHandler() { + return ackHandler; + } + } + + public OutstandingMessagesBatch(Runnable doneCallback) { + this.messages = new LinkedList<>(); + this.doneCallback = doneCallback; + } + + public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { + this.messages.add(new OutstandingMessage(receivedMessage, ackHandler)); + } + + public Deque messages() { + return messages; + } + } + + public void processReceivedMessages(List messages, Runnable doneCallback) { + if (messages.isEmpty()) { + doneCallback.run(); return; } - Instant now = new Instant(clock.millisTime()); - int totalByteCount = 0; - final ArrayList ackHandlers = new ArrayList<>(responseMessages.size()); - for (ReceivedMessage pubsubMessage : responseMessages) { - int messageSize = pubsubMessage.getMessage().getSerializedSize(); - totalByteCount += messageSize; - ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize)); + messagesWaiter.incrementPendingMessages(messages.size()); + + OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch(doneCallback); + final ArrayList ackHandlers = new ArrayList<>(messages.size()); + for (ReceivedMessage message : messages) { + AckHandler ackHandler = + new AckHandler(message.getAckId(), message.getMessage().getSerializedSize()); + ackHandlers.add(ackHandler); + outstandingBatch.addMessage(message, ackHandler); + } + + Instant expiration = new Instant(clock.millisTime()).plus(messageDeadlineSeconds * 1000); + synchronized (outstandingAckHandlers) { + outstandingAckHandlers.add( + new ExtensionJob( + new Instant(clock.millisTime()), + expiration, + INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, + ackHandlers)); + } + setupNextAckDeadlineExtensionAlarm(expiration); + + synchronized (outstandingMessageBatches) { + outstandingMessageBatches.add(outstandingBatch); } - Instant expiration = now.plus(messageDeadlineSeconds * 1000); - logger.log( - Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now}); - - messagesWaiter.incrementPendingMessages(responseMessages.size()); - Iterator acksIterator = ackHandlers.iterator(); - for (ReceivedMessage userMessage : responseMessages) { - final PubsubMessage message = userMessage.getMessage(); - final AckHandler ackHandler = acksIterator.next(); + processOutstandingBatches(); + } + + public void processOutstandingBatches() { + while (true) { + boolean batchDone = false; + Runnable batchCallback = null; + OutstandingMessage outstandingMessage; + synchronized (outstandingMessageBatches) { + OutstandingMessagesBatch nextBatch = outstandingMessageBatches.peek(); + if (nextBatch == null) { + return; + } + outstandingMessage = nextBatch.messages.peek(); + if (outstandingMessage == null) { + return; + } + try { + // This is a non-blocking flow controller. + flowController.reserve( + 1, outstandingMessage.receivedMessage().getMessage().getSerializedSize()); + } catch (FlowController.MaxOutstandingElementCountReachedException + | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) { + return; + } catch (FlowControlException unexpectedException) { + throw new IllegalStateException("Flow control unexpected exception", unexpectedException); + } + nextBatch.messages.poll(); // We got a hold to the message already. + batchDone = nextBatch.messages.isEmpty(); + if (batchDone) { + outstandingMessageBatches.poll(); + batchCallback = nextBatch.doneCallback; + } + } + + final PubsubMessage message = outstandingMessage.receivedMessage().getMessage(); + final AckHandler ackHandler = outstandingMessage.ackHandler(); final SettableFuture response = SettableFuture.create(); final AckReplyConsumer consumer = new AckReplyConsumer() { @@ -322,22 +416,9 @@ public void run() { } } }); - } - - synchronized (outstandingAckHandlers) { - outstandingAckHandlers.add( - new ExtensionJob( - new Instant(clock.millisTime()), - expiration, - INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, - ackHandlers)); - } - setupNextAckDeadlineExtensionAlarm(expiration); - - try { - flowController.reserve(receivedMessagesCount, totalByteCount); - } catch (FlowController.FlowControlException unexpectedException) { - throw new IllegalStateException("Flow control unexpected exception", unexpectedException); + if (batchDone) { + batchCallback.run(); + } } } @@ -346,7 +427,7 @@ private void setupPendingAcksAlarm() { try { if (pendingAcksAlarm == null) { pendingAcksAlarm = - executor.schedule( + alarmsExecutor.schedule( new Runnable() { @Override public void run() { @@ -411,7 +492,7 @@ public void run() { // drop it. continue; } - + // If a message has already been acked, remove it, nothing to do. for (int i = 0; i < job.ackHandlers.size(); ) { if (job.ackHandlers.get(i).acked.get()) { @@ -475,7 +556,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) { nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime; ackDeadlineExtensionAlarm = - executor.schedule( + alarmsExecutor.schedule( new AckDeadlineAlarm(), nextAckDeadlineExtensionAlarmTime.getMillis() - clock.millisTime(), TimeUnit.MILLISECONDS); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index 217af07e85ac..65bf82f2603f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import org.joda.time.Duration; /** @@ -59,6 +60,7 @@ final class PollingSubscriberConnection extends AbstractApiService implements Ac private final ScheduledExecutorService executor; private final SubscriberFutureStub stub; private final MessageDispatcher messageDispatcher; + private final int maxDesiredPulledMessages; public PollingSubscriberConnection( String subscription, @@ -68,7 +70,9 @@ public PollingSubscriberConnection( Distribution ackLatencyDistribution, Channel channel, FlowController flowController, + @Nullable Integer maxDesiredPulledMessages, ScheduledExecutorService executor, + @Nullable ScheduledExecutorService alarmsExecutor, ApiClock clock) { this.subscription = subscription; this.executor = executor; @@ -82,8 +86,11 @@ public PollingSubscriberConnection( ackLatencyDistribution, flowController, executor, + alarmsExecutor, clock); messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS); + this.maxDesiredPulledMessages = + maxDesiredPulledMessages != null ? maxDesiredPulledMessages : DEFAULT_MAX_MESSAGES; } @Override @@ -112,7 +119,8 @@ public void onSuccess(Subscription result) { public void onFailure(Throwable cause) { notifyFailed(cause); } - }); + }, + executor); } @Override @@ -127,7 +135,7 @@ private void pullMessages(final Duration backoff) { .pull( PullRequest.newBuilder() .setSubscription(subscription) - .setMaxMessages(DEFAULT_MAX_MESSAGES) + .setMaxMessages(maxDesiredPulledMessages) .setReturnImmediately(true) .build()); @@ -136,7 +144,6 @@ private void pullMessages(final Duration backoff) { new FutureCallback() { @Override public void onSuccess(PullResponse pullResponse) { - messageDispatcher.processReceivedMessages(pullResponse.getReceivedMessagesList()); if (pullResponse.getReceivedMessagesCount() == 0) { // No messages in response, possibly caught up in backlog, we backoff to avoid // slamming the server. @@ -155,7 +162,14 @@ public void run() { TimeUnit.MILLISECONDS); return; } - pullMessages(INITIAL_BACKOFF); + messageDispatcher.processReceivedMessages( + pullResponse.getReceivedMessagesList(), + new Runnable() { + @Override + public void run() { + pullMessages(INITIAL_BACKOFF); + } + }); } @Override @@ -185,7 +199,8 @@ public void run() { notifyFailed(cause); } } - }); + }, + executor); } private boolean isAlive() { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index 69f75724cc31..b8cd7e042432 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -72,6 +72,7 @@ public StreamingSubscriberConnection( Channel channel, FlowController flowController, ScheduledExecutorService executor, + @Nullable ScheduledExecutorService alarmsExecutor, ApiClock clock) { this.subscription = subscription; this.executor = executor; @@ -85,6 +86,7 @@ public StreamingSubscriberConnection( ackLatencyDistribution, flowController, executor, + alarmsExecutor, clock); messageDispatcher.setMessageDeadlineSeconds(streamAckDeadlineSeconds); } @@ -120,11 +122,17 @@ public void beforeStart(ClientCallStreamObserver requestOb @Override public void onNext(StreamingPullResponse response) { - messageDispatcher.processReceivedMessages(response.getReceivedMessagesList()); - // Only if not shutdown we will request one more batches of messages to be delivered. - if (isAlive()) { - requestObserver.request(1); - } + messageDispatcher.processReceivedMessages( + response.getReceivedMessagesList(), + new Runnable() { + @Override + public void run() { + // Only if not shutdown we will request one more batches of messages to be delivered. + if (isAlive()) { + requestObserver.request(1); + } + } + }); } @Override diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index bd7e0d1387de..9d14c1bbeee3 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -23,6 +23,7 @@ import com.google.api.gax.core.FlowControlSettings; import com.google.api.gax.core.FlowController; import com.google.api.gax.grpc.ChannelProvider; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.api.stats.Distribution; @@ -45,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import org.joda.time.Duration; /** @@ -92,6 +94,7 @@ public class Subscriber extends AbstractApiService { private final Duration ackExpirationPadding; private final Duration maxAckExtensionPeriod; private final ScheduledExecutorService executor; + @Nullable private final ScheduledExecutorService alarmsExecutor; private final Distribution ackLatencyDistribution = new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); private final int numChannels; @@ -106,7 +109,7 @@ public class Subscriber extends AbstractApiService { private ScheduledFuture ackDeadlineUpdater; private int streamAckDeadlineSeconds; - private Subscriber(Builder builder) throws IOException { + private Subscriber(Builder builder) { receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; subscriptionName = builder.subscriptionName; @@ -119,7 +122,13 @@ private Subscriber(Builder builder) throws IOException { Ints.saturatedCast(ackExpirationPadding.getStandardSeconds())); clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock(); - flowController = new FlowController(builder.flowControlSettings); + flowController = + new FlowController( + builder + .flowControlSettings + .toBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .build()); executor = builder.executorProvider.getExecutor(); if (builder.executorProvider.shouldAutoClose()) { @@ -131,6 +140,20 @@ public void close() throws IOException { } }); } + if (builder.alarmsExecutorProvider != null) { + alarmsExecutor = builder.alarmsExecutorProvider.getExecutor(); + if (builder.alarmsExecutorProvider.shouldAutoClose()) { + closeables.add( + new AutoCloseable() { + @Override + public void close() throws IOException { + alarmsExecutor.shutdown(); + } + }); + } + } else { + alarmsExecutor = null; + } channelProvider = builder.channelProvider; @@ -254,6 +277,7 @@ private void startStreamingConnections() { channels.get(i), flowController, executor, + alarmsExecutor, clock)); } startConnections( @@ -328,7 +352,9 @@ private void startPollingConnections() { ackLatencyDistribution, channels.get(i), flowController, + flowControlSettings.getMaxOutstandingElementCount(), executor, + alarmsExecutor, clock)); } startConnections( @@ -433,6 +459,7 @@ public static final class Builder { FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance(); ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; + @Nullable ExecutorProvider alarmsExecutorProvider; ChannelProvider channelProvider = SubscriptionAdminSettings.defaultChannelProviderBuilder() .setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) @@ -512,13 +539,22 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) { return this; } + /** + * Gives the ability to set a custom executor for managing lease extensions. If none is + * provided a shared one will be used by all {@link Subscriber} instances. + */ + public Builder setLeaseAlarmsExecutorProvider(ExecutorProvider executorProvider) { + this.alarmsExecutorProvider = Preconditions.checkNotNull(executorProvider); + return this; + } + /** Gives the ability to set a custom clock. */ Builder setClock(ApiClock clock) { this.clock = Optional.of(clock); return this; } - public Subscriber build() throws IOException { + public Subscriber build() { return new Subscriber(this); } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java similarity index 99% rename from google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java rename to google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java index e8479f899cd9..930981779722 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java @@ -59,7 +59,7 @@ /** Tests for {@link Subscriber}. */ @RunWith(Parameterized.class) -public class SubscriberImplTest { +public class SubscriberTest { private static final SubscriptionName TEST_SUBSCRIPTION = SubscriptionName.create("test-project", "test-subscription"); @@ -166,7 +166,7 @@ private void replyTo(AckReplyConsumer reply) { } } - public SubscriberImplTest(boolean streamingTest) { + public SubscriberTest(boolean streamingTest) { this.isStreamingTest = streamingTest; } @@ -536,6 +536,7 @@ private void sendMessages(Iterable ackIds) throws InterruptedException { private Builder getTestSubscriberBuilder(MessageReceiver receiver) { return Subscriber.defaultBuilder(TEST_SUBSCRIPTION, receiver) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) + .setLeaseAlarmsExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .setChannelProvider(FixedChannelProvider.create(testChannel)) .setClock(fakeExecutor.getClock()); }