From b9ed28096caab077282a398a8dde6fa34c8c7090 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 29 Mar 2017 13:07:32 +1100 Subject: [PATCH] DO NOT MERGE: pubsub: make Subscriber use ApiService Fixes #1761. Requires googleapis/gax-java#256, wait for merge and release before merging this. --- .../cloud/examples/pubsub/PubSubExample.java | 2 +- .../CreateSubscriptionAndPullMessages.java | 2 +- .../pubsub/snippets/SubscriberSnippets.java | 2 +- .../spi/v1/PollingSubscriberConnection.java | 9 +- .../spi/v1/StreamingSubscriberConnection.java | 6 +- .../cloud/pubsub/spi/v1/Subscriber.java | 596 +++++++----------- .../google/cloud/pubsub/it/ITPubSubTest.java | 4 +- pom.xml | 2 +- 8 files changed, 255 insertions(+), 368 deletions(-) diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java index ff71bafd9619..d79e4e9f8afd 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/PubSubExample.java @@ -624,7 +624,7 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { try { subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); subscriber.addListener( - new Subscriber.SubscriberListener() { + new Subscriber.Listener() { @Override public void failed(Subscriber.State from, Throwable failure) { // Handle failure. diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java index d61d8f2c6bbc..563f270ec6dc 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateSubscriptionAndPullMessages.java @@ -53,7 +53,7 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { try { subscriber = Subscriber.newBuilder(subscription, receiver).build(); subscriber.addListener( - new Subscriber.SubscriberListener() { + new Subscriber.Listener() { @Override public void failed(Subscriber.State from, Throwable failure) { // Handle failure. This is called when the Subscriber encountered a fatal error and is shutting down. diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java index d2340d2a0834..c65f416df3d1 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java @@ -54,7 +54,7 @@ public SubscriberSnippets( public void startAndWait() throws Exception { // [START startAsync] Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build(); - subscriber.addListener(new Subscriber.SubscriberListener() { + subscriber.addListener(new Subscriber.Listener() { public void failed(Subscriber.State from, Throwable failure) { // Handle error. } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index 91acca40cf08..5f979490979a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -16,14 +16,14 @@ package com.google.cloud.pubsub.spi.v1; -import com.google.api.gax.core.FlowController; +import com.google.api.gax.core.AbstractApiService; import com.google.api.gax.core.ApiClock; +import com.google.api.gax.core.FlowController; import com.google.api.stats.Distribution; import com.google.auth.Credentials; import com.google.cloud.pubsub.spi.v1.MessageDispatcher.AckProcessor; import com.google.cloud.pubsub.spi.v1.MessageDispatcher.PendingModifyAckDeadline; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -45,10 +45,9 @@ import org.joda.time.Duration; /** - * Implementation of {@link AckProcessor} based on Cloud Pub/Sub pull and - * acknowledge operations. + * Implementation of {@link AckProcessor} based on Cloud Pub/Sub pull and acknowledge operations. */ -final class PollingSubscriberConnection extends AbstractService implements AckProcessor { +final class PollingSubscriberConnection extends AbstractApiService implements AckProcessor { private static final int MAX_PER_REQUEST_CHANGES = 1000; private static final Duration DEFAULT_TIMEOUT = Duration.standardSeconds(10); private static final int DEFAULT_MAX_MESSAGES = 1000; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index 1bb06cf9b341..a85fad6b0cce 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -16,15 +16,15 @@ package com.google.cloud.pubsub.spi.v1; -import com.google.api.gax.core.FlowController; +import com.google.api.gax.core.AbstractApiService; import com.google.api.gax.core.ApiClock; +import com.google.api.gax.core.FlowController; import com.google.api.stats.Distribution; import com.google.auth.Credentials; import com.google.cloud.pubsub.spi.v1.MessageDispatcher.AckProcessor; import com.google.cloud.pubsub.spi.v1.MessageDispatcher.PendingModifyAckDeadline; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; @@ -48,7 +48,7 @@ import org.joda.time.Duration; /** Implementation of {@link AckProcessor} based on Cloud Pub/Sub streaming pull. */ -final class StreamingSubscriberConnection extends AbstractService implements AckProcessor { +final class StreamingSubscriberConnection extends AbstractApiService implements AckProcessor { private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName()); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index c44260b56b57..2ffad91dd734 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -16,10 +16,12 @@ package com.google.cloud.pubsub.spi.v1; +import com.google.api.gax.core.AbstractApiService; +import com.google.api.gax.core.ApiClock; +import com.google.api.gax.core.ApiService; import com.google.api.gax.core.CurrentMillisClock; import com.google.api.gax.core.FlowControlSettings; import com.google.api.gax.core.FlowController; -import com.google.api.gax.core.ApiClock;; import com.google.api.gax.grpc.ExecutorProvider; import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.api.stats.Distribution; @@ -29,8 +31,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.AbstractService; -import com.google.common.util.concurrent.Service; import com.google.pubsub.v1.SubscriptionName; import io.grpc.ManagedChannelBuilder; import io.grpc.Status; @@ -42,11 +42,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; import org.joda.time.Duration; @@ -77,7 +75,7 @@ *

If no credentials are provided, the {@link Subscriber} will use application default * credentials through {@link GoogleCredentials#getApplicationDefault}. */ -public class Subscriber { +public class Subscriber extends AbstractApiService { private static final int THREADS_PER_CHANNEL = 5; @VisibleForTesting static final int CHANNELS_PER_CORE = 10; private static final int MAX_INBOUND_MESSAGE_SIZE = @@ -88,10 +86,73 @@ public class Subscriber { private static final Duration ACK_DEADLINE_UPDATE_PERIOD = Duration.standardMinutes(1); private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9; - private final SubscriberImpl impl; + private static final Logger logger = Logger.getLogger(Subscriber.class.getName()); + + private final SubscriptionName subscriptionName; + private final String cachedSubscriptionNameString; + private final FlowControlSettings flowControlSettings; + private final Duration ackExpirationPadding; + private final ScheduledExecutorService executor; + private final Distribution ackLatencyDistribution = + new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); + private final int numChannels; + private final FlowController flowController; + private final ManagedChannelBuilder> channelBuilder; + private final Credentials credentials; + private final MessageReceiver receiver; + private final List streamingSubscriberConnections; + private final List pollingSubscriberConnections; + private final ApiClock clock; + private final List closeables = new ArrayList<>(); + private ScheduledFuture ackDeadlineUpdater; + private int streamAckDeadlineSeconds; private Subscriber(Builder builder) throws IOException { - impl = new SubscriberImpl(builder); + receiver = builder.receiver; + flowControlSettings = builder.flowControlSettings; + subscriptionName = builder.subscriptionName; + cachedSubscriptionNameString = subscriptionName.toString(); + ackExpirationPadding = builder.ackExpirationPadding; + streamAckDeadlineSeconds = + Math.max( + INITIAL_ACK_DEADLINE_SECONDS, + Ints.saturatedCast(ackExpirationPadding.getStandardSeconds())); + clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock(); + + flowController = new FlowController(builder.flowControlSettings); + + executor = builder.executorProvider.getExecutor(); + if (builder.executorProvider.shouldAutoClose()) { + closeables.add( + new AutoCloseable() { + @Override + public void close() throws IOException { + executor.shutdown(); + } + }); + } + + channelBuilder = + builder.channelBuilder.isPresent() + ? builder.channelBuilder.get() + : NettyChannelBuilder.forAddress( + SubscriptionAdminSettings.getDefaultServiceAddress(), + SubscriptionAdminSettings.getDefaultServicePort()) + .maxMessageSize(MAX_INBOUND_MESSAGE_SIZE) + .flowControlWindow(5000000) // 2.5 MB + .negotiationType(NegotiationType.TLS) + .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) + .executor(executor); + + credentials = + builder.credentials.isPresent() + ? builder.credentials.get() + : GoogleCredentials.getApplicationDefault() + .createScoped(SubscriptionAdminSettings.getDefaultServiceScopes()); + + numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE; + streamingSubscriberConnections = new ArrayList(numChannels); + pollingSubscriberConnections = new ArrayList(numChannels); } /** @@ -110,72 +171,17 @@ public static Builder newBuilder(SubscriptionName subscription, MessageReceiver /** Subscription which the subscriber is subscribed to. */ public SubscriptionName getSubscriptionName() { - return impl.subscriptionName; + return subscriptionName; } /** Acknowledgement expiration padding. See {@link Builder#setAckExpirationPadding}. */ public Duration getAckExpirationPadding() { - return impl.ackExpirationPadding; + return ackExpirationPadding; } /** The flow control settings the Subscriber is configured with. */ public FlowControlSettings getFlowControlSettings() { - return impl.flowControlSettings; - } - - public void addListener(final SubscriberListener listener, Executor executor) { - impl.addListener( - new Service.Listener() { - @Override - public void failed(Service.State from, Throwable failure) { - listener.failed(convertState(from), failure); - } - - @Override - public void running() { - listener.running(); - } - - @Override - public void starting() { - listener.starting(); - } - - @Override - public void stopping(Service.State from) { - listener.stopping(convertState(from)); - } - - @Override - public void terminated(Service.State from) { - listener.terminated(convertState(from)); - } - }, - executor); - } - - public void awaitRunning() { - impl.awaitRunning(); - } - - public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { - impl.awaitRunning(timeout, unit); - } - - public void awaitTerminated() { - impl.awaitTerminated(); - } - - public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { - impl.awaitTerminated(timeout, unit); - } - - public Throwable failureCause() { - return impl.failureCause(); - } - - public boolean isRunning() { - return impl.isRunning(); + return flowControlSettings; } /** @@ -184,7 +190,7 @@ public boolean isRunning() { *

Example of receiving a specific number of messages. *

 {@code
    * Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
-   * subscriber.addListener(new Subscriber.SubscriberListener() {
+   * subscriber.addListener(new Subscriber.Listener() {
    *   public void failed(Subscriber.State from, Throwable failure) {
    *     // Handle error.
    *   }
@@ -197,319 +203,201 @@ public boolean isRunning() {
    * }
* */ - public Subscriber startAsync() { - impl.startAsync(); - return this; - } - - public State state() { - return convertState(impl.state()); + @Override + public ApiService startAsync() { + // Override only for the docs. + return super.startAsync(); } - private State convertState(Service.State state) { - switch (state) { - case FAILED: - return State.FAILED; - case NEW: - return State.NEW; - case RUNNING: - return State.RUNNING; - case STARTING: - return State.STARTING; - case STOPPING: - return State.STOPPING; - case TERMINATED: - return State.TERMINATED; - } - throw new IllegalStateException("unknown state: " + state); + @Override + protected void doStart() { + logger.log(Level.FINE, "Starting subscriber group."); + // Streaming pull is not enabled on the service yet. + // startStreamingConnections(); + startPollingConnections(); + notifyStarted(); } - public Subscriber stopAsync() { - impl.stopAsync(); - return this; - } - - public enum State { - FAILED, - NEW, - RUNNING, - STARTING, - STOPPING, - TERMINATED - } - - public abstract static class SubscriberListener { - public void failed(Subscriber.State from, Throwable failure) {} - - public void running() {} - - public void starting() {} - - public void stopping(State from) {} - - public void terminated(State from) {} - } - - private static class SubscriberImpl extends AbstractService { - private static final Logger logger = Logger.getLogger(Subscriber.class.getName()); - - private final SubscriptionName subscriptionName; - private final String cachedSubscriptionNameString; - private final FlowControlSettings flowControlSettings; - private final Duration ackExpirationPadding; - private final ScheduledExecutorService executor; - private final Distribution ackLatencyDistribution = - new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); - private final int numChannels; - private final FlowController flowController; - private final ManagedChannelBuilder> channelBuilder; - private final Credentials credentials; - private final MessageReceiver receiver; - private final List streamingSubscriberConnections; - private final List pollingSubscriberConnections; - private final ApiClock clock; - private final List closeables = new ArrayList<>(); - private ScheduledFuture ackDeadlineUpdater; - private int streamAckDeadlineSeconds; - - private SubscriberImpl(Builder builder) throws IOException { - receiver = builder.receiver; - flowControlSettings = builder.flowControlSettings; - subscriptionName = builder.subscriptionName; - cachedSubscriptionNameString = subscriptionName.toString(); - ackExpirationPadding = builder.ackExpirationPadding; - streamAckDeadlineSeconds = - Math.max( - INITIAL_ACK_DEADLINE_SECONDS, - Ints.saturatedCast(ackExpirationPadding.getStandardSeconds())); - clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock(); - - flowController = new FlowController(builder.flowControlSettings); - - executor = builder.executorProvider.getExecutor(); - if (builder.executorProvider.shouldAutoClose()) { - closeables.add( - new AutoCloseable() { - @Override - public void close() throws IOException { - executor.shutdown(); - } - }); + @Override + protected void doStop() { + stopAllStreamingConnections(); + stopAllPollingConnections(); + try { + for (AutoCloseable closeable : closeables) { + closeable.close(); } - - channelBuilder = - builder.channelBuilder.isPresent() - ? builder.channelBuilder.get() - : NettyChannelBuilder.forAddress( - SubscriptionAdminSettings.getDefaultServiceAddress(), - SubscriptionAdminSettings.getDefaultServicePort()) - .maxMessageSize(MAX_INBOUND_MESSAGE_SIZE) - .flowControlWindow(5000000) // 2.5 MB - .negotiationType(NegotiationType.TLS) - .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) - .executor(executor); - - credentials = - builder.credentials.isPresent() - ? builder.credentials.get() - : GoogleCredentials.getApplicationDefault() - .createScoped(SubscriptionAdminSettings.getDefaultServiceScopes()); - - numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE; - streamingSubscriberConnections = new ArrayList(numChannels); - pollingSubscriberConnections = new ArrayList(numChannels); - } - - @Override - protected void doStart() { - logger.log(Level.FINE, "Starting subscriber group."); - // Streaming pull is not enabled on the service yet. - // startStreamingConnections(); - startPollingConnections(); - notifyStarted(); + notifyStopped(); + } catch (Exception e) { + notifyFailed(e); } + } - @Override - protected void doStop() { - stopAllStreamingConnections(); - stopAllPollingConnections(); - try { - for (AutoCloseable closeable : closeables) { - closeable.close(); - } - notifyStopped(); - } catch (Exception e) { - notifyFailed(e); + private void startStreamingConnections() { + synchronized (streamingSubscriberConnections) { + for (int i = 0; i < numChannels; i++) { + streamingSubscriberConnections.add( + new StreamingSubscriberConnection( + cachedSubscriptionNameString, + credentials, + receiver, + ackExpirationPadding, + streamAckDeadlineSeconds, + ackLatencyDistribution, + channelBuilder.build(), + flowController, + executor, + clock)); } + startConnections( + streamingSubscriberConnections, + new Listener() { + @Override + public void failed(State from, Throwable failure) { + // If a connection failed is because of a fatal error, we should fail the + // whole subscriber. + stopAllStreamingConnections(); + if (failure instanceof StatusRuntimeException + && ((StatusRuntimeException) failure).getStatus().getCode() + == Status.Code.UNIMPLEMENTED) { + logger.info("Unable to open streaming connections, falling back to polling."); + startPollingConnections(); + return; + } + notifyFailed(failure); + } + }); } - private void startStreamingConnections() { - synchronized (streamingSubscriberConnections) { - for (int i = 0; i < numChannels; i++) { - streamingSubscriberConnections.add( - new StreamingSubscriberConnection( - cachedSubscriptionNameString, - credentials, - receiver, - ackExpirationPadding, - streamAckDeadlineSeconds, - ackLatencyDistribution, - channelBuilder.build(), - flowController, - executor, - clock)); - } - startConnections( - streamingSubscriberConnections, - new Listener() { + ackDeadlineUpdater = + executor.scheduleAtFixedRate( + new Runnable() { @Override - public void failed(State from, Throwable failure) { - // If a connection failed is because of a fatal error, we should fail the - // whole subscriber. - stopAllStreamingConnections(); - if (failure instanceof StatusRuntimeException - && ((StatusRuntimeException) failure).getStatus().getCode() - == Status.Code.UNIMPLEMENTED) { - logger.info("Unable to open streaming connections, falling back to polling."); - startPollingConnections(); - return; - } - notifyFailed(failure); - } - }); - } - - ackDeadlineUpdater = - executor.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - // It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API. - long ackLatency = - ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); - if (ackLatency > 0) { - int possibleStreamAckDeadlineSeconds = - Math.max( - MIN_ACK_DEADLINE_SECONDS, - Ints.saturatedCast( - Math.max(ackLatency, ackExpirationPadding.getStandardSeconds()))); - if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) { - streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds; - logger.log( - Level.FINER, - "Updating stream deadline to {0} seconds.", - streamAckDeadlineSeconds); - for (StreamingSubscriberConnection subscriberConnection : - streamingSubscriberConnections) { - subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds); - } + public void run() { + // It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API. + long ackLatency = + ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); + if (ackLatency > 0) { + int possibleStreamAckDeadlineSeconds = + Math.max( + MIN_ACK_DEADLINE_SECONDS, + Ints.saturatedCast( + Math.max(ackLatency, ackExpirationPadding.getStandardSeconds()))); + if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) { + streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds; + logger.log( + Level.FINER, + "Updating stream deadline to {0} seconds.", + streamAckDeadlineSeconds); + for (StreamingSubscriberConnection subscriberConnection : + streamingSubscriberConnections) { + subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds); } } } - }, - ACK_DEADLINE_UPDATE_PERIOD.getMillis(), - ACK_DEADLINE_UPDATE_PERIOD.getMillis(), - TimeUnit.MILLISECONDS); - } + } + }, + ACK_DEADLINE_UPDATE_PERIOD.getMillis(), + ACK_DEADLINE_UPDATE_PERIOD.getMillis(), + TimeUnit.MILLISECONDS); + } - private void stopAllStreamingConnections() { - stopConnections(streamingSubscriberConnections); - if (ackDeadlineUpdater != null) { - ackDeadlineUpdater.cancel(true); - } + private void stopAllStreamingConnections() { + stopConnections(streamingSubscriberConnections); + if (ackDeadlineUpdater != null) { + ackDeadlineUpdater.cancel(true); } + } - private void startPollingConnections() { - synchronized (pollingSubscriberConnections) { - for (int i = 0; i < numChannels; i++) { - pollingSubscriberConnections.add( - new PollingSubscriberConnection( - cachedSubscriptionNameString, - credentials, - receiver, - ackExpirationPadding, - ackLatencyDistribution, - channelBuilder.build(), - flowController, - executor, - clock)); - } - startConnections( - pollingSubscriberConnections, - new Listener() { - @Override - public void failed(State from, Throwable failure) { - // If a connection failed is because of a fatal error, we should fail the - // whole subscriber. - stopAllPollingConnections(); - try { - notifyFailed(failure); - } catch (IllegalStateException e) { - if (isRunning()) { - throw e; - } - // It could happen that we are shutting down while some channels fail. + private void startPollingConnections() { + synchronized (pollingSubscriberConnections) { + for (int i = 0; i < numChannels; i++) { + pollingSubscriberConnections.add( + new PollingSubscriberConnection( + cachedSubscriptionNameString, + credentials, + receiver, + ackExpirationPadding, + ackLatencyDistribution, + channelBuilder.build(), + flowController, + executor, + clock)); + } + startConnections( + pollingSubscriberConnections, + new Listener() { + @Override + public void failed(State from, Throwable failure) { + // If a connection failed is because of a fatal error, we should fail the + // whole subscriber. + stopAllPollingConnections(); + try { + notifyFailed(failure); + } catch (IllegalStateException e) { + if (isRunning()) { + throw e; } + // It could happen that we are shutting down while some channels fail. } - }); - } + } + }); } + } - private void stopAllPollingConnections() { - stopConnections(pollingSubscriberConnections); - } + private void stopAllPollingConnections() { + stopConnections(pollingSubscriberConnections); + } - private void startConnections( - List connections, final Listener connectionsListener) { - final CountDownLatch subscribersStarting = new CountDownLatch(numChannels); - for (final Service subscriber : connections) { - executor.submit( - new Runnable() { - @Override - public void run() { - subscriber.addListener(connectionsListener, executor); - try { - subscriber.startAsync().awaitRunning(); - } finally { - subscribersStarting.countDown(); - } + private void startConnections( + List connections, final ApiService.Listener connectionsListener) { + final CountDownLatch subscribersStarting = new CountDownLatch(numChannels); + for (final ApiService subscriber : connections) { + executor.submit( + new Runnable() { + @Override + public void run() { + subscriber.addListener(connectionsListener, executor); + try { + subscriber.startAsync().awaitRunning(); + } finally { + subscribersStarting.countDown(); } - }); - } - try { - subscribersStarting.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + } + }); } + try { + subscribersStarting.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } - private void stopConnections(List connections) { - ArrayList liveConnections; - synchronized (connections) { - liveConnections = new ArrayList(connections); - connections.clear(); - } - final CountDownLatch connectionsStopping = new CountDownLatch(liveConnections.size()); - for (final Service subscriberConnection : liveConnections) { - executor.submit( - new Runnable() { - @Override - public void run() { - try { - subscriberConnection.stopAsync().awaitTerminated(); - } catch (IllegalStateException ignored) { - // It is expected for some connections to be already in state failed so stop will - // throw this expection. - } - connectionsStopping.countDown(); + private void stopConnections(List connections) { + ArrayList liveConnections; + synchronized (connections) { + liveConnections = new ArrayList(connections); + connections.clear(); + } + final CountDownLatch connectionsStopping = new CountDownLatch(liveConnections.size()); + for (final ApiService subscriberConnection : liveConnections) { + executor.submit( + new Runnable() { + @Override + public void run() { + try { + subscriberConnection.stopAsync().awaitTerminated(); + } catch (IllegalStateException ignored) { + // It is expected for some connections to be already in state failed so stop will + // throw this expection. } - }); - } - try { - connectionsStopping.await(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } + connectionsStopping.countDown(); + } + }); + } + try { + connectionsStopping.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index 2b09914b535d..b1f657bedbd7 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -25,9 +25,9 @@ import com.google.cloud.pubsub.spi.v1.AckReplyConsumer; import com.google.cloud.pubsub.spi.v1.MessageReceiver; import com.google.cloud.pubsub.spi.v1.Publisher; -import com.google.cloud.pubsub.spi.v1.TopicAdminClient; import com.google.cloud.pubsub.spi.v1.Subscriber; import com.google.cloud.pubsub.spi.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.spi.v1.TopicAdminClient; import com.google.common.util.concurrent.MoreExecutors; import com.google.iam.v1.Binding; import com.google.iam.v1.Policy; @@ -124,7 +124,7 @@ public void receiveMessage( }) .build(); subscriber.addListener( - new Subscriber.SubscriberListener() { + new Subscriber.Listener() { public void failed(Subscriber.State from, Throwable failure) { received.setException(failure); } diff --git a/pom.xml b/pom.xml index 38367b16d974..8b793335d3a8 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,7 @@ github 0.6.0 1.0.3 - 0.7.0 + 0.7.1 0.1.5 0.10.1-alpha-SNAPSHOT 0.10.1-beta-SNAPSHOT