diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java index c75e501c97..1ac67eff55 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java @@ -50,6 +50,7 @@ import com.google.cloud.dataflow.sdk.values.PDone; import com.google.cloud.dataflow.sdk.values.PInput; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import org.joda.time.Duration; @@ -200,6 +201,23 @@ private static void populateCommonDisplayData(DisplayData.Builder builder, } } + /** + * Creates the name of a new subscription based on the specified topic and dataflow options. + * If a project is specified in the options, the subscription will be created in that project. + * Otherwise, it will be created in the same project as the topic. + * @param topic the topic to base the subscription name on + * @param options the options to base the subscription name on + * @return the name of the subscription + */ + @VisibleForTesting + protected static String createSubscriptionName(String topic, DataflowPipelineOptions options) { + String[] split = topic.split("/"); + String project = + Strings.isNullOrEmpty(options.getProject()) ? split[1] : options.getProject(); + return String.format("projects/%s/subscriptions/%s_dataflow_%d", + project, split[3], new Random().nextLong()); + } + /** * Class representing a Cloud Pub/Sub Subscription. */ @@ -760,17 +778,15 @@ private class PubsubReader extends DoFn { @Override public void processElement(ProcessContext c) throws IOException { - Pubsub pubsubClient = - Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class)) - .build(); + DataflowPipelineOptions options = + c.getPipelineOptions().as(DataflowPipelineOptions.class); + + Pubsub pubsubClient = Transport.newPubsubClient(options).build(); String subscription; if (getSubscription() == null) { String topic = getTopic().asPath(); - String[] split = topic.split("/"); - subscription = - "projects/" + split[1] + "/subscriptions/" + split[3] + "_dataflow_" - + new Random().nextLong(); + subscription = createSubscriptionName(topic, options); Subscription subInfo = new Subscription().setAckDeadlineSeconds(60).setTopic(topic); try { pubsubClient.projects().subscriptions().create(subscription, subInfo).execute(); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java index dfe5da3457..4c9c02e3be 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java @@ -25,10 +25,13 @@ import com.google.api.client.testing.http.FixedClock; import com.google.api.client.util.Clock; import com.google.api.services.pubsub.model.PubsubMessage; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.transforms.display.DataflowDisplayDataEvaluator; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.display.DisplayDataEvaluator; +import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; @@ -298,4 +301,26 @@ public void testPrimitiveReadDisplayData() { assertThat("PubsubIO.Read should include the topic in its primitive display data", displayData, hasItem(hasDisplayItem("topic"))); } + + @Test + public void testCreateSubscriptionNameFromProject() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setProject("subscriptionProject"); + String subscriptionName = + PubsubIO.createSubscriptionName("projects/topicProject/topics/foo", options); + // the subscription should be in 'subscriptionProject' because that is specified in the options + assertThat(subscriptionName, + Matchers.startsWith("projects/subscriptionProject/subscriptions/foo_dataflow_")); + } + + @Test + public void testCreateSubscriptionNameFromTopic() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setProject(null); + String subscriptionName = + PubsubIO.createSubscriptionName("projects/topicProject/topics/foo", options); + // the subscription should be in 'topicProject' because no project was specified + assertThat(subscriptionName, + Matchers.startsWith("projects/topicProject/subscriptions/foo_dataflow_")); + } }