Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;

import org.joda.time.Duration;
Expand Down Expand Up @@ -200,6 +201,23 @@ private static void populateCommonDisplayData(DisplayData.Builder builder,
}
}

/**
* Creates the name of a new subscription based on the specified topic and dataflow options.
* If a project is specified in the options, the subscription will be created in that project.
* Otherwise, it will be created in the same project as the topic.
* @param topic the topic to base the subscription name on
* @param options the options to base the subscription name on
* @return the name of the subscription
*/
@VisibleForTesting
protected static String createSubscriptionName(String topic, DataflowPipelineOptions options) {
String[] split = topic.split("/");
String project =
Strings.isNullOrEmpty(options.getProject()) ? split[1] : options.getProject();
return String.format("projects/%s/subscriptions/%s_dataflow_%d",
project, split[3], new Random().nextLong());
}

/**
* Class representing a Cloud Pub/Sub Subscription.
*/
Expand Down Expand Up @@ -760,17 +778,15 @@ private class PubsubReader extends DoFn<Void, T> {

@Override
public void processElement(ProcessContext c) throws IOException {
Pubsub pubsubClient =
Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class))
.build();
DataflowPipelineOptions options =
c.getPipelineOptions().as(DataflowPipelineOptions.class);

Pubsub pubsubClient = Transport.newPubsubClient(options).build();

String subscription;
if (getSubscription() == null) {
String topic = getTopic().asPath();
String[] split = topic.split("/");
subscription =
"projects/" + split[1] + "/subscriptions/" + split[3] + "_dataflow_"
+ new Random().nextLong();
subscription = createSubscriptionName(topic, options);
Subscription subInfo = new Subscription().setAckDeadlineSeconds(60).setTopic(topic);
try {
pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import com.google.api.client.testing.http.FixedClock;
import com.google.api.client.util.Clock;
import com.google.api.services.pubsub.model.PubsubMessage;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.display.DataflowDisplayDataEvaluator;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayDataEvaluator;

import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
Expand Down Expand Up @@ -298,4 +301,26 @@ public void testPrimitiveReadDisplayData() {
assertThat("PubsubIO.Read should include the topic in its primitive display data",
displayData, hasItem(hasDisplayItem("topic")));
}

@Test
public void testCreateSubscriptionNameFromProject() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("subscriptionProject");
String subscriptionName =
PubsubIO.createSubscriptionName("projects/topicProject/topics/foo", options);
// the subscription should be in 'subscriptionProject' because that is specified in the options
assertThat(subscriptionName,
Matchers.startsWith("projects/subscriptionProject/subscriptions/foo_dataflow_"));
}

@Test
public void testCreateSubscriptionNameFromTopic() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject(null);
String subscriptionName =
PubsubIO.createSubscriptionName("projects/topicProject/topics/foo", options);
// the subscription should be in 'topicProject' because no project was specified
assertThat(subscriptionName,
Matchers.startsWith("projects/topicProject/subscriptions/foo_dataflow_"));
}
}