diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 4db17723937e..e09cacbb40cc 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -40,19 +40,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub; import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; +import com.google.pubsub.v1.TopicNames; import io.grpc.CallCredentials; import io.grpc.Channel; import io.grpc.Status; import io.grpc.auth.MoreCallCredentials; -import org.threeten.bp.Duration; - -import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -68,6 +66,8 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; +import org.threeten.bp.Duration; /** * A Cloud Pub/Sub publisher, that is @@ -90,8 +90,7 @@ public class Publisher { private static final Logger logger = Logger.getLogger(Publisher.class.getName()); - private final ProjectTopicName topicName; - private final String cachedTopicNameString; + private final String topicName; private final BatchingSettings batchingSettings; private final RetrySettings retrySettings; @@ -124,7 +123,6 @@ public static long getApiMaxRequestBytes() { private Publisher(Builder builder) throws IOException { topicName = builder.topicName; - cachedTopicNameString = topicName.toString(); this.batchingSettings = builder.batchingSettings; this.retrySettings = builder.retrySettings; @@ -167,7 +165,12 @@ private Publisher(Builder builder) throws IOException { } /** Topic which the publisher publishes to. */ - public ProjectTopicName getTopicName() { + public TopicName getTopicName() { + return TopicNames.parse(topicName); + } + + /** Topic which the publisher publishes to. */ + public String getTopicNameString() { return topicName; } @@ -312,7 +315,7 @@ private void publishAllOutstanding() { private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { PublishRequest.Builder publishRequest = PublishRequest.newBuilder(); - publishRequest.setTopic(cachedTopicNameString); + publishRequest.setTopic(topicName); for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) { publishRequest.addMessages(outstandingPublish.message); } @@ -497,6 +500,7 @@ interface LongRandom { * Constructs a new {@link Builder} using the given topic. * *
Example of creating a {@code Publisher}. + * *
{@code
* String projectName = "my_project";
* String topicName = "my_topic";
@@ -509,9 +513,28 @@ interface LongRandom {
* publisher.shutdown();
* }
* }
+ */
+ public static Builder newBuilder(TopicName topicName) {
+ return newBuilder(topicName.toString());
+ }
+
+ /**
+ * Constructs a new {@link Builder} using the given topic.
+ *
+ * Example of creating a {@code Publisher}. * + *
{@code
+ * String topic = "projects/my_project/topics/my_topic";
+ * Publisher publisher = Publisher.newBuilder(topic).build();
+ * try {
+ * // ...
+ * } finally {
+ * // When finished with the publisher, make sure to shutdown to free up resources.
+ * publisher.shutdown();
+ * }
+ * }
*/
- public static Builder newBuilder(ProjectTopicName topicName) {
+ public static Builder newBuilder(String topicName) {
return new Builder(topicName);
}
@@ -556,7 +579,7 @@ public long nextLong(long least, long bound) {
.setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
.build();
- ProjectTopicName topicName;
+ String topicName;
// Batching options
BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS;
@@ -574,7 +597,7 @@ public long nextLong(long least, long bound) {
CredentialsProvider credentialsProvider =
TopicAdminSettings.defaultCredentialsProviderBuilder().build();
- private Builder(ProjectTopicName topic) {
+ private Builder(String topic) {
this.topicName = Preconditions.checkNotNull(topic);
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
index bd1d87c910c2..610885273058 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
@@ -107,8 +107,7 @@ public class Subscriber extends AbstractApiService {
private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
- private final ProjectSubscriptionName subscriptionName;
- private final String cachedSubscriptionNameString;
+ private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final Duration ackExpirationPadding;
private final Duration maxAckExtensionPeriod;
@@ -135,7 +134,6 @@ private Subscriber(Builder builder) {
receiver = builder.receiver;
flowControlSettings = builder.flowControlSettings;
subscriptionName = builder.subscriptionName;
- cachedSubscriptionNameString = subscriptionName.toString();
Preconditions.checkArgument(
builder.ackExpirationPadding.compareTo(Duration.ZERO) > 0, "padding must be positive");
@@ -204,19 +202,32 @@ public void close() throws IOException {
/**
* Constructs a new {@link Builder}.
*
- * Once {@link Builder#build} is called a gRPC stub will be created for use of the {@link - * Subscriber}. - * * @param subscription Cloud Pub/Sub subscription to bind the subscriber to * @param receiver an implementation of {@link MessageReceiver} used to process the received * messages */ public static Builder newBuilder(ProjectSubscriptionName subscription, MessageReceiver receiver) { + return newBuilder(subscription.toString(), receiver); + } + + /** + * Constructs a new {@link Builder}. + * + * @param subscription Cloud Pub/Sub subscription to bind the subscriber to + * @param receiver an implementation of {@link MessageReceiver} used to process the received + * messages + */ + public static Builder newBuilder(String subscription, MessageReceiver receiver) { return new Builder(subscription, receiver); } /** Subscription which the subscriber is subscribed to. */ public ProjectSubscriptionName getSubscriptionName() { + return ProjectSubscriptionName.parse(subscriptionName); + } + + /** Subscription which the subscriber is subscribed to. */ + public String getSubscriptionNameString() { return subscriptionName; } @@ -343,9 +354,7 @@ private void startPollingConnections() throws IOException { } Subscription subscriptionInfo = getSubStub.getSubscription( - GetSubscriptionRequest.newBuilder() - .setSubscription(cachedSubscriptionNameString) - .build()); + GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName).build()); for (Channel channel : channels) { SubscriberFutureStub stub = SubscriberGrpc.newFutureStub(channel); @@ -401,7 +410,7 @@ private void startStreamingConnections() throws IOException { } streamingSubscriberConnections.add( new StreamingSubscriberConnection( - cachedSubscriptionNameString, + subscriptionName, receiver, ackExpirationPadding, maxAckExtensionPeriod, @@ -491,7 +500,7 @@ public static final class Builder { * Runtime.getRuntime().availableProcessors()) .build(); - ProjectSubscriptionName subscriptionName; + String subscriptionName; MessageReceiver receiver; Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING; @@ -519,7 +528,7 @@ public static final class Builder { boolean useStreaming = true; int parallelPullCount = Runtime.getRuntime().availableProcessors() * CHANNELS_PER_CORE; - Builder(ProjectSubscriptionName subscriptionName, MessageReceiver receiver) { + Builder(String subscriptionName, MessageReceiver receiver) { this.subscriptionName = subscriptionName; this.receiver = receiver; } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 1495c5f32071..79e0015275f7 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -16,6 +16,12 @@ package com.google.cloud.pubsub.v1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import com.google.api.core.ApiFuture; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.ExecutorProvider; @@ -35,6 +41,10 @@ import io.grpc.StatusException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,17 +52,6 @@ import org.junit.runners.JUnit4; import org.threeten.bp.Duration; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - @RunWith(JUnit4.class) public class PublisherImplTest { @@ -472,7 +471,7 @@ public void testPublisherGetters() throws Exception { @Test public void testBuilderParametersAndDefaults() { Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC); - assertEquals(TEST_TOPIC, builder.topicName); + assertEquals(TEST_TOPIC.toString(), builder.topicName); assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider); assertEquals( Publisher.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD,