Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,10 @@ public class Subscriber extends AbstractApiService {
private final List<Channel> channels;
private final MessageReceiver receiver;
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
private final List<PollingSubscriberConnection> pollingSubscriberConnections;
private final Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches =
new LinkedList<>();
private final ApiClock clock;
private final List<AutoCloseable> closeables = new ArrayList<>();
private final boolean useStreaming;
private ScheduledFuture<?> ackDeadlineUpdater;

private Subscriber(Builder builder) {
Expand Down Expand Up @@ -195,8 +193,6 @@ public void close() throws IOException {
numChannels = builder.parallelPullCount;
channels = new ArrayList<>(numChannels);
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels);
useStreaming = builder.useStreaming;
}

/**
Expand Down Expand Up @@ -310,11 +306,7 @@ protected void doStart() {
@Override
public void run() {
try {
if (useStreaming) {
startStreamingConnections();
} else {
startPollingConnections();
}
notifyStarted();
} catch (Throwable t) {
notifyFailed(t);
Expand All @@ -327,7 +319,6 @@ public void run() {
@Override
protected void doStop() {
// stop connection is no-op if connections haven't been started.
stopAllPollingConnections();
stopAllStreamingConnections();
try {
for (AutoCloseable closeable : closeables) {
Expand All @@ -339,64 +330,6 @@ protected void doStop() {
}
}

private void startPollingConnections() throws IOException {
synchronized (pollingSubscriberConnections) {
Credentials credentials = credentialsProvider.getCredentials();
CallCredentials callCredentials =
credentials == null ? null : MoreCallCredentials.from(credentials);

SubscriberGrpc.SubscriberBlockingStub getSubStub =
SubscriberGrpc.newBlockingStub(channels.get(0))
.withDeadlineAfter(
PollingSubscriberConnection.DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
if (callCredentials != null) {
getSubStub = getSubStub.withCallCredentials(callCredentials);
}
Subscription subscriptionInfo =
getSubStub.getSubscription(
GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName).build());

for (Channel channel : channels) {
SubscriberFutureStub stub = SubscriberGrpc.newFutureStub(channel);
if (callCredentials != null) {
stub = stub.withCallCredentials(callCredentials);
}
pollingSubscriberConnections.add(
new PollingSubscriberConnection(
subscriptionInfo,
receiver,
ackExpirationPadding,
maxAckExtensionPeriod,
ackLatencyDistribution,
stub,
flowController,
flowControlSettings.getMaxOutstandingElementCount(),
outstandingMessageBatches,
executor,
alarmsExecutor,
clock));
}
startConnections(
pollingSubscriberConnections,
new Listener() {
@Override
public void failed(State from, Throwable failure) {
// If a connection failed is because of a fatal error, we should fail the
// whole subscriber.
stopAllPollingConnections();
try {
notifyFailed(failure);
} catch (IllegalStateException e) {
if (isRunning()) {
throw e;
}
// It could happen that we are shutting down while some channels fail.
}
}
});
}
}

private void startStreamingConnections() throws IOException {
synchronized (streamingSubscriberConnections) {
Credentials credentials = credentialsProvider.getCredentials();
Expand Down Expand Up @@ -443,10 +376,6 @@ public void failed(State from, Throwable failure) {
}
}

private void stopAllPollingConnections() {
stopConnections(pollingSubscriberConnections);
}

private void stopAllStreamingConnections() {
stopConnections(streamingSubscriberConnections);
if (ackDeadlineUpdater != null) {
Expand Down Expand Up @@ -525,7 +454,6 @@ public static final class Builder {
CredentialsProvider credentialsProvider =
SubscriptionAdminSettings.defaultCredentialsProviderBuilder().build();
Optional<ApiClock> clock = Optional.absent();
boolean useStreaming = true;
int parallelPullCount = Runtime.getRuntime().availableProcessors() * CHANNELS_PER_CORE;

Builder(String subscriptionName, MessageReceiver receiver) {
Expand Down Expand Up @@ -630,7 +558,7 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
}

/**
* Gives the ability to set a custom executor for polling and managing lease extensions. If none
* Gives the ability to set a custom executor for managing lease extensions. If none
* is provided a shared one will be used by all {@link Subscriber} instances.
*/
public Builder setSystemExecutorProvider(ExecutorProvider executorProvider) {
Expand All @@ -653,11 +581,6 @@ Builder setClock(ApiClock clock) {
return this;
}

Builder setUseStreaming(boolean useStreaming) {
this.useStreaming = useStreaming;
return this;
}

public Subscriber build() {
return new Subscriber(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void testFailedChannel_fatalError_subscriberFails() throws Exception {
}

private Subscriber startSubscriber(Builder testSubscriberBuilder) throws Exception {
Subscriber subscriber = testSubscriberBuilder.setUseStreaming(true).build();
Subscriber subscriber = testSubscriberBuilder.build();
subscriber.startAsync().awaitRunning();
return subscriber;
}
Expand Down