From 71ad410c6f0c0ffc8b7aaaa1d8c05fd5bf265c91 Mon Sep 17 00:00:00 2001 From: Ryan Culbertson Date: Tue, 26 Apr 2016 16:16:15 -0400 Subject: [PATCH] Create pubsub subscriptions in the project specified in dataflow options PubsubIO would create subscriptions in the same project as the topic. This fails for users who have permissions to subscribe to that topic, but do not have permissions to create subscriptions in that project. Instead create the subscription in the project specified in dataflow options. If no project is specified, then fallback to the topic project. --- .../cloud/dataflow/sdk/io/PubsubIO.java | 30 ++++++++++++++----- .../cloud/dataflow/sdk/io/PubsubIOTest.java | 25 ++++++++++++++++ 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java index c75e501c97..1ac67eff55 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java @@ -50,6 +50,7 @@ import com.google.cloud.dataflow.sdk.values.PDone; import com.google.cloud.dataflow.sdk.values.PInput; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import org.joda.time.Duration; @@ -200,6 +201,23 @@ private static void populateCommonDisplayData(DisplayData.Builder builder, } } + /** + * Creates the name of a new subscription based on the specified topic and dataflow options. + * If a project is specified in the options, the subscription will be created in that project. + * Otherwise, it will be created in the same project as the topic. + * @param topic the topic to base the subscription name on + * @param options the options to base the subscription name on + * @return the name of the subscription + */ + @VisibleForTesting + protected static String createSubscriptionName(String topic, DataflowPipelineOptions options) { + String[] split = topic.split("/"); + String project = + Strings.isNullOrEmpty(options.getProject()) ? split[1] : options.getProject(); + return String.format("projects/%s/subscriptions/%s_dataflow_%d", + project, split[3], new Random().nextLong()); + } + /** * Class representing a Cloud Pub/Sub Subscription. */ @@ -760,17 +778,15 @@ private class PubsubReader extends DoFn { @Override public void processElement(ProcessContext c) throws IOException { - Pubsub pubsubClient = - Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class)) - .build(); + DataflowPipelineOptions options = + c.getPipelineOptions().as(DataflowPipelineOptions.class); + + Pubsub pubsubClient = Transport.newPubsubClient(options).build(); String subscription; if (getSubscription() == null) { String topic = getTopic().asPath(); - String[] split = topic.split("/"); - subscription = - "projects/" + split[1] + "/subscriptions/" + split[3] + "_dataflow_" - + new Random().nextLong(); + subscription = createSubscriptionName(topic, options); Subscription subInfo = new Subscription().setAckDeadlineSeconds(60).setTopic(topic); try { pubsubClient.projects().subscriptions().create(subscription, subInfo).execute(); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java index dfe5da3457..4c9c02e3be 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java @@ -25,10 +25,13 @@ import com.google.api.client.testing.http.FixedClock; import com.google.api.client.util.Clock; import com.google.api.services.pubsub.model.PubsubMessage; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.transforms.display.DataflowDisplayDataEvaluator; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.display.DisplayDataEvaluator; +import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; @@ -298,4 +301,26 @@ public void testPrimitiveReadDisplayData() { assertThat("PubsubIO.Read should include the topic in its primitive display data", displayData, hasItem(hasDisplayItem("topic"))); } + + @Test + public void testCreateSubscriptionNameFromProject() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setProject("subscriptionProject"); + String subscriptionName = + PubsubIO.createSubscriptionName("projects/topicProject/topics/foo", options); + // the subscription should be in 'subscriptionProject' because that is specified in the options + assertThat(subscriptionName, + Matchers.startsWith("projects/subscriptionProject/subscriptions/foo_dataflow_")); + } + + @Test + public void testCreateSubscriptionNameFromTopic() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setProject(null); + String subscriptionName = + PubsubIO.createSubscriptionName("projects/topicProject/topics/foo", options); + // the subscription should be in 'topicProject' because no project was specified + assertThat(subscriptionName, + Matchers.startsWith("projects/topicProject/subscriptions/foo_dataflow_")); + } }