Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,17 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.TopicNames;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import org.threeten.bp.Duration;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
Expand All @@ -68,6 +66,8 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* A Cloud Pub/Sub <a href="https://cloud.google.com/pubsub/docs/publisher">publisher</a>, that is
Expand All @@ -90,8 +90,7 @@
public class Publisher {
private static final Logger logger = Logger.getLogger(Publisher.class.getName());

private final ProjectTopicName topicName;
private final String cachedTopicNameString;
private final String topicName;

private final BatchingSettings batchingSettings;
private final RetrySettings retrySettings;
Expand Down Expand Up @@ -124,7 +123,6 @@ public static long getApiMaxRequestBytes() {

private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;
cachedTopicNameString = topicName.toString();

this.batchingSettings = builder.batchingSettings;
this.retrySettings = builder.retrySettings;
Expand Down Expand Up @@ -167,7 +165,12 @@ private Publisher(Builder builder) throws IOException {
}

/** Topic which the publisher publishes to. */
public ProjectTopicName getTopicName() {
public TopicName getTopicName() {
return TopicNames.parse(topicName);
}

/** Topic which the publisher publishes to. */
public String getTopicNameString() {
return topicName;
}

Expand Down Expand Up @@ -312,7 +315,7 @@ private void publishAllOutstanding() {

private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
PublishRequest.Builder publishRequest = PublishRequest.newBuilder();
publishRequest.setTopic(cachedTopicNameString);
publishRequest.setTopic(topicName);
for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) {
publishRequest.addMessages(outstandingPublish.message);
}
Expand Down Expand Up @@ -497,6 +500,7 @@ interface LongRandom {
* Constructs a new {@link Builder} using the given topic.
*
* <p>Example of creating a {@code Publisher}.
*
* <pre>{@code
* String projectName = "my_project";
* String topicName = "my_topic";
Expand All @@ -509,9 +513,28 @@ interface LongRandom {
* publisher.shutdown();
* }
* }</pre>
*/
public static Builder newBuilder(TopicName topicName) {
return newBuilder(topicName.toString());
}

/**
* Constructs a new {@link Builder} using the given topic.
*
* <p>Example of creating a {@code Publisher}.
*
* <pre>{@code
* String topic = "projects/my_project/topics/my_topic";
* Publisher publisher = Publisher.newBuilder(topic).build();
* try {
* // ...
* } finally {
* // When finished with the publisher, make sure to shutdown to free up resources.
* publisher.shutdown();
* }
* }</pre>
*/
public static Builder newBuilder(ProjectTopicName topicName) {
public static Builder newBuilder(String topicName) {
return new Builder(topicName);
}

Expand Down Expand Up @@ -556,7 +579,7 @@ public long nextLong(long least, long bound) {
.setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
.build();

ProjectTopicName topicName;
String topicName;

// Batching options
BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS;
Expand All @@ -574,7 +597,7 @@ public long nextLong(long least, long bound) {
CredentialsProvider credentialsProvider =
TopicAdminSettings.defaultCredentialsProviderBuilder().build();

private Builder(ProjectTopicName topic) {
private Builder(String topic) {
this.topicName = Preconditions.checkNotNull(topic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ public class Subscriber extends AbstractApiService {

private static final Logger logger = Logger.getLogger(Subscriber.class.getName());

private final ProjectSubscriptionName subscriptionName;
private final String cachedSubscriptionNameString;
private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final Duration ackExpirationPadding;
private final Duration maxAckExtensionPeriod;
Expand All @@ -135,7 +134,6 @@ private Subscriber(Builder builder) {
receiver = builder.receiver;
flowControlSettings = builder.flowControlSettings;
subscriptionName = builder.subscriptionName;
cachedSubscriptionNameString = subscriptionName.toString();

Preconditions.checkArgument(
builder.ackExpirationPadding.compareTo(Duration.ZERO) > 0, "padding must be positive");
Expand Down Expand Up @@ -204,19 +202,32 @@ public void close() throws IOException {
/**
* Constructs a new {@link Builder}.
*
* <p>Once {@link Builder#build} is called a gRPC stub will be created for use of the {@link
* Subscriber}.
*
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
* @param receiver an implementation of {@link MessageReceiver} used to process the received
* messages
*/
public static Builder newBuilder(ProjectSubscriptionName subscription, MessageReceiver receiver) {
return newBuilder(subscription.toString(), receiver);
}

/**
* Constructs a new {@link Builder}.
*
* @param subscription Cloud Pub/Sub subscription to bind the subscriber to
* @param receiver an implementation of {@link MessageReceiver} used to process the received
* messages
*/
public static Builder newBuilder(String subscription, MessageReceiver receiver) {
return new Builder(subscription, receiver);
}

/** Subscription which the subscriber is subscribed to. */
public ProjectSubscriptionName getSubscriptionName() {
return ProjectSubscriptionName.parse(subscriptionName);

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

}

/** Subscription which the subscriber is subscribed to. */
public String getSubscriptionNameString() {
return subscriptionName;
}

Expand Down Expand Up @@ -343,9 +354,7 @@ private void startPollingConnections() throws IOException {
}
Subscription subscriptionInfo =
getSubStub.getSubscription(
GetSubscriptionRequest.newBuilder()
.setSubscription(cachedSubscriptionNameString)
.build());
GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName).build());

for (Channel channel : channels) {
SubscriberFutureStub stub = SubscriberGrpc.newFutureStub(channel);
Expand Down Expand Up @@ -401,7 +410,7 @@ private void startStreamingConnections() throws IOException {
}
streamingSubscriberConnections.add(
new StreamingSubscriberConnection(
cachedSubscriptionNameString,
subscriptionName,
receiver,
ackExpirationPadding,
maxAckExtensionPeriod,
Expand Down Expand Up @@ -491,7 +500,7 @@ public static final class Builder {
* Runtime.getRuntime().availableProcessors())
.build();

ProjectSubscriptionName subscriptionName;
String subscriptionName;
MessageReceiver receiver;

Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING;
Expand Down Expand Up @@ -519,7 +528,7 @@ public static final class Builder {
boolean useStreaming = true;
int parallelPullCount = Runtime.getRuntime().availableProcessors() * CHANNELS_PER_CORE;

Builder(ProjectSubscriptionName subscriptionName, MessageReceiver receiver) {
Builder(String subscriptionName, MessageReceiver receiver) {
this.subscriptionName = subscriptionName;
this.receiver = receiver;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

package com.google.cloud.pubsub.v1;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.ExecutorProvider;
Expand All @@ -35,24 +41,17 @@
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(JUnit4.class)
public class PublisherImplTest {

Expand Down Expand Up @@ -472,7 +471,7 @@ public void testPublisherGetters() throws Exception {
@Test
public void testBuilderParametersAndDefaults() {
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
assertEquals(TEST_TOPIC, builder.topicName);
assertEquals(TEST_TOPIC.toString(), builder.topicName);
assertEquals(Publisher.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);
assertEquals(
Publisher.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD,
Expand Down