From 45037c2c4722dd619cbdde52a7cad0c3328f93f3 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 14 Mar 2018 17:43:15 +1100 Subject: [PATCH] pubsub: remove polling implementation There has been no way to use it since Aug 2017 and removing it will make moving the GAPIC stub easier. --- .../v1/PollingSubscriberConnection.java | 224 ------------------ .../google/cloud/pubsub/v1/Subscriber.java | 79 +----- .../cloud/pubsub/v1/SubscriberTest.java | 2 +- 3 files changed, 2 insertions(+), 303 deletions(-) delete mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PollingSubscriberConnection.java diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PollingSubscriberConnection.java deleted file mode 100644 index 53f808ab77bf..000000000000 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PollingSubscriberConnection.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Copyright 2016 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub.v1; - -import com.google.api.core.AbstractApiService; -import com.google.api.core.ApiClock; -import com.google.api.gax.batching.FlowController; -import com.google.api.gax.core.Distribution; -import com.google.cloud.pubsub.v1.MessageDispatcher.AckProcessor; -import com.google.cloud.pubsub.v1.MessageDispatcher.PendingModifyAckDeadline; -import com.google.common.collect.Lists; -import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.pubsub.v1.AcknowledgeRequest; -import com.google.pubsub.v1.ModifyAckDeadlineRequest; -import com.google.pubsub.v1.PullRequest; -import com.google.pubsub.v1.PullResponse; -import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub; -import com.google.pubsub.v1.Subscription; -import java.util.Deque; -import java.util.List; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.annotation.Nullable; -import org.threeten.bp.Duration; - -/** - * Implementation of {@link AckProcessor} based on Cloud Pub/Sub pull and acknowledge operations. - */ -final class PollingSubscriberConnection extends AbstractApiService implements AckProcessor { - static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60); - - private static final int MAX_PER_REQUEST_CHANGES = 1000; - private static final int DEFAULT_MAX_MESSAGES = 1000; - private static final Duration INITIAL_BACKOFF = Duration.ofMillis(100); // 100ms - private static final Duration MAX_BACKOFF = Duration.ofSeconds(10); // 10s - - private static final Logger logger = - Logger.getLogger(PollingSubscriberConnection.class.getName()); - - private final Subscription subscription; - private final ScheduledExecutorService pollingExecutor; - private final SubscriberFutureStub stub; - private final MessageDispatcher messageDispatcher; - private final int maxDesiredPulledMessages; - - public PollingSubscriberConnection( - Subscription subscription, - MessageReceiver receiver, - Duration ackExpirationPadding, - Duration maxAckExtensionPeriod, - Distribution ackLatencyDistribution, - SubscriberFutureStub stub, - FlowController flowController, - @Nullable Long maxDesiredPulledMessages, - Deque outstandingMessageBatches, - ScheduledExecutorService executor, - ScheduledExecutorService systemExecutor, - ApiClock clock) { - this.subscription = subscription; - this.pollingExecutor = systemExecutor; - this.stub = stub; - messageDispatcher = - new MessageDispatcher( - receiver, - this, - ackExpirationPadding, - maxAckExtensionPeriod, - ackLatencyDistribution, - flowController, - outstandingMessageBatches, - executor, - systemExecutor, - clock); - this.maxDesiredPulledMessages = - maxDesiredPulledMessages != null - ? Ints.saturatedCast(maxDesiredPulledMessages) - : DEFAULT_MAX_MESSAGES; - } - - @Override - protected void doStart() { - logger.config("Starting subscriber."); - messageDispatcher.start(); - pullMessages(INITIAL_BACKOFF); - notifyStarted(); - } - - @Override - protected void doStop() { - messageDispatcher.stop(); - notifyStopped(); - } - - private ListenableFuture pullMessages(final Duration backoff) { - if (!isAlive()) { - return Futures.immediateCancelledFuture(); - } - ListenableFuture pullResult = - stub.pull( - PullRequest.newBuilder() - .setSubscription(subscription.getName()) - .setMaxMessages(maxDesiredPulledMessages) - .setReturnImmediately(false) - .build()); - - Futures.addCallback( - pullResult, - new FutureCallback() { - @Override - public void onSuccess(PullResponse pullResponse) { - if (pullResponse.getReceivedMessagesCount() == 0) { - // No messages in response, possibly caught up in backlog, we backoff to avoid - // slamming the server. - pollingExecutor.schedule( - new Runnable() { - @Override - public void run() { - Duration newBackoff = backoff.multipliedBy(2); - if (newBackoff.compareTo(MAX_BACKOFF) > 0) { - newBackoff = MAX_BACKOFF; - } - pullMessages(newBackoff); - } - }, - backoff.toMillis(), - TimeUnit.MILLISECONDS); - return; - } - messageDispatcher.processReceivedMessages( - pullResponse.getReceivedMessagesList(), - new Runnable() { - @Override - public void run() { - pullMessages(INITIAL_BACKOFF); - } - }); - } - - @Override - public void onFailure(Throwable cause) { - if (!isAlive()) { - // we don't care about subscription failures when we're no longer running. - logger.log(Level.FINE, "pull failure after service no longer running", cause); - return; - } - if (StatusUtil.isRetryable(cause)) { - logger.log(Level.WARNING, "Failed to pull messages (recoverable): ", cause); - pollingExecutor.schedule( - new Runnable() { - @Override - public void run() { - Duration newBackoff = backoff.multipliedBy(2); - if (newBackoff.compareTo(MAX_BACKOFF) > 0) { - newBackoff = MAX_BACKOFF; - } - pullMessages(newBackoff); - } - }, - backoff.toMillis(), - TimeUnit.MILLISECONDS); - } else { - messageDispatcher.stop(); - notifyFailed(cause); - } - } - }, - pollingExecutor); - - return pullResult; - } - - private boolean isAlive() { - // Read state only once. Because of threading, different calls can give different results. - State state = state(); - return state == State.RUNNING || state == State.STARTING; - } - - @Override - public void sendAckOperations( - List acksToSend, List ackDeadlineExtensions) { - // Send the modify ack deadlines in batches as not to exceed the max request - // size. - for (PendingModifyAckDeadline modifyAckDeadline : ackDeadlineExtensions) { - for (List ackIdChunk : - Lists.partition(modifyAckDeadline.ackIds, MAX_PER_REQUEST_CHANGES)) { - stub.withDeadlineAfter(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) - .modifyAckDeadline( - ModifyAckDeadlineRequest.newBuilder() - .setSubscription(subscription.getName()) - .addAllAckIds(ackIdChunk) - .setAckDeadlineSeconds(modifyAckDeadline.deadlineExtensionSeconds) - .build()); - } - } - - for (List ackChunk : Lists.partition(acksToSend, MAX_PER_REQUEST_CHANGES)) { - stub.withDeadlineAfter(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) - .acknowledge( - AcknowledgeRequest.newBuilder() - .setSubscription(subscription.getName()) - .addAllAckIds(ackChunk) - .build()); - } - } -} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 610885273058..9520b2f52893 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -122,12 +122,10 @@ public class Subscriber extends AbstractApiService { private final List channels; private final MessageReceiver receiver; private final List streamingSubscriberConnections; - private final List pollingSubscriberConnections; private final Deque outstandingMessageBatches = new LinkedList<>(); private final ApiClock clock; private final List closeables = new ArrayList<>(); - private final boolean useStreaming; private ScheduledFuture ackDeadlineUpdater; private Subscriber(Builder builder) { @@ -195,8 +193,6 @@ public void close() throws IOException { numChannels = builder.parallelPullCount; channels = new ArrayList<>(numChannels); streamingSubscriberConnections = new ArrayList(numChannels); - pollingSubscriberConnections = new ArrayList(numChannels); - useStreaming = builder.useStreaming; } /** @@ -310,11 +306,7 @@ protected void doStart() { @Override public void run() { try { - if (useStreaming) { startStreamingConnections(); - } else { - startPollingConnections(); - } notifyStarted(); } catch (Throwable t) { notifyFailed(t); @@ -327,7 +319,6 @@ public void run() { @Override protected void doStop() { // stop connection is no-op if connections haven't been started. - stopAllPollingConnections(); stopAllStreamingConnections(); try { for (AutoCloseable closeable : closeables) { @@ -339,64 +330,6 @@ protected void doStop() { } } - private void startPollingConnections() throws IOException { - synchronized (pollingSubscriberConnections) { - Credentials credentials = credentialsProvider.getCredentials(); - CallCredentials callCredentials = - credentials == null ? null : MoreCallCredentials.from(credentials); - - SubscriberGrpc.SubscriberBlockingStub getSubStub = - SubscriberGrpc.newBlockingStub(channels.get(0)) - .withDeadlineAfter( - PollingSubscriberConnection.DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); - if (callCredentials != null) { - getSubStub = getSubStub.withCallCredentials(callCredentials); - } - Subscription subscriptionInfo = - getSubStub.getSubscription( - GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName).build()); - - for (Channel channel : channels) { - SubscriberFutureStub stub = SubscriberGrpc.newFutureStub(channel); - if (callCredentials != null) { - stub = stub.withCallCredentials(callCredentials); - } - pollingSubscriberConnections.add( - new PollingSubscriberConnection( - subscriptionInfo, - receiver, - ackExpirationPadding, - maxAckExtensionPeriod, - ackLatencyDistribution, - stub, - flowController, - flowControlSettings.getMaxOutstandingElementCount(), - outstandingMessageBatches, - executor, - alarmsExecutor, - clock)); - } - startConnections( - pollingSubscriberConnections, - new Listener() { - @Override - public void failed(State from, Throwable failure) { - // If a connection failed is because of a fatal error, we should fail the - // whole subscriber. - stopAllPollingConnections(); - try { - notifyFailed(failure); - } catch (IllegalStateException e) { - if (isRunning()) { - throw e; - } - // It could happen that we are shutting down while some channels fail. - } - } - }); - } - } - private void startStreamingConnections() throws IOException { synchronized (streamingSubscriberConnections) { Credentials credentials = credentialsProvider.getCredentials(); @@ -443,10 +376,6 @@ public void failed(State from, Throwable failure) { } } - private void stopAllPollingConnections() { - stopConnections(pollingSubscriberConnections); - } - private void stopAllStreamingConnections() { stopConnections(streamingSubscriberConnections); if (ackDeadlineUpdater != null) { @@ -525,7 +454,6 @@ public static final class Builder { CredentialsProvider credentialsProvider = SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build(); Optional clock = Optional.absent(); - boolean useStreaming = true; int parallelPullCount = Runtime.getRuntime().availableProcessors() * CHANNELS_PER_CORE; Builder(String subscriptionName, MessageReceiver receiver) { @@ -630,7 +558,7 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { } /** - * Gives the ability to set a custom executor for polling and managing lease extensions. If none + * Gives the ability to set a custom executor for managing lease extensions. If none * is provided a shared one will be used by all {@link Subscriber} instances. */ public Builder setSystemExecutorProvider(ExecutorProvider executorProvider) { @@ -653,11 +581,6 @@ Builder setClock(ApiClock clock) { return this; } - Builder setUseStreaming(boolean useStreaming) { - this.useStreaming = useStreaming; - return this; - } - public Subscriber build() { return new Subscriber(this); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 4ee88f27464e..2664a4a4163a 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -141,7 +141,7 @@ public void testFailedChannel_fatalError_subscriberFails() throws Exception { } private Subscriber startSubscriber(Builder testSubscriberBuilder) throws Exception { - Subscriber subscriber = testSubscriberBuilder.setUseStreaming(true).build(); + Subscriber subscriber = testSubscriberBuilder.build(); subscriber.startAsync().awaitRunning(); return subscriber; }