diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 74fdb6211abe..a5295ef74adc 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -39,12 +39,9 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import com.google.pubsub.v1.GetSubscriptionRequest; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.SubscriberGrpc; -import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub; import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub; -import com.google.pubsub.v1.Subscription; import io.grpc.CallCredentials; import io.grpc.Channel; import io.grpc.auth.MoreCallCredentials; @@ -56,7 +53,6 @@ import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -301,7 +297,7 @@ protected void doStart() { @Override public void run() { try { - startStreamingConnections(); + startStreamingConnections(); notifyStarted(); } catch (Throwable t) { notifyFailed(t); @@ -313,16 +309,23 @@ public void run() { @Override protected void doStop() { - // stop connection is no-op if connections haven't been started. - stopAllStreamingConnections(); - try { - for (AutoCloseable closeable : closeables) { - closeable.close(); - } - notifyStopped(); - } catch (Exception e) { - notifyFailed(e); - } + new Thread( + new Runnable() { + @Override + public void run() { + try { + // stop connection is no-op if connections haven't been started. + stopAllStreamingConnections(); + for (AutoCloseable closeable : closeables) { + closeable.close(); + } + notifyStopped(); + } catch (Exception e) { + notifyFailed(e); + } + } + }) + .start(); } private void startStreamingConnections() throws IOException {