From 4d990aa3e71bc81e0861721225d6fb7c8be2cc6e Mon Sep 17 00:00:00 2001 From: David Torres Date: Fri, 7 Apr 2017 13:10:22 -0400 Subject: [PATCH 1/4] Decouple the processing of new received messages from the dispatching to user code - Separated the handling of batched received messages from the per-message dispatching of message to the user code. In order to accomplish this I had to keep the batch in memory while the processing of each message will draw from the in-memory batch until completely depleted. - Drawing flow controller permits on a per message basis (used to try to draw permits for the whole batch potentially deadlocking the whole subscriber), this addresses the deadlock condition raised in #1868 - No longer using blocking flow controller, instead pausing and resuming pulls/streamed-messages based on the flow controller feedback and when new permits become available. - A separate executor for alarms (2 threads in it has showed up to scale pretty well with many subscriber, given our ack operations are pretty lightweight) - Setting the maximum of messages to pull per request based on the number requested by the user in the flow controller (if any), this in a best effort addresses #1868 Fixes #1868, fixes #1865 and fixes #1855 --- .../pubsub/spi/v1/MessageDispatcher.java | 178 +++++++++++++----- .../spi/v1/PollingSubscriberConnection.java | 23 ++- .../spi/v1/StreamingSubscriberConnection.java | 16 +- .../cloud/pubsub/spi/v1/Subscriber.java | 16 +- ...riberImplTest.java => SubscriberTest.java} | 4 +- 5 files changed, 173 insertions(+), 64 deletions(-) rename google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/{SubscriberImplTest.java => SubscriberTest.java} (99%) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index f89845f7e36e..0227f58e2209 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -16,9 +16,11 @@ package com.google.cloud.pubsub.spi.v1; -import com.google.api.gax.core.FlowController; import com.google.api.gax.core.ApiClock; +import com.google.api.gax.core.FlowController; +import com.google.api.gax.core.FlowController.FlowControlException; import com.google.api.stats.Distribution; +import com.google.cloud.pubsub.spi.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; @@ -29,11 +31,13 @@ import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; import java.util.HashSet; -import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.PriorityQueue; import java.util.Set; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -56,6 +60,9 @@ class MessageDispatcher { private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECONDS = 2; @VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100); private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m + + private static final ScheduledExecutorService alarmsExecutor = + Executors.newScheduledThreadPool(2); private final ScheduledExecutorService executor; private final ApiClock clock; @@ -190,6 +197,7 @@ public void onFailure(Throwable t) { setupPendingAcksAlarm(); flowController.release(1, outstandingBytes); messagesWaiter.incrementPendingMessages(-1); + processOutstandingBatches(); } @Override @@ -200,25 +208,23 @@ public void onSuccess(AckReply reply) { synchronized (pendingAcks) { pendingAcks.add(ackId); } - setupPendingAcksAlarm(); - flowController.release(1, outstandingBytes); // Record the latency rounded to the next closest integer. ackLatencyDistribution.record( Ints.saturatedCast( (long) Math.ceil((clock.millisTime() - receivedTime.getMillis()) / 1000D))); - messagesWaiter.incrementPendingMessages(-1); - return; + break; case NACK: synchronized (pendingNacks) { pendingNacks.add(ackId); } - setupPendingAcksAlarm(); - flowController.release(1, outstandingBytes); - messagesWaiter.incrementPendingMessages(-1); - return; + break; default: throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply)); } + setupPendingAcksAlarm(); + flowController.release(1, outstandingBytes); + messagesWaiter.incrementPendingMessages(-1); + processOutstandingBatches(); } } @@ -275,28 +281,117 @@ public int getMessageDeadlineSeconds() { return messageDeadlineSeconds; } - public void processReceivedMessages(List responseMessages) { - int receivedMessagesCount = responseMessages.size(); - if (receivedMessagesCount == 0) { + static class OutstandingMessagesBatch { + static class OutstandingMessage { + private final com.google.pubsub.v1.ReceivedMessage receivedMessage; + private final AckHandler ackHandler; + + public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { + this.receivedMessage = receivedMessage; + this.ackHandler = ackHandler; + } + + public com.google.pubsub.v1.ReceivedMessage receivedMessage() { + return receivedMessage; + } + + public AckHandler ackHandler() { + return ackHandler; + } + } + + private final Deque messages; + private final Runnable doneCallback; + + public OutstandingMessagesBatch(Runnable doneCallback) { + this.messages = new LinkedList<>(); + this.doneCallback = doneCallback; + } + + public void addMessage( + com.google.pubsub.v1.ReceivedMessage receivedMessage, AckHandler ackHandler) { + this.messages.add(new OutstandingMessage(receivedMessage, ackHandler)); + } + + public Deque messages() { + return messages; + } + + public void done() { + doneCallback.run(); + } + } + + Deque outstandingMessageBatches = new LinkedList<>(); + + public void processReceivedMessages( + List messages, Runnable doneCallback) { + if (messages.size() == 0) { + doneCallback.run(); return; } - Instant now = new Instant(clock.millisTime()); - int totalByteCount = 0; - final ArrayList ackHandlers = new ArrayList<>(responseMessages.size()); - for (ReceivedMessage pubsubMessage : responseMessages) { - int messageSize = pubsubMessage.getMessage().getSerializedSize(); - totalByteCount += messageSize; - ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize)); + messagesWaiter.incrementPendingMessages(messages.size()); + + OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch(doneCallback); + final ArrayList ackHandlers = new ArrayList<>(messages.size()); + for (ReceivedMessage message : messages) { + AckHandler ackHandler = + new AckHandler(message.getAckId(), message.getMessage().getSerializedSize()); + ackHandlers.add(ackHandler); + outstandingBatch.addMessage(message, ackHandler); } - Instant expiration = now.plus(messageDeadlineSeconds * 1000); - logger.log( - Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now}); - - messagesWaiter.incrementPendingMessages(responseMessages.size()); - Iterator acksIterator = ackHandlers.iterator(); - for (ReceivedMessage userMessage : responseMessages) { - final PubsubMessage message = userMessage.getMessage(); - final AckHandler ackHandler = acksIterator.next(); + + Instant expiration = new Instant(clock.millisTime()).plus(messageDeadlineSeconds * 1000); + synchronized (outstandingAckHandlers) { + outstandingAckHandlers.add( + new ExtensionJob( + new Instant(clock.millisTime()), + expiration, + INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, + ackHandlers)); + } + setupNextAckDeadlineExtensionAlarm(expiration); + + synchronized (outstandingMessageBatches) { + outstandingMessageBatches.add(outstandingBatch); + } + processOutstandingBatches(); + } + + public void processOutstandingBatches() { + while (true) { + boolean batchDone = false; + Runnable batchCallback = null; + OutstandingMessage outstandingMessage; + synchronized (outstandingMessageBatches) { + OutstandingMessagesBatch nextBatch = outstandingMessageBatches.peek(); + if (nextBatch == null) { + return; + } + outstandingMessage = nextBatch.messages.peek(); + if (outstandingMessage == null) { + return; + } + try { + // This is a non-blocking flow controller. + flowController.reserve( + 1, outstandingMessage.receivedMessage().getMessage().getSerializedSize()); + } catch (FlowController.MaxOutstandingElementCountReachedException + | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) { + return; + } catch (FlowControlException unexpectedException) { + throw new IllegalStateException("Flow control unexpected exception", unexpectedException); + } + nextBatch.messages.poll(); // We got a hold to the message already. + batchDone = nextBatch.messages.isEmpty(); + if (batchDone) { + outstandingMessageBatches.poll(); + batchCallback = nextBatch.doneCallback; + } + } + + final PubsubMessage message = outstandingMessage.receivedMessage().getMessage(); + final AckHandler ackHandler = outstandingMessage.ackHandler(); final SettableFuture response = SettableFuture.create(); final AckReplyConsumer consumer = new AckReplyConsumer() { @@ -322,22 +417,9 @@ public void run() { } } }); - } - - synchronized (outstandingAckHandlers) { - outstandingAckHandlers.add( - new ExtensionJob( - new Instant(clock.millisTime()), - expiration, - INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, - ackHandlers)); - } - setupNextAckDeadlineExtensionAlarm(expiration); - - try { - flowController.reserve(receivedMessagesCount, totalByteCount); - } catch (FlowController.FlowControlException unexpectedException) { - throw new IllegalStateException("Flow control unexpected exception", unexpectedException); + if (batchDone) { + batchCallback.run(); + } } } @@ -346,7 +428,7 @@ private void setupPendingAcksAlarm() { try { if (pendingAcksAlarm == null) { pendingAcksAlarm = - executor.schedule( + alarmsExecutor.schedule( new Runnable() { @Override public void run() { @@ -411,7 +493,7 @@ public void run() { // drop it. continue; } - + // If a message has already been acked, remove it, nothing to do. for (int i = 0; i < job.ackHandlers.size(); ) { if (job.ackHandlers.get(i).acked.get()) { @@ -475,7 +557,7 @@ private void setupNextAckDeadlineExtensionAlarm(Instant expiration) { nextAckDeadlineExtensionAlarmTime = possibleNextAlarmTime; ackDeadlineExtensionAlarm = - executor.schedule( + alarmsExecutor.schedule( new AckDeadlineAlarm(), nextAckDeadlineExtensionAlarmTime.getMillis() - clock.millisTime(), TimeUnit.MILLISECONDS); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index 65c68f92c751..ea0741e7d57a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import org.joda.time.Duration; /** @@ -59,6 +60,7 @@ final class PollingSubscriberConnection extends AbstractApiService implements Ac private final ScheduledExecutorService executor; private final SubscriberFutureStub stub; private final MessageDispatcher messageDispatcher; + private final int maxDesiredPulledMessages; public PollingSubscriberConnection( String subscription, @@ -68,6 +70,7 @@ public PollingSubscriberConnection( Distribution ackLatencyDistribution, Channel channel, FlowController flowController, + @Nullable Integer maxDesiredPulledMessages, ScheduledExecutorService executor, ApiClock clock) { this.subscription = subscription; @@ -84,6 +87,8 @@ public PollingSubscriberConnection( executor, clock); messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS); + this.maxDesiredPulledMessages = + maxDesiredPulledMessages != null ? maxDesiredPulledMessages : DEFAULT_MAX_MESSAGES; } @Override @@ -112,7 +117,8 @@ public void onSuccess(Subscription result) { public void onFailure(Throwable cause) { notifyFailed(cause); } - }); + }, + executor); } @Override @@ -127,7 +133,7 @@ private void pullMessages(final Duration backoff) { .pull( PullRequest.newBuilder() .setSubscription(subscription) - .setMaxMessages(DEFAULT_MAX_MESSAGES) + .setMaxMessages(maxDesiredPulledMessages) .setReturnImmediately(true) .build()); @@ -136,7 +142,6 @@ private void pullMessages(final Duration backoff) { new FutureCallback() { @Override public void onSuccess(PullResponse pullResponse) { - messageDispatcher.processReceivedMessages(pullResponse.getReceivedMessagesList()); if (pullResponse.getReceivedMessagesCount() == 0) { // No messages in response, possibly caught up in backlog, we backoff to avoid // slamming the server. @@ -155,7 +160,14 @@ public void run() { TimeUnit.MILLISECONDS); return; } - pullMessages(INITIAL_BACKOFF); + messageDispatcher.processReceivedMessages( + pullResponse.getReceivedMessagesList(), + new Runnable() { + @Override + public void run() { + pullMessages(INITIAL_BACKOFF); + } + }); } @Override @@ -185,7 +197,8 @@ public void run() { notifyFailed(cause); } } - }); + }, + executor); } private boolean isAlive() { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index 5c0b7d451aa3..ffd4dc870d1c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -120,11 +120,17 @@ public void beforeStart(ClientCallStreamObserver requestOb @Override public void onNext(StreamingPullResponse response) { - messageDispatcher.processReceivedMessages(response.getReceivedMessagesList()); - // Only if not shutdown we will request one more batches of messages to be delivered. - if (isAlive()) { - requestObserver.request(1); - } + messageDispatcher.processReceivedMessages( + response.getReceivedMessagesList(), + new Runnable() { + @Override + public void run() { + // Only if not shutdown we will request one more batches of messages to be delivered. + if (isAlive()) { + requestObserver.request(1); + } + } + }); } @Override diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 5438dda5e6f7..80b8f5962e56 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -23,6 +23,7 @@ import com.google.api.gax.core.FlowControlSettings; import com.google.api.gax.core.FlowController; import com.google.api.gax.grpc.ChannelProvider; +import com.google.api.gax.core.FlowController.LimitExceededBehavior; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.api.stats.Distribution; @@ -106,7 +107,7 @@ public class Subscriber extends AbstractApiService { private ScheduledFuture ackDeadlineUpdater; private int streamAckDeadlineSeconds; - private Subscriber(Builder builder) throws IOException { + private Subscriber(Builder builder) { receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; subscriptionName = builder.subscriptionName; @@ -119,7 +120,13 @@ private Subscriber(Builder builder) throws IOException { Ints.saturatedCast(ackExpirationPadding.getStandardSeconds())); clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock(); - flowController = new FlowController(builder.flowControlSettings); + flowController = + new FlowController( + builder + .flowControlSettings + .toBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .build()); executor = builder.executorProvider.getExecutor(); if (builder.executorProvider.shouldAutoClose()) { @@ -328,6 +335,7 @@ private void startPollingConnections() { ackLatencyDistribution, channels.get(i), flowController, + flowControlSettings.getMaxOutstandingElementCount(), executor, clock)); } @@ -496,7 +504,7 @@ public Builder setAckExpirationPadding(Duration ackExpirationPadding) { * *

It is recommended to set this value to a reasonable upper bound of the subscriber time to * process any message. This maximum period avoids messages to be locked by a subscriber - * in cases when the {@link AckReply} is lost. + * in cases when the {@link AckReplyConsumer} is never called. * *

A zero duration effectively disables auto deadline extensions. */ @@ -518,7 +526,7 @@ Builder setClock(ApiClock clock) { return this; } - public Subscriber build() throws IOException { + public Subscriber build() { return new Subscriber(this); } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java similarity index 99% rename from google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java rename to google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java index e8479f899cd9..4d41e64add39 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java @@ -59,7 +59,7 @@ /** Tests for {@link Subscriber}. */ @RunWith(Parameterized.class) -public class SubscriberImplTest { +public class SubscriberTest { private static final SubscriptionName TEST_SUBSCRIPTION = SubscriptionName.create("test-project", "test-subscription"); @@ -166,7 +166,7 @@ private void replyTo(AckReplyConsumer reply) { } } - public SubscriberImplTest(boolean streamingTest) { + public SubscriberTest(boolean streamingTest) { this.isStreamingTest = streamingTest; } From ef1b25a61732c9a5ee0ec349a3bffef15e4695eb Mon Sep 17 00:00:00 2001 From: davidtorres Date: Fri, 7 Apr 2017 15:40:47 -0400 Subject: [PATCH 2/4] Fix codacy complains --- .../pubsub/spi/v1/MessageDispatcher.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 0227f58e2209..9d7b3967dc43 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -60,7 +60,7 @@ class MessageDispatcher { private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECONDS = 2; @VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100); private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m - + private static final ScheduledExecutorService alarmsExecutor = Executors.newScheduledThreadPool(2); @@ -85,6 +85,8 @@ class MessageDispatcher { private Instant nextAckDeadlineExtensionAlarmTime; private ScheduledFuture pendingAcksAlarm; + private Deque outstandingMessageBatches; + // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; @@ -248,6 +250,7 @@ void sendAckOperations( this.receiver = receiver; this.ackProcessor = ackProcessor; this.flowController = flowController; + outstandingMessageBatches = new LinkedList<>(); outstandingAckHandlers = new PriorityQueue<>(); pendingAcks = new HashSet<>(); pendingNacks = new HashSet<>(); @@ -282,8 +285,11 @@ public int getMessageDeadlineSeconds() { } static class OutstandingMessagesBatch { + private final Deque messages; + private final Runnable doneCallback; + static class OutstandingMessage { - private final com.google.pubsub.v1.ReceivedMessage receivedMessage; + private final ReceivedMessage receivedMessage; private final AckHandler ackHandler; public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { @@ -291,7 +297,7 @@ public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler this.ackHandler = ackHandler; } - public com.google.pubsub.v1.ReceivedMessage receivedMessage() { + public ReceivedMessage receivedMessage() { return receivedMessage; } @@ -300,16 +306,12 @@ public AckHandler ackHandler() { } } - private final Deque messages; - private final Runnable doneCallback; - public OutstandingMessagesBatch(Runnable doneCallback) { this.messages = new LinkedList<>(); this.doneCallback = doneCallback; } - public void addMessage( - com.google.pubsub.v1.ReceivedMessage receivedMessage, AckHandler ackHandler) { + public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { this.messages.add(new OutstandingMessage(receivedMessage, ackHandler)); } @@ -322,11 +324,8 @@ public void done() { } } - Deque outstandingMessageBatches = new LinkedList<>(); - - public void processReceivedMessages( - List messages, Runnable doneCallback) { - if (messages.size() == 0) { + public void processReceivedMessages(List messages, Runnable doneCallback) { + if (messages.isEmpty()) { doneCallback.run(); return; } From 7f0cdfd0837a94db1e0b9cf43327c310330fb8f6 Mon Sep 17 00:00:00 2001 From: davidtorres Date: Wed, 19 Apr 2017 22:20:54 -0400 Subject: [PATCH 3/4] Making the executor use deamon threads And some other minor changes addressing feedback --- .../google/cloud/pubsub/spi/v1/MessageDispatcher.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 9d7b3967dc43..9b7503e7f2a2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -19,6 +19,7 @@ import com.google.api.gax.core.ApiClock; import com.google.api.gax.core.FlowController; import com.google.api.gax.core.FlowController.FlowControlException; +import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.api.stats.Distribution; import com.google.cloud.pubsub.spi.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage; import com.google.common.annotations.VisibleForTesting; @@ -37,7 +38,6 @@ import java.util.List; import java.util.PriorityQueue; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -62,7 +62,7 @@ class MessageDispatcher { private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m private static final ScheduledExecutorService alarmsExecutor = - Executors.newScheduledThreadPool(2); + InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(2).build().getExecutor(); private final ScheduledExecutorService executor; private final ApiClock clock; @@ -85,7 +85,7 @@ class MessageDispatcher { private Instant nextAckDeadlineExtensionAlarmTime; private ScheduledFuture pendingAcksAlarm; - private Deque outstandingMessageBatches; + private final Deque outstandingMessageBatches; // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; @@ -318,10 +318,6 @@ public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { public Deque messages() { return messages; } - - public void done() { - doneCallback.run(); - } } public void processReceivedMessages(List messages, Runnable doneCallback) { From f71628089dc4cca28789b91beeb2ad90b555d418 Mon Sep 17 00:00:00 2001 From: davidtorres Date: Thu, 20 Apr 2017 22:54:04 -0400 Subject: [PATCH 4/4] Fixing merging issues as well as making the alarms executor configurable by the user, this allows for proper unit testing plus adds the ability to the user to override the executor. --- .../pubsub/spi/v1/MessageDispatcher.java | 8 ++++-- .../spi/v1/PollingSubscriberConnection.java | 2 ++ .../spi/v1/StreamingSubscriberConnection.java | 2 ++ .../cloud/pubsub/spi/v1/Subscriber.java | 28 +++++++++++++++++++ .../cloud/pubsub/spi/v1/SubscriberTest.java | 1 + 5 files changed, 39 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 9b7503e7f2a2..5e8eb411b0ba 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -16,7 +16,7 @@ package com.google.cloud.pubsub.spi.v1; -import com.google.api.gax.core.ApiClock; +import com.google.api.core.ApiClock; import com.google.api.gax.core.FlowController; import com.google.api.gax.core.FlowController.FlowControlException; import com.google.api.gax.grpc.InstantiatingExecutorProvider; @@ -46,6 +46,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.Interval; @@ -61,10 +62,11 @@ class MessageDispatcher { @VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100); private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m - private static final ScheduledExecutorService alarmsExecutor = + private static final ScheduledExecutorService SHARED_ALARMS_EXECUTOR = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(2).build().getExecutor(); private final ScheduledExecutorService executor; + private final ScheduledExecutorService alarmsExecutor; private final ApiClock clock; private final Duration ackExpirationPadding; @@ -243,8 +245,10 @@ void sendAckOperations( Distribution ackLatencyDistribution, FlowController flowController, ScheduledExecutorService executor, + @Nullable ScheduledExecutorService alarmsExecutor, ApiClock clock) { this.executor = executor; + this.alarmsExecutor = alarmsExecutor == null ? SHARED_ALARMS_EXECUTOR : alarmsExecutor; this.ackExpirationPadding = ackExpirationPadding; this.maxAckExtensionPeriod = maxAckExtensionPeriod; this.receiver = receiver; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index b986374f3e4b..65bf82f2603f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -72,6 +72,7 @@ public PollingSubscriberConnection( FlowController flowController, @Nullable Integer maxDesiredPulledMessages, ScheduledExecutorService executor, + @Nullable ScheduledExecutorService alarmsExecutor, ApiClock clock) { this.subscription = subscription; this.executor = executor; @@ -85,6 +86,7 @@ public PollingSubscriberConnection( ackLatencyDistribution, flowController, executor, + alarmsExecutor, clock); messageDispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS); this.maxDesiredPulledMessages = diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index 97227474ee94..b8cd7e042432 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -72,6 +72,7 @@ public StreamingSubscriberConnection( Channel channel, FlowController flowController, ScheduledExecutorService executor, + @Nullable ScheduledExecutorService alarmsExecutor, ApiClock clock) { this.subscription = subscription; this.executor = executor; @@ -85,6 +86,7 @@ public StreamingSubscriberConnection( ackLatencyDistribution, flowController, executor, + alarmsExecutor, clock); messageDispatcher.setMessageDeadlineSeconds(streamAckDeadlineSeconds); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index f31f6a9e9f60..9d14c1bbeee3 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; import org.joda.time.Duration; /** @@ -93,6 +94,7 @@ public class Subscriber extends AbstractApiService { private final Duration ackExpirationPadding; private final Duration maxAckExtensionPeriod; private final ScheduledExecutorService executor; + @Nullable private final ScheduledExecutorService alarmsExecutor; private final Distribution ackLatencyDistribution = new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); private final int numChannels; @@ -138,6 +140,20 @@ public void close() throws IOException { } }); } + if (builder.alarmsExecutorProvider != null) { + alarmsExecutor = builder.alarmsExecutorProvider.getExecutor(); + if (builder.alarmsExecutorProvider.shouldAutoClose()) { + closeables.add( + new AutoCloseable() { + @Override + public void close() throws IOException { + alarmsExecutor.shutdown(); + } + }); + } + } else { + alarmsExecutor = null; + } channelProvider = builder.channelProvider; @@ -261,6 +277,7 @@ private void startStreamingConnections() { channels.get(i), flowController, executor, + alarmsExecutor, clock)); } startConnections( @@ -337,6 +354,7 @@ private void startPollingConnections() { flowController, flowControlSettings.getMaxOutstandingElementCount(), executor, + alarmsExecutor, clock)); } startConnections( @@ -441,6 +459,7 @@ public static final class Builder { FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance(); ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; + @Nullable ExecutorProvider alarmsExecutorProvider; ChannelProvider channelProvider = SubscriptionAdminSettings.defaultChannelProviderBuilder() .setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) @@ -520,6 +539,15 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) { return this; } + /** + * Gives the ability to set a custom executor for managing lease extensions. If none is + * provided a shared one will be used by all {@link Subscriber} instances. + */ + public Builder setLeaseAlarmsExecutorProvider(ExecutorProvider executorProvider) { + this.alarmsExecutorProvider = Preconditions.checkNotNull(executorProvider); + return this; + } + /** Gives the ability to set a custom clock. */ Builder setClock(ApiClock clock) { this.clock = Optional.of(clock); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java index 4d41e64add39..930981779722 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java @@ -536,6 +536,7 @@ private void sendMessages(Iterable ackIds) throws InterruptedException { private Builder getTestSubscriberBuilder(MessageReceiver receiver) { return Subscriber.defaultBuilder(TEST_SUBSCRIPTION, receiver) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) + .setLeaseAlarmsExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .setChannelProvider(FixedChannelProvider.create(testChannel)) .setClock(fakeExecutor.getClock()); }