From 8c67b98e65ce58972b1ef4a89cacad5917cea945 Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Mon, 15 Apr 2019 10:36:56 -0400 Subject: [PATCH 1/2] Cleaning up deprecated pubsub code. --- .../java/com/google/cloud/pubsub/v1/MessageDispatcher.java | 3 +-- .../main/java/com/google/cloud/pubsub/v1/Publisher.java | 7 ++++--- .../cloud/pubsub/v1/StreamingSubscriberConnection.java | 6 ++++-- .../test/java/com/google/cloud/pubsub/it/ITPubSubTest.java | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 3400c259e950..956e87db0eef 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -380,8 +380,7 @@ public void run() { /** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */ @InternalApi int computeDeadlineSeconds() { - long secLong = ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); - int sec = Ints.saturatedCast(secLong); + int sec = ackLatencyDistribution.getPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); // Use Ints.constrainToRange when we get guava 21. if (sec < Subscriber.MIN_ACK_DEADLINE_SECONDS) { diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index ea2475cb3fdf..9c0130569cc7 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -59,6 +59,8 @@ import java.util.logging.Logger; import org.threeten.bp.Duration; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + /** * A Cloud Pub/Sub publisher, that is * associated with a specific topic at creation. @@ -317,8 +319,7 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { publishRequest.addMessages(outstandingPublish.message); } - ApiFutures.addCallback( - publisherStub.publishCallable().futureCall(publishRequest.build()), + ApiFutures.addCallback(publisherStub.publishCallable().futureCall(publishRequest.build()), new ApiFutureCallback() { @Override public void onSuccess(PublishResponse result) { @@ -357,7 +358,7 @@ public void onFailure(Throwable t) { messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); } } - }); + }, directExecutor()); } private static final class OutstandingBatch { diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 0f273c3429a8..7fc8811e1194 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -53,6 +53,8 @@ import javax.annotation.Nullable; import org.threeten.bp.Duration; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + /** Implementation of {@link AckProcessor} based on Cloud Pub/Sub streaming pull. */ final class StreamingSubscriberConnection extends AbstractApiService implements AckProcessor { private static final Logger logger = @@ -291,7 +293,7 @@ public void onFailure(Throwable t) { .addAllAckIds(idChunk) .setAckDeadlineSeconds(modack.deadlineExtensionSeconds) .build()); - ApiFutures.addCallback(future, loggingCallback); + ApiFutures.addCallback(future, loggingCallback, directExecutor()); } } @@ -303,7 +305,7 @@ public void onFailure(Throwable t) { .setSubscription(subscription) .addAllAckIds(idChunk) .build()); - ApiFutures.addCallback(future, loggingCallback); + ApiFutures.addCallback(future, loggingCallback, directExecutor()); } } } diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index e1094ec93cd2..f3b5dfe6e704 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -42,8 +42,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import junit.framework.Assert; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; From d367c8fbaca5cf09a7c73ca9983c06887d2b1eec Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Mon, 15 Apr 2019 11:05:57 -0400 Subject: [PATCH 2/2] Fixing format --- .../java/com/google/cloud/pubsub/v1/Publisher.java | 10 ++++++---- .../cloud/pubsub/v1/StreamingSubscriberConnection.java | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 9c0130569cc7..d5d1b4acf119 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -16,6 +16,8 @@ package com.google.cloud.pubsub.v1; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; @@ -59,8 +61,6 @@ import java.util.logging.Logger; import org.threeten.bp.Duration; -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; - /** * A Cloud Pub/Sub publisher, that is * associated with a specific topic at creation. @@ -319,7 +319,8 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { publishRequest.addMessages(outstandingPublish.message); } - ApiFutures.addCallback(publisherStub.publishCallable().futureCall(publishRequest.build()), + ApiFutures.addCallback( + publisherStub.publishCallable().futureCall(publishRequest.build()), new ApiFutureCallback() { @Override public void onSuccess(PublishResponse result) { @@ -358,7 +359,8 @@ public void onFailure(Throwable t) { messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); } } - }, directExecutor()); + }, + directExecutor()); } private static final class OutstandingBatch { diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 7fc8811e1194..ef3ce23d679f 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -16,6 +16,8 @@ package com.google.cloud.pubsub.v1; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + import com.google.api.core.AbstractApiService; import com.google.api.core.ApiClock; import com.google.api.core.ApiFuture; @@ -53,8 +55,6 @@ import javax.annotation.Nullable; import org.threeten.bp.Duration; -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; - /** Implementation of {@link AckProcessor} based on Cloud Pub/Sub streaming pull. */ final class StreamingSubscriberConnection extends AbstractApiService implements AckProcessor { private static final Logger logger =