diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java
index ff71bafd9619..d79e4e9f8afd 100644
--- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java
+++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java
@@ -624,7 +624,7 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
subscriber.addListener(
- new Subscriber.SubscriberListener() {
+ new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
// Handle failure.
diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java
index d61d8f2c6bbc..563f270ec6dc 100644
--- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java
+++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java
@@ -53,7 +53,7 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
try {
subscriber = Subscriber.newBuilder(subscription, receiver).build();
subscriber.addListener(
- new Subscriber.SubscriberListener() {
+ new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
// Handle failure. This is called when the Subscriber encountered a fatal error and is shutting down.
diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java
index d2340d2a0834..c65f416df3d1 100644
--- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java
+++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java
@@ -54,7 +54,7 @@ public SubscriberSnippets(
public void startAndWait() throws Exception {
// [START startAsync]
Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
- subscriber.addListener(new Subscriber.SubscriberListener() {
+ subscriber.addListener(new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
// Handle error.
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java
index 91acca40cf08..5f979490979a 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java
@@ -16,14 +16,14 @@
package com.google.cloud.pubsub.spi.v1;
-import com.google.api.gax.core.FlowController;
+import com.google.api.gax.core.AbstractApiService;
import com.google.api.gax.core.ApiClock;
+import com.google.api.gax.core.FlowController;
import com.google.api.stats.Distribution;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.AckProcessor;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.PendingModifyAckDeadline;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -45,10 +45,9 @@
import org.joda.time.Duration;
/**
- * Implementation of {@link AckProcessor} based on Cloud Pub/Sub pull and
- * acknowledge operations.
+ * Implementation of {@link AckProcessor} based on Cloud Pub/Sub pull and acknowledge operations.
*/
-final class PollingSubscriberConnection extends AbstractService implements AckProcessor {
+final class PollingSubscriberConnection extends AbstractApiService implements AckProcessor {
private static final int MAX_PER_REQUEST_CHANGES = 1000;
private static final Duration DEFAULT_TIMEOUT = Duration.standardSeconds(10);
private static final int DEFAULT_MAX_MESSAGES = 1000;
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java
index 1bb06cf9b341..a85fad6b0cce 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java
@@ -16,15 +16,15 @@
package com.google.cloud.pubsub.spi.v1;
-import com.google.api.gax.core.FlowController;
+import com.google.api.gax.core.AbstractApiService;
import com.google.api.gax.core.ApiClock;
+import com.google.api.gax.core.FlowController;
import com.google.api.stats.Distribution;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.AckProcessor;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.PendingModifyAckDeadline;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
@@ -48,7 +48,7 @@
import org.joda.time.Duration;
/** Implementation of {@link AckProcessor} based on Cloud Pub/Sub streaming pull. */
-final class StreamingSubscriberConnection extends AbstractService implements AckProcessor {
+final class StreamingSubscriberConnection extends AbstractApiService implements AckProcessor {
private static final Logger logger =
Logger.getLogger(StreamingSubscriberConnection.class.getName());
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java
index c44260b56b57..2ffad91dd734 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java
@@ -16,10 +16,12 @@
package com.google.cloud.pubsub.spi.v1;
+import com.google.api.gax.core.AbstractApiService;
+import com.google.api.gax.core.ApiClock;
+import com.google.api.gax.core.ApiService;
import com.google.api.gax.core.CurrentMillisClock;
import com.google.api.gax.core.FlowControlSettings;
import com.google.api.gax.core.FlowController;
-import com.google.api.gax.core.ApiClock;;
import com.google.api.gax.grpc.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
import com.google.api.stats.Distribution;
@@ -29,8 +31,6 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.Service;
import com.google.pubsub.v1.SubscriptionName;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
@@ -42,11 +42,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.joda.time.Duration;
@@ -77,7 +75,7 @@
*
If no credentials are provided, the {@link Subscriber} will use application default
* credentials through {@link GoogleCredentials#getApplicationDefault}.
*/
-public class Subscriber {
+public class Subscriber extends AbstractApiService {
private static final int THREADS_PER_CHANNEL = 5;
@VisibleForTesting static final int CHANNELS_PER_CORE = 10;
private static final int MAX_INBOUND_MESSAGE_SIZE =
@@ -88,10 +86,73 @@ public class Subscriber {
private static final Duration ACK_DEADLINE_UPDATE_PERIOD = Duration.standardMinutes(1);
private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
- private final SubscriberImpl impl;
+ private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
+
+ private final SubscriptionName subscriptionName;
+ private final String cachedSubscriptionNameString;
+ private final FlowControlSettings flowControlSettings;
+ private final Duration ackExpirationPadding;
+ private final ScheduledExecutorService executor;
+ private final Distribution ackLatencyDistribution =
+ new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
+ private final int numChannels;
+ private final FlowController flowController;
+ private final ManagedChannelBuilder extends ManagedChannelBuilder>> channelBuilder;
+ private final Credentials credentials;
+ private final MessageReceiver receiver;
+ private final List streamingSubscriberConnections;
+ private final List pollingSubscriberConnections;
+ private final ApiClock clock;
+ private final List closeables = new ArrayList<>();
+ private ScheduledFuture> ackDeadlineUpdater;
+ private int streamAckDeadlineSeconds;
private Subscriber(Builder builder) throws IOException {
- impl = new SubscriberImpl(builder);
+ receiver = builder.receiver;
+ flowControlSettings = builder.flowControlSettings;
+ subscriptionName = builder.subscriptionName;
+ cachedSubscriptionNameString = subscriptionName.toString();
+ ackExpirationPadding = builder.ackExpirationPadding;
+ streamAckDeadlineSeconds =
+ Math.max(
+ INITIAL_ACK_DEADLINE_SECONDS,
+ Ints.saturatedCast(ackExpirationPadding.getStandardSeconds()));
+ clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock();
+
+ flowController = new FlowController(builder.flowControlSettings);
+
+ executor = builder.executorProvider.getExecutor();
+ if (builder.executorProvider.shouldAutoClose()) {
+ closeables.add(
+ new AutoCloseable() {
+ @Override
+ public void close() throws IOException {
+ executor.shutdown();
+ }
+ });
+ }
+
+ channelBuilder =
+ builder.channelBuilder.isPresent()
+ ? builder.channelBuilder.get()
+ : NettyChannelBuilder.forAddress(
+ SubscriptionAdminSettings.getDefaultServiceAddress(),
+ SubscriptionAdminSettings.getDefaultServicePort())
+ .maxMessageSize(MAX_INBOUND_MESSAGE_SIZE)
+ .flowControlWindow(5000000) // 2.5 MB
+ .negotiationType(NegotiationType.TLS)
+ .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
+ .executor(executor);
+
+ credentials =
+ builder.credentials.isPresent()
+ ? builder.credentials.get()
+ : GoogleCredentials.getApplicationDefault()
+ .createScoped(SubscriptionAdminSettings.getDefaultServiceScopes());
+
+ numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
+ streamingSubscriberConnections = new ArrayList(numChannels);
+ pollingSubscriberConnections = new ArrayList(numChannels);
}
/**
@@ -110,72 +171,17 @@ public static Builder newBuilder(SubscriptionName subscription, MessageReceiver
/** Subscription which the subscriber is subscribed to. */
public SubscriptionName getSubscriptionName() {
- return impl.subscriptionName;
+ return subscriptionName;
}
/** Acknowledgement expiration padding. See {@link Builder#setAckExpirationPadding}. */
public Duration getAckExpirationPadding() {
- return impl.ackExpirationPadding;
+ return ackExpirationPadding;
}
/** The flow control settings the Subscriber is configured with. */
public FlowControlSettings getFlowControlSettings() {
- return impl.flowControlSettings;
- }
-
- public void addListener(final SubscriberListener listener, Executor executor) {
- impl.addListener(
- new Service.Listener() {
- @Override
- public void failed(Service.State from, Throwable failure) {
- listener.failed(convertState(from), failure);
- }
-
- @Override
- public void running() {
- listener.running();
- }
-
- @Override
- public void starting() {
- listener.starting();
- }
-
- @Override
- public void stopping(Service.State from) {
- listener.stopping(convertState(from));
- }
-
- @Override
- public void terminated(Service.State from) {
- listener.terminated(convertState(from));
- }
- },
- executor);
- }
-
- public void awaitRunning() {
- impl.awaitRunning();
- }
-
- public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
- impl.awaitRunning(timeout, unit);
- }
-
- public void awaitTerminated() {
- impl.awaitTerminated();
- }
-
- public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
- impl.awaitTerminated(timeout, unit);
- }
-
- public Throwable failureCause() {
- return impl.failureCause();
- }
-
- public boolean isRunning() {
- return impl.isRunning();
+ return flowControlSettings;
}
/**
@@ -184,7 +190,7 @@ public boolean isRunning() {
* Example of receiving a specific number of messages.
*
{@code
* Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
- * subscriber.addListener(new Subscriber.SubscriberListener() {
+ * subscriber.addListener(new Subscriber.Listener() {
* public void failed(Subscriber.State from, Throwable failure) {
* // Handle error.
* }
@@ -197,319 +203,201 @@ public boolean isRunning() {
* }
*
*/
- public Subscriber startAsync() {
- impl.startAsync();
- return this;
- }
-
- public State state() {
- return convertState(impl.state());
+ @Override
+ public ApiService startAsync() {
+ // Override only for the docs.
+ return super.startAsync();
}
- private State convertState(Service.State state) {
- switch (state) {
- case FAILED:
- return State.FAILED;
- case NEW:
- return State.NEW;
- case RUNNING:
- return State.RUNNING;
- case STARTING:
- return State.STARTING;
- case STOPPING:
- return State.STOPPING;
- case TERMINATED:
- return State.TERMINATED;
- }
- throw new IllegalStateException("unknown state: " + state);
+ @Override
+ protected void doStart() {
+ logger.log(Level.FINE, "Starting subscriber group.");
+ // Streaming pull is not enabled on the service yet.
+ // startStreamingConnections();
+ startPollingConnections();
+ notifyStarted();
}
- public Subscriber stopAsync() {
- impl.stopAsync();
- return this;
- }
-
- public enum State {
- FAILED,
- NEW,
- RUNNING,
- STARTING,
- STOPPING,
- TERMINATED
- }
-
- public abstract static class SubscriberListener {
- public void failed(Subscriber.State from, Throwable failure) {}
-
- public void running() {}
-
- public void starting() {}
-
- public void stopping(State from) {}
-
- public void terminated(State from) {}
- }
-
- private static class SubscriberImpl extends AbstractService {
- private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
-
- private final SubscriptionName subscriptionName;
- private final String cachedSubscriptionNameString;
- private final FlowControlSettings flowControlSettings;
- private final Duration ackExpirationPadding;
- private final ScheduledExecutorService executor;
- private final Distribution ackLatencyDistribution =
- new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
- private final int numChannels;
- private final FlowController flowController;
- private final ManagedChannelBuilder extends ManagedChannelBuilder>> channelBuilder;
- private final Credentials credentials;
- private final MessageReceiver receiver;
- private final List streamingSubscriberConnections;
- private final List pollingSubscriberConnections;
- private final ApiClock clock;
- private final List closeables = new ArrayList<>();
- private ScheduledFuture> ackDeadlineUpdater;
- private int streamAckDeadlineSeconds;
-
- private SubscriberImpl(Builder builder) throws IOException {
- receiver = builder.receiver;
- flowControlSettings = builder.flowControlSettings;
- subscriptionName = builder.subscriptionName;
- cachedSubscriptionNameString = subscriptionName.toString();
- ackExpirationPadding = builder.ackExpirationPadding;
- streamAckDeadlineSeconds =
- Math.max(
- INITIAL_ACK_DEADLINE_SECONDS,
- Ints.saturatedCast(ackExpirationPadding.getStandardSeconds()));
- clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock();
-
- flowController = new FlowController(builder.flowControlSettings);
-
- executor = builder.executorProvider.getExecutor();
- if (builder.executorProvider.shouldAutoClose()) {
- closeables.add(
- new AutoCloseable() {
- @Override
- public void close() throws IOException {
- executor.shutdown();
- }
- });
+ @Override
+ protected void doStop() {
+ stopAllStreamingConnections();
+ stopAllPollingConnections();
+ try {
+ for (AutoCloseable closeable : closeables) {
+ closeable.close();
}
-
- channelBuilder =
- builder.channelBuilder.isPresent()
- ? builder.channelBuilder.get()
- : NettyChannelBuilder.forAddress(
- SubscriptionAdminSettings.getDefaultServiceAddress(),
- SubscriptionAdminSettings.getDefaultServicePort())
- .maxMessageSize(MAX_INBOUND_MESSAGE_SIZE)
- .flowControlWindow(5000000) // 2.5 MB
- .negotiationType(NegotiationType.TLS)
- .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
- .executor(executor);
-
- credentials =
- builder.credentials.isPresent()
- ? builder.credentials.get()
- : GoogleCredentials.getApplicationDefault()
- .createScoped(SubscriptionAdminSettings.getDefaultServiceScopes());
-
- numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
- streamingSubscriberConnections = new ArrayList(numChannels);
- pollingSubscriberConnections = new ArrayList(numChannels);
- }
-
- @Override
- protected void doStart() {
- logger.log(Level.FINE, "Starting subscriber group.");
- // Streaming pull is not enabled on the service yet.
- // startStreamingConnections();
- startPollingConnections();
- notifyStarted();
+ notifyStopped();
+ } catch (Exception e) {
+ notifyFailed(e);
}
+ }
- @Override
- protected void doStop() {
- stopAllStreamingConnections();
- stopAllPollingConnections();
- try {
- for (AutoCloseable closeable : closeables) {
- closeable.close();
- }
- notifyStopped();
- } catch (Exception e) {
- notifyFailed(e);
+ private void startStreamingConnections() {
+ synchronized (streamingSubscriberConnections) {
+ for (int i = 0; i < numChannels; i++) {
+ streamingSubscriberConnections.add(
+ new StreamingSubscriberConnection(
+ cachedSubscriptionNameString,
+ credentials,
+ receiver,
+ ackExpirationPadding,
+ streamAckDeadlineSeconds,
+ ackLatencyDistribution,
+ channelBuilder.build(),
+ flowController,
+ executor,
+ clock));
}
+ startConnections(
+ streamingSubscriberConnections,
+ new Listener() {
+ @Override
+ public void failed(State from, Throwable failure) {
+ // If a connection failed is because of a fatal error, we should fail the
+ // whole subscriber.
+ stopAllStreamingConnections();
+ if (failure instanceof StatusRuntimeException
+ && ((StatusRuntimeException) failure).getStatus().getCode()
+ == Status.Code.UNIMPLEMENTED) {
+ logger.info("Unable to open streaming connections, falling back to polling.");
+ startPollingConnections();
+ return;
+ }
+ notifyFailed(failure);
+ }
+ });
}
- private void startStreamingConnections() {
- synchronized (streamingSubscriberConnections) {
- for (int i = 0; i < numChannels; i++) {
- streamingSubscriberConnections.add(
- new StreamingSubscriberConnection(
- cachedSubscriptionNameString,
- credentials,
- receiver,
- ackExpirationPadding,
- streamAckDeadlineSeconds,
- ackLatencyDistribution,
- channelBuilder.build(),
- flowController,
- executor,
- clock));
- }
- startConnections(
- streamingSubscriberConnections,
- new Listener() {
+ ackDeadlineUpdater =
+ executor.scheduleAtFixedRate(
+ new Runnable() {
@Override
- public void failed(State from, Throwable failure) {
- // If a connection failed is because of a fatal error, we should fail the
- // whole subscriber.
- stopAllStreamingConnections();
- if (failure instanceof StatusRuntimeException
- && ((StatusRuntimeException) failure).getStatus().getCode()
- == Status.Code.UNIMPLEMENTED) {
- logger.info("Unable to open streaming connections, falling back to polling.");
- startPollingConnections();
- return;
- }
- notifyFailed(failure);
- }
- });
- }
-
- ackDeadlineUpdater =
- executor.scheduleAtFixedRate(
- new Runnable() {
- @Override
- public void run() {
- // It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API.
- long ackLatency =
- ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
- if (ackLatency > 0) {
- int possibleStreamAckDeadlineSeconds =
- Math.max(
- MIN_ACK_DEADLINE_SECONDS,
- Ints.saturatedCast(
- Math.max(ackLatency, ackExpirationPadding.getStandardSeconds())));
- if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) {
- streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds;
- logger.log(
- Level.FINER,
- "Updating stream deadline to {0} seconds.",
- streamAckDeadlineSeconds);
- for (StreamingSubscriberConnection subscriberConnection :
- streamingSubscriberConnections) {
- subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds);
- }
+ public void run() {
+ // It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API.
+ long ackLatency =
+ ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
+ if (ackLatency > 0) {
+ int possibleStreamAckDeadlineSeconds =
+ Math.max(
+ MIN_ACK_DEADLINE_SECONDS,
+ Ints.saturatedCast(
+ Math.max(ackLatency, ackExpirationPadding.getStandardSeconds())));
+ if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) {
+ streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds;
+ logger.log(
+ Level.FINER,
+ "Updating stream deadline to {0} seconds.",
+ streamAckDeadlineSeconds);
+ for (StreamingSubscriberConnection subscriberConnection :
+ streamingSubscriberConnections) {
+ subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds);
}
}
}
- },
- ACK_DEADLINE_UPDATE_PERIOD.getMillis(),
- ACK_DEADLINE_UPDATE_PERIOD.getMillis(),
- TimeUnit.MILLISECONDS);
- }
+ }
+ },
+ ACK_DEADLINE_UPDATE_PERIOD.getMillis(),
+ ACK_DEADLINE_UPDATE_PERIOD.getMillis(),
+ TimeUnit.MILLISECONDS);
+ }
- private void stopAllStreamingConnections() {
- stopConnections(streamingSubscriberConnections);
- if (ackDeadlineUpdater != null) {
- ackDeadlineUpdater.cancel(true);
- }
+ private void stopAllStreamingConnections() {
+ stopConnections(streamingSubscriberConnections);
+ if (ackDeadlineUpdater != null) {
+ ackDeadlineUpdater.cancel(true);
}
+ }
- private void startPollingConnections() {
- synchronized (pollingSubscriberConnections) {
- for (int i = 0; i < numChannels; i++) {
- pollingSubscriberConnections.add(
- new PollingSubscriberConnection(
- cachedSubscriptionNameString,
- credentials,
- receiver,
- ackExpirationPadding,
- ackLatencyDistribution,
- channelBuilder.build(),
- flowController,
- executor,
- clock));
- }
- startConnections(
- pollingSubscriberConnections,
- new Listener() {
- @Override
- public void failed(State from, Throwable failure) {
- // If a connection failed is because of a fatal error, we should fail the
- // whole subscriber.
- stopAllPollingConnections();
- try {
- notifyFailed(failure);
- } catch (IllegalStateException e) {
- if (isRunning()) {
- throw e;
- }
- // It could happen that we are shutting down while some channels fail.
+ private void startPollingConnections() {
+ synchronized (pollingSubscriberConnections) {
+ for (int i = 0; i < numChannels; i++) {
+ pollingSubscriberConnections.add(
+ new PollingSubscriberConnection(
+ cachedSubscriptionNameString,
+ credentials,
+ receiver,
+ ackExpirationPadding,
+ ackLatencyDistribution,
+ channelBuilder.build(),
+ flowController,
+ executor,
+ clock));
+ }
+ startConnections(
+ pollingSubscriberConnections,
+ new Listener() {
+ @Override
+ public void failed(State from, Throwable failure) {
+ // If a connection failed is because of a fatal error, we should fail the
+ // whole subscriber.
+ stopAllPollingConnections();
+ try {
+ notifyFailed(failure);
+ } catch (IllegalStateException e) {
+ if (isRunning()) {
+ throw e;
}
+ // It could happen that we are shutting down while some channels fail.
}
- });
- }
+ }
+ });
}
+ }
- private void stopAllPollingConnections() {
- stopConnections(pollingSubscriberConnections);
- }
+ private void stopAllPollingConnections() {
+ stopConnections(pollingSubscriberConnections);
+ }
- private void startConnections(
- List extends Service> connections, final Listener connectionsListener) {
- final CountDownLatch subscribersStarting = new CountDownLatch(numChannels);
- for (final Service subscriber : connections) {
- executor.submit(
- new Runnable() {
- @Override
- public void run() {
- subscriber.addListener(connectionsListener, executor);
- try {
- subscriber.startAsync().awaitRunning();
- } finally {
- subscribersStarting.countDown();
- }
+ private void startConnections(
+ List extends ApiService> connections, final ApiService.Listener connectionsListener) {
+ final CountDownLatch subscribersStarting = new CountDownLatch(numChannels);
+ for (final ApiService subscriber : connections) {
+ executor.submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ subscriber.addListener(connectionsListener, executor);
+ try {
+ subscriber.startAsync().awaitRunning();
+ } finally {
+ subscribersStarting.countDown();
}
- });
- }
- try {
- subscribersStarting.await();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ }
+ });
}
+ try {
+ subscribersStarting.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
- private void stopConnections(List extends Service> connections) {
- ArrayList liveConnections;
- synchronized (connections) {
- liveConnections = new ArrayList(connections);
- connections.clear();
- }
- final CountDownLatch connectionsStopping = new CountDownLatch(liveConnections.size());
- for (final Service subscriberConnection : liveConnections) {
- executor.submit(
- new Runnable() {
- @Override
- public void run() {
- try {
- subscriberConnection.stopAsync().awaitTerminated();
- } catch (IllegalStateException ignored) {
- // It is expected for some connections to be already in state failed so stop will
- // throw this expection.
- }
- connectionsStopping.countDown();
+ private void stopConnections(List extends ApiService> connections) {
+ ArrayList liveConnections;
+ synchronized (connections) {
+ liveConnections = new ArrayList(connections);
+ connections.clear();
+ }
+ final CountDownLatch connectionsStopping = new CountDownLatch(liveConnections.size());
+ for (final ApiService subscriberConnection : liveConnections) {
+ executor.submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ subscriberConnection.stopAsync().awaitTerminated();
+ } catch (IllegalStateException ignored) {
+ // It is expected for some connections to be already in state failed so stop will
+ // throw this expection.
}
- });
- }
- try {
- connectionsStopping.await();
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
- }
+ connectionsStopping.countDown();
+ }
+ });
+ }
+ try {
+ connectionsStopping.await();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
}
}
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java
index 2b09914b535d..b1f657bedbd7 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java
@@ -25,9 +25,9 @@
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.Publisher;
-import com.google.cloud.pubsub.spi.v1.TopicAdminClient;
import com.google.cloud.pubsub.spi.v1.Subscriber;
import com.google.cloud.pubsub.spi.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.spi.v1.TopicAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.iam.v1.Binding;
import com.google.iam.v1.Policy;
@@ -124,7 +124,7 @@ public void receiveMessage(
})
.build();
subscriber.addListener(
- new Subscriber.SubscriberListener() {
+ new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
received.setException(failure);
}