From e58014aa78c577767914b2208cabf408791561d5 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Fri, 9 Mar 2018 14:40:13 +1100 Subject: [PATCH 1/2] pubsub: make Publisher/Subscriber accept plain strings --- .../com/google/cloud/pubsub/v1/Publisher.java | 41 +++++++++++++++---- .../google/cloud/pubsub/v1/Subscriber.java | 31 +++++++++----- .../cloud/pubsub/v1/PublisherImplTest.java | 2 +- 3 files changed, 55 insertions(+), 19 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 4db17723937e..b00ae8a4c829 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -41,6 +41,8 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.TopicName; +import com.google.pubsub.v1.TopicNames; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; @@ -90,8 +92,7 @@ public class Publisher { private static final Logger logger = Logger.getLogger(Publisher.class.getName()); - private final ProjectTopicName topicName; - private final String cachedTopicNameString; + private final String topicName; private final BatchingSettings batchingSettings; private final RetrySettings retrySettings; @@ -124,7 +125,6 @@ public static long getApiMaxRequestBytes() { private Publisher(Builder builder) throws IOException { topicName = builder.topicName; - cachedTopicNameString = topicName.toString(); this.batchingSettings = builder.batchingSettings; this.retrySettings = builder.retrySettings; @@ -167,7 +167,12 @@ private Publisher(Builder builder) throws IOException { } /** Topic which the publisher publishes to. */ - public ProjectTopicName getTopicName() { + public TopicName getTopicName() { + return TopicNames.parse(topicName); + } + +/** Topic which the publisher publishes to. */ + public String getTopicNameString() { return topicName; } @@ -312,7 +317,7 @@ private void publishAllOutstanding() { private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { PublishRequest.Builder publishRequest = PublishRequest.newBuilder(); - publishRequest.setTopic(cachedTopicNameString); + publishRequest.setTopic(topicName); for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) { publishRequest.addMessages(outstandingPublish.message); } @@ -511,7 +516,27 @@ interface LongRandom { * } * */ - public static Builder newBuilder(ProjectTopicName topicName) { + public static Builder newBuilder(TopicName topicName) { + return newBuilder(topicName.toString()); + } + + /** + * Constructs a new {@link Builder} using the given topic. + * + *

Example of creating a {@code Publisher}. + *

{@code
+   * String topic = "projects/my_project/topics/my_topic";
+   * Publisher publisher = Publisher.newBuilder(topic).build();
+   * try {
+   *   // ...
+   * } finally {
+   *   // When finished with the publisher, make sure to shutdown to free up resources.
+   *   publisher.shutdown();
+   * }
+   * }
+ * + */ + public static Builder newBuilder(String topicName) { return new Builder(topicName); } @@ -556,7 +581,7 @@ public long nextLong(long least, long bound) { .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors()) .build(); - ProjectTopicName topicName; + String topicName; // Batching options BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS; @@ -574,7 +599,7 @@ public long nextLong(long least, long bound) { CredentialsProvider credentialsProvider = TopicAdminSettings.defaultCredentialsProviderBuilder().build(); - private Builder(ProjectTopicName topic) { + private Builder(String topic) { this.topicName = Preconditions.checkNotNull(topic); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index bd1d87c910c2..14469779b411 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -107,8 +107,7 @@ public class Subscriber extends AbstractApiService { private static final Logger logger = Logger.getLogger(Subscriber.class.getName()); - private final ProjectSubscriptionName subscriptionName; - private final String cachedSubscriptionNameString; + private final String subscriptionName; private final FlowControlSettings flowControlSettings; private final Duration ackExpirationPadding; private final Duration maxAckExtensionPeriod; @@ -135,7 +134,6 @@ private Subscriber(Builder builder) { receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; subscriptionName = builder.subscriptionName; - cachedSubscriptionNameString = subscriptionName.toString(); Preconditions.checkArgument( builder.ackExpirationPadding.compareTo(Duration.ZERO) > 0, "padding must be positive"); @@ -204,19 +202,32 @@ public void close() throws IOException { /** * Constructs a new {@link Builder}. * - *

Once {@link Builder#build} is called a gRPC stub will be created for use of the {@link - * Subscriber}. - * * @param subscription Cloud Pub/Sub subscription to bind the subscriber to * @param receiver an implementation of {@link MessageReceiver} used to process the received * messages */ public static Builder newBuilder(ProjectSubscriptionName subscription, MessageReceiver receiver) { + return newBuilder(subscription.toString(), receiver); + } + + /** + * Constructs a new {@link Builder}. + * + * @param subscription Cloud Pub/Sub subscription to bind the subscriber to + * @param receiver an implementation of {@link MessageReceiver} used to process the received + * messages + */ + public static Builder newBuilder(String subscription, MessageReceiver receiver) { return new Builder(subscription, receiver); } /** Subscription which the subscriber is subscribed to. */ public ProjectSubscriptionName getSubscriptionName() { + return ProjectSubscriptionName.parse(subscriptionName); + } + + /** Subscription which the subscriber is subscribed to. */ + public String getSubscriptionNameString() { return subscriptionName; } @@ -344,7 +355,7 @@ private void startPollingConnections() throws IOException { Subscription subscriptionInfo = getSubStub.getSubscription( GetSubscriptionRequest.newBuilder() - .setSubscription(cachedSubscriptionNameString) + .setSubscription(subscriptionName) .build()); for (Channel channel : channels) { @@ -401,7 +412,7 @@ private void startStreamingConnections() throws IOException { } streamingSubscriberConnections.add( new StreamingSubscriberConnection( - cachedSubscriptionNameString, + subscriptionName, receiver, ackExpirationPadding, maxAckExtensionPeriod, @@ -491,7 +502,7 @@ public static final class Builder { * Runtime.getRuntime().availableProcessors()) .build(); - ProjectSubscriptionName subscriptionName; + String subscriptionName; MessageReceiver receiver; Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING; @@ -519,7 +530,7 @@ public static final class Builder { boolean useStreaming = true; int parallelPullCount = Runtime.getRuntime().availableProcessors() * CHANNELS_PER_CORE; - Builder(ProjectSubscriptionName subscriptionName, MessageReceiver receiver) { + Builder(String subscriptionName, MessageReceiver receiver) { this.subscriptionName = subscriptionName; this.receiver = receiver; } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 1495c5f32071..7e1a1d46bd7c 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -472,7 +472,7 @@ public void testPublisherGetters() throws Exception { @Test public void testBuilderParametersAndDefaults() { Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); - assertEquals(TEST_TOPIC, builder.topicName); + assertEquals(TEST_TOPIC.toString(), builder.topicName); assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider); assertEquals( Publisher.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD, From f5d383776c0a33c13d15064aac6e4d445a4cc04e Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 12 Mar 2018 11:29:43 +1100 Subject: [PATCH 2/2] pr comment --- .../com/google/cloud/pubsub/v1/Publisher.java | 16 +++++++------- .../google/cloud/pubsub/v1/Subscriber.java | 4 +--- .../cloud/pubsub/v1/PublisherImplTest.java | 21 +++++++++---------- 3 files changed, 18 insertions(+), 23 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index b00ae8a4c829..e09cacbb40cc 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -40,21 +40,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.pubsub.v1.ProjectTopicName; -import com.google.pubsub.v1.TopicName; -import com.google.pubsub.v1.TopicNames; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub; import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; +import com.google.pubsub.v1.TopicNames; import io.grpc.CallCredentials; import io.grpc.Channel; import io.grpc.Status; import io.grpc.auth.MoreCallCredentials; -import org.threeten.bp.Duration; - -import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -70,6 +66,8 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; +import org.threeten.bp.Duration; /** * A Cloud Pub/Sub publisher, that is @@ -171,7 +169,7 @@ public TopicName getTopicName() { return TopicNames.parse(topicName); } -/** Topic which the publisher publishes to. */ + /** Topic which the publisher publishes to. */ public String getTopicNameString() { return topicName; } @@ -502,6 +500,7 @@ interface LongRandom { * Constructs a new {@link Builder} using the given topic. * *

Example of creating a {@code Publisher}. + * *

{@code
    * String projectName = "my_project";
    * String topicName = "my_topic";
@@ -514,7 +513,6 @@ interface LongRandom {
    *   publisher.shutdown();
    * }
    * }
- * */ public static Builder newBuilder(TopicName topicName) { return newBuilder(topicName.toString()); @@ -524,6 +522,7 @@ public static Builder newBuilder(TopicName topicName) { * Constructs a new {@link Builder} using the given topic. * *

Example of creating a {@code Publisher}. + * *

{@code
    * String topic = "projects/my_project/topics/my_topic";
    * Publisher publisher = Publisher.newBuilder(topic).build();
@@ -534,7 +533,6 @@ public static Builder newBuilder(TopicName topicName) {
    *   publisher.shutdown();
    * }
    * }
- * */ public static Builder newBuilder(String topicName) { return new Builder(topicName); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 14469779b411..610885273058 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -354,9 +354,7 @@ private void startPollingConnections() throws IOException { } Subscription subscriptionInfo = getSubStub.getSubscription( - GetSubscriptionRequest.newBuilder() - .setSubscription(subscriptionName) - .build()); + GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName).build()); for (Channel channel : channels) { SubscriberFutureStub stub = SubscriberGrpc.newFutureStub(channel); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 7e1a1d46bd7c..79e0015275f7 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -16,6 +16,12 @@ package com.google.cloud.pubsub.v1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import com.google.api.core.ApiFuture; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.ExecutorProvider; @@ -35,6 +41,10 @@ import io.grpc.StatusException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,17 +52,6 @@ import org.junit.runners.JUnit4; import org.threeten.bp.Duration; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - @RunWith(JUnit4.class) public class PublisherImplTest {