diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index ad622833c4b6..0c56ed7dd165 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -376,8 +376,7 @@ public void run() { /** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */ @InternalApi int computeDeadlineSeconds() { - long secLong = ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); - int sec = Ints.saturatedCast(secLong); + int sec = ackLatencyDistribution.getPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); // Use Ints.constrainToRange when we get guava 21. if (sec < Subscriber.MIN_ACK_DEADLINE_SECONDS) { diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index ea2475cb3fdf..d5d1b4acf119 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -16,6 +16,8 @@ package com.google.cloud.pubsub.v1; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; @@ -357,7 +359,8 @@ public void onFailure(Throwable t) { messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); } } - }); + }, + directExecutor()); } private static final class OutstandingBatch { diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 0f273c3429a8..ef3ce23d679f 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -16,6 +16,8 @@ package com.google.cloud.pubsub.v1; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + import com.google.api.core.AbstractApiService; import com.google.api.core.ApiClock; import com.google.api.core.ApiFuture; @@ -291,7 +293,7 @@ public void onFailure(Throwable t) { .addAllAckIds(idChunk) .setAckDeadlineSeconds(modack.deadlineExtensionSeconds) .build()); - ApiFutures.addCallback(future, loggingCallback); + ApiFutures.addCallback(future, loggingCallback, directExecutor()); } } @@ -303,7 +305,7 @@ public void onFailure(Throwable t) { .setSubscription(subscription) .addAllAckIds(idChunk) .build()); - ApiFutures.addCallback(future, loggingCallback); + ApiFutures.addCallback(future, loggingCallback, directExecutor()); } } } diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index e1094ec93cd2..f3b5dfe6e704 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -42,8 +42,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import junit.framework.Assert; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test;