From 5bd9fb698a10d629c514c718c55c871a5c6c2344 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 16 Jan 2017 15:36:13 +1100 Subject: [PATCH 1/2] de-interfac-ify Subscriber --- .../cloud/pubsub/spi/v1/Subscriber.java | 315 +++++++++++++++-- .../cloud/pubsub/spi/v1/SubscriberImpl.java | 326 ------------------ .../pubsub/spi/v1/SubscriberImplTest.java | 4 +- 3 files changed, 291 insertions(+), 354 deletions(-) delete mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberImpl.java diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 0fbb5bd6836f..dbda4444c9bf 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -17,17 +17,35 @@ package com.google.cloud.pubsub.spi.v1; import com.google.api.gax.bundling.FlowController; +import com.google.api.stats.Distribution; import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.Clock; import com.google.cloud.pubsub.spi.v1.MessageReceiver.AckReply; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.Service; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A Cloud Pub/Sub subscriber that is @@ -85,33 +103,278 @@ * subscriber.stopAsync(); * */ -public interface Subscriber extends Service { - /** Retrieves a snapshot of the current subscriber statistics. */ - SubscriberStats getStats(); - - /** Subscription for which the subscriber is streaming messages. */ - String getSubscription(); - - /** - * Time before a message is to expire when the subscriber is going to attempt to renew its ack - * deadline. - */ - Duration getAckExpirationPadding(); - - /** - * Maximum number of outstanding messages before limits are enforced. - * - *

When limits are enforced, no more messages will be dispatched to the {@link - * MessageReceiver} but due to the gRPC and HTTP/2 buffering and congestion control window - * management, still some extra bytes could be kept at lower layers. - */ - Optional getMaxOutstandingElementCount(); - - /** Maximum number of outstanding bytes before limits are enforced. */ - Optional getMaxOutstandingRequestBytes(); +public class Subscriber extends AbstractService { + private static final int THREADS_PER_CHANNEL = 5; + @VisibleForTesting static final int CHANNELS_PER_CORE = 10; + private static final int MAX_INBOUND_MESSAGE_SIZE = + 20 * 1024 * 1024; // 20MB API maximum message size. + private static final int INITIAL_ACK_DEADLINE_SECONDS = 10; + private static final int MAX_ACK_DEADLINE_SECONDS = 600; + private static final int MIN_ACK_DEADLINE_SECONDS = 10; + private static final Duration ACK_DEADLINE_UPDATE_PERIOD = Duration.standardMinutes(1); + private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9; + + private static final Logger logger = LoggerFactory.getLogger(Subscriber.class); + + private final String subscription; + private final FlowController.Settings flowControlSettings; + private final Duration ackExpirationPadding; + private final ScheduledExecutorService executor; + private final Distribution ackLatencyDistribution = + new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); + private final int numChannels; + private final FlowController flowController; + private final ManagedChannelBuilder> channelBuilder; + private final Credentials credentials; + private final MessageReceiver receiver; + private final List streamingSubscriberConnections; + private final List pollingSubscriberConnections; + private final Clock clock; + private ScheduledFuture ackDeadlineUpdater; + private int streamAckDeadlineSeconds; + + public Subscriber(Builder builder) throws IOException { + receiver = builder.receiver; + flowControlSettings = builder.flowControlSettings; + subscription = builder.subscription; + ackExpirationPadding = builder.ackExpirationPadding; + streamAckDeadlineSeconds = + Math.max( + INITIAL_ACK_DEADLINE_SECONDS, + Ints.saturatedCast(ackExpirationPadding.getStandardSeconds())); + clock = builder.clock.isPresent() ? builder.clock.get() : Clock.defaultClock(); + + flowController = new FlowController(builder.flowControlSettings, false); + + numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE; + executor = + builder.executor.isPresent() + ? builder.executor.get() + : Executors.newScheduledThreadPool( + numChannels * THREADS_PER_CHANNEL, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("cloud-pubsub-subscriber-thread-%d") + .build()); + + channelBuilder = + builder.channelBuilder.isPresent() + ? builder.channelBuilder.get() + : NettyChannelBuilder.forAddress( + SubscriberSettings.getDefaultServiceAddress(), + SubscriberSettings.getDefaultServicePort()) + .maxMessageSize(MAX_INBOUND_MESSAGE_SIZE) + .flowControlWindow(5000000) // 2.5 MB + .negotiationType(NegotiationType.TLS) + .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) + .executor(executor); + + credentials = + builder.credentials.isPresent() + ? builder.credentials.get() + : GoogleCredentials.getApplicationDefault() + .createScoped(SubscriberSettings.getDefaultServiceScopes()); + + streamingSubscriberConnections = new ArrayList(numChannels); + pollingSubscriberConnections = new ArrayList(numChannels); + } + + @Override + protected void doStart() { + logger.debug("Starting subscriber group."); + startStreamingConnections(); + notifyStarted(); + } + + @Override + protected void doStop() { + stopAllStreamingConnections(); + stopAllPollingConnections(); + notifyStopped(); + } + + private void startStreamingConnections() { + synchronized (streamingSubscriberConnections) { + for (int i = 0; i < numChannels; i++) { + streamingSubscriberConnections.add( + new StreamingSubscriberConnection( + subscription, + credentials, + receiver, + ackExpirationPadding, + streamAckDeadlineSeconds, + ackLatencyDistribution, + channelBuilder.build(), + flowController, + executor, + clock)); + } + startConnections( + streamingSubscriberConnections, + new Listener() { + @Override + public void failed(State from, Throwable failure) { + // If a connection failed is because of a fatal error, we should fail the + // whole subscriber. + stopAllStreamingConnections(); + if (failure instanceof StatusRuntimeException + && ((StatusRuntimeException) failure).getStatus().getCode() + == Status.Code.UNIMPLEMENTED) { + logger.info("Unable to open streaming connections, falling back to polling."); + startPollingConnections(); + return; + } + notifyFailed(failure); + } + }); + } + + ackDeadlineUpdater = + executor.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + // It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API. + long ackLatency = + ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); + if (ackLatency > 0) { + int possibleStreamAckDeadlineSeconds = + Math.max( + MIN_ACK_DEADLINE_SECONDS, + Ints.saturatedCast( + Math.max(ackLatency, ackExpirationPadding.getStandardSeconds()))); + if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) { + streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds; + logger.debug( + "Updating stream deadline to {} seconds.", streamAckDeadlineSeconds); + for (StreamingSubscriberConnection subscriberConnection : + streamingSubscriberConnections) { + subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds); + } + } + } + } + }, + ACK_DEADLINE_UPDATE_PERIOD.getMillis(), + ACK_DEADLINE_UPDATE_PERIOD.getMillis(), + TimeUnit.MILLISECONDS); + } + + private void stopAllStreamingConnections() { + stopConnections(streamingSubscriberConnections); + ackDeadlineUpdater.cancel(true); + } + + private void startPollingConnections() { + synchronized (pollingSubscriberConnections) { + for (int i = 0; i < numChannels; i++) { + pollingSubscriberConnections.add( + new PollingSubscriberConnection( + subscription, + credentials, + receiver, + ackExpirationPadding, + ackLatencyDistribution, + channelBuilder.build(), + flowController, + executor, + clock)); + } + startConnections( + pollingSubscriberConnections, + new Listener() { + @Override + public void failed(State from, Throwable failure) { + // If a connection failed is because of a fatal error, we should fail the + // whole subscriber. + stopAllPollingConnections(); + try { + notifyFailed(failure); + } catch (IllegalStateException e) { + if (isRunning()) { + throw e; + } + // It could happen that we are shutting down while some channels fail. + } + } + }); + } + } + + private void stopAllPollingConnections() { + stopConnections(pollingSubscriberConnections); + } + + private void startConnections( + List connections, final Listener connectionsListener) { + final CountDownLatch subscribersStarting = new CountDownLatch(numChannels); + for (final Service subscriber : connections) { + executor.submit( + new Runnable() { + @Override + public void run() { + subscriber.startAsync().awaitRunning(); + subscribersStarting.countDown(); + subscriber.addListener(connectionsListener, executor); + } + }); + } + try { + subscribersStarting.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void stopConnections(List connections) { + ArrayList liveConnections; + synchronized (connections) { + liveConnections = new ArrayList(connections); + connections.clear(); + } + final CountDownLatch connectionsStopping = new CountDownLatch(liveConnections.size()); + for (final Service subscriberConnection : liveConnections) { + executor.submit( + new Runnable() { + @Override + public void run() { + try { + subscriberConnection.stopAsync().awaitTerminated(); + } catch (IllegalStateException ignored) { + // It is expected for some connections to be already in state failed so stop will + // throw this expection. + } + connectionsStopping.countDown(); + } + }); + } + try { + connectionsStopping.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + + public SubscriberStats getStats() { + // TODO: Implement me + return null; + } + + public String getSubscription() { + return subscription; + } + + public Duration getAckExpirationPadding() { + return ackExpirationPadding; + } + + public FlowController.Settings getFlowControlSettings() { + return flowControlSettings; + } + /** Builder of {@link Subscriber Subscribers}. */ - public final class Builder { + public static final class Builder { private static final Duration MIN_ACK_EXPIRATION_PADDING = Duration.millis(100); private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500); @@ -207,7 +470,7 @@ Builder setClock(Clock clock) { } public Subscriber build() throws IOException { - return new SubscriberImpl(this); + return new Subscriber(this); } } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberImpl.java deleted file mode 100644 index b5b6ff5c9c39..000000000000 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberImpl.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Copyright 2016 Google Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub.spi.v1; - -import com.google.api.gax.bundling.FlowController; -import com.google.api.stats.Distribution; -import com.google.auth.Credentials; -import com.google.auth.oauth2.GoogleCredentials; -import com.google.cloud.Clock; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.AbstractService; -import com.google.common.util.concurrent.Service; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import io.grpc.netty.GrpcSslContexts; -import io.grpc.netty.NegotiationType; -import io.grpc.netty.NettyChannelBuilder; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Implementation of {@link Subscriber}. */ -class SubscriberImpl extends AbstractService implements Subscriber { - private static final int THREADS_PER_CHANNEL = 5; - @VisibleForTesting static final int CHANNELS_PER_CORE = 10; - private static final int MAX_INBOUND_MESSAGE_SIZE = - 20 * 1024 * 1024; // 20MB API maximum message size. - private static final int INITIAL_ACK_DEADLINE_SECONDS = 10; - private static final int MAX_ACK_DEADLINE_SECONDS = 600; - private static final int MIN_ACK_DEADLINE_SECONDS = 10; - private static final Duration ACK_DEADLINE_UPDATE_PERIOD = Duration.standardMinutes(1); - private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9; - - private static final Logger logger = LoggerFactory.getLogger(SubscriberImpl.class); - - private final String subscription; - private final FlowController.Settings flowControlSettings; - private final Duration ackExpirationPadding; - private final ScheduledExecutorService executor; - private final Distribution ackLatencyDistribution = - new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); - private final int numChannels; - private final FlowController flowController; - private final ManagedChannelBuilder> channelBuilder; - private final Credentials credentials; - private final MessageReceiver receiver; - private final List streamingSubscriberConnections; - private final List pollingSubscriberConnections; - private final Clock clock; - private ScheduledFuture ackDeadlineUpdater; - private int streamAckDeadlineSeconds; - - public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException { - receiver = builder.receiver; - flowControlSettings = builder.flowControlSettings; - subscription = builder.subscription; - ackExpirationPadding = builder.ackExpirationPadding; - streamAckDeadlineSeconds = - Math.max( - INITIAL_ACK_DEADLINE_SECONDS, - Ints.saturatedCast(ackExpirationPadding.getStandardSeconds())); - clock = builder.clock.isPresent() ? builder.clock.get() : Clock.defaultClock(); - - flowController = new FlowController(builder.flowControlSettings, false); - - numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE; - executor = - builder.executor.isPresent() - ? builder.executor.get() - : Executors.newScheduledThreadPool( - numChannels * THREADS_PER_CHANNEL, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("cloud-pubsub-subscriber-thread-%d") - .build()); - - channelBuilder = - builder.channelBuilder.isPresent() - ? builder.channelBuilder.get() - : NettyChannelBuilder.forAddress( - SubscriberSettings.getDefaultServiceAddress(), - SubscriberSettings.getDefaultServicePort()) - .maxMessageSize(MAX_INBOUND_MESSAGE_SIZE) - .flowControlWindow(5000000) // 2.5 MB - .negotiationType(NegotiationType.TLS) - .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) - .executor(executor); - - credentials = - builder.credentials.isPresent() - ? builder.credentials.get() - : GoogleCredentials.getApplicationDefault() - .createScoped(SubscriberSettings.getDefaultServiceScopes()); - - streamingSubscriberConnections = new ArrayList(numChannels); - pollingSubscriberConnections = new ArrayList(numChannels); - } - - @Override - protected void doStart() { - logger.debug("Starting subscriber group."); - startStreamingConnections(); - notifyStarted(); - } - - @Override - protected void doStop() { - stopAllStreamingConnections(); - stopAllPollingConnections(); - notifyStopped(); - } - - private void startStreamingConnections() { - synchronized (streamingSubscriberConnections) { - for (int i = 0; i < numChannels; i++) { - streamingSubscriberConnections.add( - new StreamingSubscriberConnection( - subscription, - credentials, - receiver, - ackExpirationPadding, - streamAckDeadlineSeconds, - ackLatencyDistribution, - channelBuilder.build(), - flowController, - executor, - clock)); - } - startConnections( - streamingSubscriberConnections, - new Listener() { - @Override - public void failed(State from, Throwable failure) { - // If a connection failed is because of a fatal error, we should fail the - // whole subscriber. - stopAllStreamingConnections(); - if (failure instanceof StatusRuntimeException - && ((StatusRuntimeException) failure).getStatus().getCode() - == Status.Code.UNIMPLEMENTED) { - logger.info("Unable to open streaming connections, falling back to polling."); - startPollingConnections(); - return; - } - notifyFailed(failure); - } - }); - } - - ackDeadlineUpdater = - executor.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - // It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API. - long ackLatency = - ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); - if (ackLatency > 0) { - int possibleStreamAckDeadlineSeconds = - Math.max( - MIN_ACK_DEADLINE_SECONDS, - Ints.saturatedCast( - Math.max(ackLatency, ackExpirationPadding.getStandardSeconds()))); - if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) { - streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds; - logger.debug( - "Updating stream deadline to {} seconds.", streamAckDeadlineSeconds); - for (StreamingSubscriberConnection subscriberConnection : - streamingSubscriberConnections) { - subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds); - } - } - } - } - }, - ACK_DEADLINE_UPDATE_PERIOD.getMillis(), - ACK_DEADLINE_UPDATE_PERIOD.getMillis(), - TimeUnit.MILLISECONDS); - } - - private void stopAllStreamingConnections() { - stopConnections(streamingSubscriberConnections); - ackDeadlineUpdater.cancel(true); - } - - private void startPollingConnections() { - synchronized (pollingSubscriberConnections) { - for (int i = 0; i < numChannels; i++) { - pollingSubscriberConnections.add( - new PollingSubscriberConnection( - subscription, - credentials, - receiver, - ackExpirationPadding, - ackLatencyDistribution, - channelBuilder.build(), - flowController, - executor, - clock)); - } - startConnections( - pollingSubscriberConnections, - new Listener() { - @Override - public void failed(State from, Throwable failure) { - // If a connection failed is because of a fatal error, we should fail the - // whole subscriber. - stopAllPollingConnections(); - try { - notifyFailed(failure); - } catch (IllegalStateException e) { - if (isRunning()) { - throw e; - } - // It could happen that we are shutting down while some channels fail. - } - } - }); - } - } - - private void stopAllPollingConnections() { - stopConnections(pollingSubscriberConnections); - } - - private void startConnections( - List connections, final Listener connectionsListener) { - final CountDownLatch subscribersStarting = new CountDownLatch(numChannels); - for (final Service subscriber : connections) { - executor.submit( - new Runnable() { - @Override - public void run() { - subscriber.startAsync().awaitRunning(); - subscribersStarting.countDown(); - subscriber.addListener(connectionsListener, executor); - } - }); - } - try { - subscribersStarting.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - private void stopConnections(List connections) { - ArrayList liveConnections; - synchronized (connections) { - liveConnections = new ArrayList(connections); - connections.clear(); - } - final CountDownLatch connectionsStopping = new CountDownLatch(liveConnections.size()); - for (final Service subscriberConnection : liveConnections) { - executor.submit( - new Runnable() { - @Override - public void run() { - try { - subscriberConnection.stopAsync().awaitTerminated(); - } catch (IllegalStateException ignored) { - // It is expected for some connections to be already in state failed so stop will - // throw this expection. - } - connectionsStopping.countDown(); - } - }); - } - try { - connectionsStopping.await(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - } - - @Override - public SubscriberStats getStats() { - // TODO: Implement me - return null; - } - - @Override - public String getSubscription() { - return subscription; - } - - @Override - public Duration getAckExpirationPadding() { - return ackExpirationPadding; - } - - @Override - public Optional getMaxOutstandingElementCount() { - return flowControlSettings.getMaxOutstandingElementCount(); - } - - @Override - public Optional getMaxOutstandingRequestBytes() { - return flowControlSettings.getMaxOutstandingRequestBytes(); - } -} diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java index 8aca5e8d96bd..da574c2973ad 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java @@ -370,7 +370,7 @@ public void testOpenedChannels() throws Exception { } final int expectedChannelCount = - Runtime.getRuntime().availableProcessors() * SubscriberImpl.CHANNELS_PER_CORE; + Runtime.getRuntime().availableProcessors() * Subscriber.CHANNELS_PER_CORE; Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver)); @@ -388,7 +388,7 @@ public void testFailedChannel_recoverableError_channelReopened() throws Exceptio } final int expectedChannelCount = - Runtime.getRuntime().availableProcessors() * SubscriberImpl.CHANNELS_PER_CORE; + Runtime.getRuntime().availableProcessors() * Subscriber.CHANNELS_PER_CORE; Subscriber subscriber = startSubscriber( From c4e11e37909c3dc302d040458c9f7057925213c3 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Tue, 17 Jan 2017 09:51:15 +1100 Subject: [PATCH 2/2] reduce visiblity of Publisher constructor --- .../main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index dbda4444c9bf..6bce5f306802 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -133,7 +133,7 @@ public class Subscriber extends AbstractService { private ScheduledFuture ackDeadlineUpdater; private int streamAckDeadlineSeconds; - public Subscriber(Builder builder) throws IOException { + private Subscriber(Builder builder) throws IOException { receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; subscription = builder.subscription;