From 558c5ad33e0384c0f5e9bc3ac36f6c1fde5e6a00 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 25 Mar 2016 16:09:18 -0700 Subject: [PATCH 01/11] [BEAM-151] Break out Dataflow runner dependency to separate test file This allows for moving the Dataflow specific portion of the test to the Dataflow runner maven module. --- .../transforms/DataflowGroupByKeyTest.java | 110 ++++++++++++++++++ .../sdk/transforms/GroupByKeyTest.java | 62 ---------- 2 files changed, 110 insertions(+), 62 deletions(-) create mode 100644 sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java new file mode 100644 index 000000000000..b05e7a264efa --- /dev/null +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.sdk.transforms; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.util.NoopPathValidator; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; +import java.util.List; + +/** Tests for {@link GroupByKey} for the {@link DataflowPipelineRunner}. */ +@RunWith(JUnit4.class) +public class DataflowGroupByKeyTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + /** + * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey} + * is not expanded. This is used for verifying that even without expansion the proper errors show + * up. 
+   */
+  private Pipeline createTestServiceRunner() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("someproject");
+    options.setStagingLocation("gs://staging");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setDataflowClient(null);
+    return Pipeline.create(options);
+  }
+
+  @Test
+  public void testInvalidWindowsService() {
+    Pipeline p = createTestServiceRunner();
+
+    List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(Create.of(ungroupedPairs)
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+        .apply(Window.<KV<String, Integer>>into(
+            Sessions.withGapDuration(Duration.standardMinutes(1))));
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("GroupByKey must have a valid Window merge function");
+    input
+        .apply("GroupByKey", GroupByKey.<String, Integer>create())
+        .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
+  }
+
+  @Test
+  public void testGroupByKeyServiceUnbounded() {
+    Pipeline p = createTestServiceRunner();
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(
+            new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+              @Override
+              public PCollection<KV<String, Integer>> apply(PBegin input) {
+                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+                    input.getPipeline(),
+                    WindowingStrategy.globalDefault(),
+                    PCollection.IsBounded.UNBOUNDED)
+                    .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+              }
+            });
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+        + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
+
+    input.apply("GroupByKey", GroupByKey.<String, Integer>create());
+  }
+}
+
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
index 75eb92fcb436..1f9d919e9f6e 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
@@ -27,10 +27,8 @@
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.coders.MapCoder;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
@@ -40,7 +38,6 @@
 import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PBegin;
@@ -240,21 +237,6 @@ public void testWindowFnInvalidation() {
                 Duration.standardMinutes(1)))));
   }
 
-  /**
-   * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
-   * is not expanded.
This is used for verifying that even without expansion the proper errors show - * up. - */ - private Pipeline createTestServiceRunner() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setProject("someproject"); - options.setStagingLocation("gs://staging"); - options.setPathValidatorClass(NoopPathValidator.class); - options.setDataflowClient(null); - return Pipeline.create(options); - } - private Pipeline createTestDirectRunner() { DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class); options.setRunner(DirectPipelineRunner.class); @@ -280,25 +262,6 @@ public void testInvalidWindowsDirect() { .apply("GroupByKeyAgain", GroupByKey.>create()); } - @Test - public void testInvalidWindowsService() { - Pipeline p = createTestServiceRunner(); - - List> ungroupedPairs = Arrays.asList(); - - PCollection> input = - p.apply(Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) - .apply(Window.>into( - Sessions.withGapDuration(Duration.standardMinutes(1)))); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("GroupByKey must have a valid Window merge function"); - input - .apply("GroupByKey", GroupByKey.create()) - .apply("GroupByKeyAgain", GroupByKey.>create()); - } - @Test public void testRemerge() { Pipeline p = TestPipeline.create(); @@ -349,31 +312,6 @@ public PCollection> apply(PBegin input) { input.apply("GroupByKey", GroupByKey.create()); } - @Test - public void testGroupByKeyServiceUnbounded() { - Pipeline p = createTestServiceRunner(); - - PCollection> input = - p.apply( - new PTransform>>() { - @Override - public PCollection> apply(PBegin input) { - return PCollection.>createPrimitiveOutputInternal( - input.getPipeline(), - WindowingStrategy.globalDefault(), - PCollection.IsBounded.UNBOUNDED) - .setTypeDescriptorInternal(new TypeDescriptor>() {}); - } - }); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage( - "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without " - + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey."); - - input.apply("GroupByKey", GroupByKey.create()); - } - /** * Tests that when two elements are combined via a GroupByKey their output timestamp agrees * with the windowing function customized to actually be the same as the default, the earlier of From e60e42049445b54f23e33364bb9d0828c5660b8e Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 25 Mar 2016 16:19:54 -0700 Subject: [PATCH 02/11] [BEAM-151] Break out Dataflow runner dependency to separate test file This allows for moving the Dataflow specific portion of the testing to the Dataflow runner maven module. --- .../sdk/transforms/DataflowViewTest.java | 205 ++++++++++++++++++ .../dataflow/sdk/transforms/ViewTest.java | 126 ----------- 2 files changed, 205 insertions(+), 126 deletions(-) create mode 100644 sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java new file mode 100644 index 000000000000..b67bfcf3de21 --- /dev/null +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java @@ -0,0 +1,205 @@ +/* + * Copyright (C) 2015 Google Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.sdk.transforms; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.util.NoopPathValidator; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link View} for a {@link DataflowPipelineRunner}. */ +@RunWith(JUnit4.class) +public class DataflowViewTest { + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + private Pipeline createTestBatchRunner() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setRunner(DataflowPipelineRunner.class); + options.setProject("someproject"); + options.setStagingLocation("gs://staging"); + options.setPathValidatorClass(NoopPathValidator.class); + options.setDataflowClient(null); + return Pipeline.create(options); + } + + private Pipeline createTestStreamingRunner() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setRunner(DataflowPipelineRunner.class); + options.setStreaming(true); + options.setProject("someproject"); + options.setStagingLocation("gs://staging"); + options.setPathValidatorClass(NoopPathValidator.class); + options.setDataflowClient(null); + return Pipeline.create(options); + } + + private void testViewUnbounded( + Pipeline pipeline, + PTransform>, ? 
extends PCollectionView<?>> view) {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unable to create a side-input view from input");
+    thrown.expectCause(
+        ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));
+    pipeline
+        .apply(
+            new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+              @Override
+              public PCollection<KV<String, Integer>> apply(PBegin input) {
+                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+                    input.getPipeline(),
+                    WindowingStrategy.globalDefault(),
+                    PCollection.IsBounded.UNBOUNDED)
+                    .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+              }
+            })
+        .apply(view);
+  }
+
+  private void testViewNonmerging(
+      Pipeline pipeline,
+      PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unable to create a side-input view from input");
+    thrown.expectCause(
+        ThrowableMessageMatcher.hasMessage(Matchers.containsString("Consumed by GroupByKey")));
+    pipeline.apply(Create.<KV<String, Integer>>of(KV.of("hello", 5)))
+        .apply(Window.<KV<String, Integer>>into(new InvalidWindows<>(
+            "Consumed by GroupByKey", FixedWindows.of(Duration.standardHours(1)))))
+        .apply(view);
+  }
+
+  @Test
+  public void testViewUnboundedAsSingletonBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewUnboundedAsSingletonStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewUnboundedAsIterableBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewUnboundedAsIterableStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewUnboundedAsListBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewUnboundedAsListStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewUnboundedAsMapBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMapStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMultimapBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMultimapStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewNonmergingAsSingletonBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewNonmergingAsSingletonStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewNonmergingAsIterableBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewNonmergingAsIterableStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewNonmergingAsListBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewNonmergingAsListStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewNonmergingAsMapBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMapStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void
testViewNonmergingAsMultimapBatch() { + testViewNonmerging(createTestBatchRunner(), View.asMultimap()); + } + + @Test + public void testViewNonmergingAsMultimapStreaming() { + testViewNonmerging(createTestStreamingRunner(), View.asMultimap()); + } +} + diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java index 145956961f1f..0d2ad0144fd6 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java @@ -32,10 +32,8 @@ import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.coders.VarIntCoder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; @@ -44,7 +42,6 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.util.NoopPathValidator; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PBegin; @@ -1332,27 +1329,6 @@ public void testViewGetName() { assertEquals("View.AsMultimap", View.asMultimap().getName()); } - private Pipeline createTestBatchRunner() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setProject("someproject"); - options.setStagingLocation("gs://staging"); - options.setPathValidatorClass(NoopPathValidator.class); - options.setDataflowClient(null); - return Pipeline.create(options); - } - - private Pipeline createTestStreamingRunner() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setStreaming(true); - options.setProject("someproject"); - options.setStagingLocation("gs://staging"); - options.setPathValidatorClass(NoopPathValidator.class); - options.setDataflowClient(null); - return Pipeline.create(options); - } - private Pipeline createTestDirectRunner() { DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class); options.setRunner(DirectPipelineRunner.class); @@ -1394,153 +1370,51 @@ private void testViewNonmerging( .apply(view); } - @Test - public void testViewUnboundedAsSingletonBatch() { - testViewUnbounded(createTestBatchRunner(), View.>asSingleton()); - } - - @Test - public void testViewUnboundedAsSingletonStreaming() { - testViewUnbounded(createTestStreamingRunner(), View.>asSingleton()); - } - @Test public void testViewUnboundedAsSingletonDirect() { testViewUnbounded(createTestDirectRunner(), View.>asSingleton()); } - @Test - public void testViewUnboundedAsIterableBatch() { - testViewUnbounded(createTestBatchRunner(), View.>asIterable()); - } - - @Test - public void 
testViewUnboundedAsIterableStreaming() { - testViewUnbounded(createTestStreamingRunner(), View.>asIterable()); - } - @Test public void testViewUnboundedAsIterableDirect() { testViewUnbounded(createTestDirectRunner(), View.>asIterable()); } - @Test - public void testViewUnboundedAsListBatch() { - testViewUnbounded(createTestBatchRunner(), View.>asList()); - } - - @Test - public void testViewUnboundedAsListStreaming() { - testViewUnbounded(createTestStreamingRunner(), View.>asList()); - } - @Test public void testViewUnboundedAsListDirect() { testViewUnbounded(createTestDirectRunner(), View.>asList()); } - @Test - public void testViewUnboundedAsMapBatch() { - testViewUnbounded(createTestBatchRunner(), View.asMap()); - } - - @Test - public void testViewUnboundedAsMapStreaming() { - testViewUnbounded(createTestStreamingRunner(), View.asMap()); - } - @Test public void testViewUnboundedAsMapDirect() { testViewUnbounded(createTestDirectRunner(), View.asMap()); } - - @Test - public void testViewUnboundedAsMultimapBatch() { - testViewUnbounded(createTestBatchRunner(), View.asMultimap()); - } - - @Test - public void testViewUnboundedAsMultimapStreaming() { - testViewUnbounded(createTestStreamingRunner(), View.asMultimap()); - } - @Test public void testViewUnboundedAsMultimapDirect() { testViewUnbounded(createTestDirectRunner(), View.asMultimap()); } - @Test - public void testViewNonmergingAsSingletonBatch() { - testViewNonmerging(createTestBatchRunner(), View.>asSingleton()); - } - - @Test - public void testViewNonmergingAsSingletonStreaming() { - testViewNonmerging(createTestStreamingRunner(), View.>asSingleton()); - } - @Test public void testViewNonmergingAsSingletonDirect() { testViewNonmerging(createTestDirectRunner(), View.>asSingleton()); } - @Test - public void testViewNonmergingAsIterableBatch() { - testViewNonmerging(createTestBatchRunner(), View.>asIterable()); - } - - @Test - public void testViewNonmergingAsIterableStreaming() { - testViewNonmerging(createTestStreamingRunner(), View.>asIterable()); - } - @Test public void testViewNonmergingAsIterableDirect() { testViewNonmerging(createTestDirectRunner(), View.>asIterable()); } - @Test - public void testViewNonmergingAsListBatch() { - testViewNonmerging(createTestBatchRunner(), View.>asList()); - } - - @Test - public void testViewNonmergingAsListStreaming() { - testViewNonmerging(createTestStreamingRunner(), View.>asList()); - } - @Test public void testViewNonmergingAsListDirect() { testViewNonmerging(createTestDirectRunner(), View.>asList()); } - @Test - public void testViewNonmergingAsMapBatch() { - testViewNonmerging(createTestBatchRunner(), View.asMap()); - } - - @Test - public void testViewNonmergingAsMapStreaming() { - testViewNonmerging(createTestStreamingRunner(), View.asMap()); - } - @Test public void testViewNonmergingAsMapDirect() { testViewNonmerging(createTestDirectRunner(), View.asMap()); } - - @Test - public void testViewNonmergingAsMultimapBatch() { - testViewNonmerging(createTestBatchRunner(), View.asMultimap()); - } - - @Test - public void testViewNonmergingAsMultimapStreaming() { - testViewNonmerging(createTestStreamingRunner(), View.asMultimap()); - } - @Test public void testViewNonmergingAsMultimapDirect() { testViewNonmerging(createTestDirectRunner(), View.asMultimap()); From a8a341946e17466b8faf72981c0da70fda5702b6 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 25 Mar 2016 16:28:36 -0700 Subject: [PATCH 03/11] [BEAM-151] Break out Dataflow runner specific tests to separate file This prevents moving the Dataflow 
runner code to its own separate maven module. --- .../dataflow/sdk/io/DataflowTextIOTest.java | 117 ++++++++++++++++++ .../cloud/dataflow/sdk/io/TextIOTest.java | 79 ------------ 2 files changed, 117 insertions(+), 79 deletions(-) create mode 100644 sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java new file mode 100644 index 000000000000..8c0d6d292454 --- /dev/null +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.sdk.io; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.util.GcsUtil; +import com.google.cloud.dataflow.sdk.util.TestCredential; +import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; +import com.google.common.collect.ImmutableList; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.List; + +/** + * {@link DataflowPipelineRunner} specific tests for TextIO Read and Write transforms. 
+ */ +@RunWith(JUnit4.class) +public class DataflowTextIOTest { + + private TestDataflowPipelineOptions buildTestPipelineOptions() { + TestDataflowPipelineOptions options = + PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); + options.setGcpCredential(new TestCredential()); + return options; + } + + private GcsUtil buildMockGcsUtil() throws IOException { + GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class); + + // Any request to open gets a new bogus channel + Mockito + .when(mockGcsUtil.open(Mockito.any(GcsPath.class))) + .then(new Answer() { + @Override + public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { + return FileChannel.open( + Files.createTempFile("channel-", ".tmp"), + StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); + } + }); + + // Any request for expansion returns a list containing the original GcsPath + // This is required to pass validation that occurs in TextIO during apply() + Mockito + .when(mockGcsUtil.expand(Mockito.any(GcsPath.class))) + .then(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return ImmutableList.of((GcsPath) invocation.getArguments()[0]); + } + }); + + return mockGcsUtil; + } + + /** + * This tests a few corner cases that should not crash. + */ + @Test + public void testGoodWildcards() throws Exception { + TestDataflowPipelineOptions options = buildTestPipelineOptions(); + options.setGcsUtil(buildMockGcsUtil()); + + Pipeline pipeline = Pipeline.create(options); + + applyRead(pipeline, "gs://bucket/foo"); + applyRead(pipeline, "gs://bucket/foo/"); + applyRead(pipeline, "gs://bucket/foo/*"); + applyRead(pipeline, "gs://bucket/foo/?"); + applyRead(pipeline, "gs://bucket/foo/[0-9]"); + applyRead(pipeline, "gs://bucket/foo/*baz*"); + applyRead(pipeline, "gs://bucket/foo/*baz?"); + applyRead(pipeline, "gs://bucket/foo/[0-9]baz?"); + applyRead(pipeline, "gs://bucket/foo/baz/*"); + applyRead(pipeline, "gs://bucket/foo/baz/*wonka*"); + applyRead(pipeline, "gs://bucket/foo/*baz/wonka*"); + applyRead(pipeline, "gs://bucket/foo*/baz"); + applyRead(pipeline, "gs://bucket/foo?/baz"); + applyRead(pipeline, "gs://bucket/foo[0-9]/baz"); + + // Check that running doesn't fail. 
+ pipeline.run(); + } + + private void applyRead(Pipeline pipeline, String path) { + pipeline.apply("Read(" + path + ")", TextIO.Read.from(path)); + } +} + diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java index 0a8e3811085f..53eebe49be9b 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java @@ -36,14 +36,10 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.testing.SourceTestUtils; -import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.util.CoderUtils; -import com.google.cloud.dataflow.sdk.util.GcsUtil; -import com.google.cloud.dataflow.sdk.util.TestCredential; -import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PDone; import com.google.common.collect.ImmutableList; @@ -54,9 +50,6 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.BufferedReader; import java.io.File; @@ -64,11 +57,8 @@ import java.io.FileReader; import java.io.IOException; import java.io.PrintStream; -import java.nio.channels.FileChannel; -import java.nio.channels.SeekableByteChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -83,42 +73,6 @@ public class TextIOTest { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule public ExpectedException expectedException = ExpectedException.none(); - private GcsUtil buildMockGcsUtil() throws IOException { - GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class); - - // Any request to open gets a new bogus channel - Mockito - .when(mockGcsUtil.open(Mockito.any(GcsPath.class))) - .then(new Answer() { - @Override - public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { - return FileChannel.open( - Files.createTempFile("channel-", ".tmp"), - StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); - } - }); - - // Any request for expansion returns a list containing the original GcsPath - // This is required to pass validation that occurs in TextIO during apply() - Mockito - .when(mockGcsUtil.expand(Mockito.any(GcsPath.class))) - .then(new Answer>() { - @Override - public List answer(InvocationOnMock invocation) throws Throwable { - return ImmutableList.of((GcsPath) invocation.getArguments()[0]); - } - }); - - return mockGcsUtil; - } - - private TestDataflowPipelineOptions buildTestPipelineOptions() { - TestDataflowPipelineOptions options = - PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); - options.setGcpCredential(new TestCredential()); - return options; - } - void runTestRead(T[] expected, Coder coder) throws Exception { File tmpFile = tmpFolder.newFile("file.txt"); String filename = tmpFile.getPath(); @@ 
-301,39 +255,6 @@ public void testUnsupportedFilePattern() throws IOException {
     input.apply(TextIO.Write.to(filename));
   }
 
-  /**
-   * This tests a few corner cases that should not crash.
-   */
-  @Test
-  public void testGoodWildcards() throws Exception {
-    TestDataflowPipelineOptions options = buildTestPipelineOptions();
-    options.setGcsUtil(buildMockGcsUtil());
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    applyRead(pipeline, "gs://bucket/foo");
-    applyRead(pipeline, "gs://bucket/foo/");
-    applyRead(pipeline, "gs://bucket/foo/*");
-    applyRead(pipeline, "gs://bucket/foo/?");
-    applyRead(pipeline, "gs://bucket/foo/[0-9]");
-    applyRead(pipeline, "gs://bucket/foo/*baz*");
-    applyRead(pipeline, "gs://bucket/foo/*baz?");
-    applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
-    applyRead(pipeline, "gs://bucket/foo/baz/*");
-    applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
-    applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
-    applyRead(pipeline, "gs://bucket/foo*/baz");
-    applyRead(pipeline, "gs://bucket/foo?/baz");
-    applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
-
-    // Check that running doesn't fail.
-    pipeline.run();
-  }
-
-  private void applyRead(Pipeline pipeline, String path) {
-    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
-  }
-
   /**
    * Recursive wildcards are not supported.
    * This tests "**".

From 7b66b193ca36e6e77124eef3fd20d3e78785bfc6 Mon Sep 17 00:00:00 2001
From: Luke Cwik
Date: Fri, 25 Mar 2016 16:29:48 -0700
Subject: [PATCH 04/11] [BEAM-151] Fixup of comment in test file

---
 .../com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java
index 8c0d6d292454..f6d68ec41b46 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015 Google Inc.
+ * Copyright (C) 2016 Google Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of

From 7a583cb21cf830e47ced00b19e9efc9d17f133bb Mon Sep 17 00:00:00 2001
From: Luke Cwik
Date: Mon, 28 Mar 2016 10:02:01 -0700
Subject: [PATCH 05/11] [BEAM-151] Migrate options relevant to Pubsub to its own options interface

This allows for moving DataflowPipelineOptions to the Dataflow runner maven module.
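For illustration, a minimal sketch of how a transform can now reach Pubsub settings through the options as() mechanism, mirroring the PubsubIO change in the diff below (editorial example, not part of the patch; the newPubsubClientFrom helper name is invented, and the com.google.api.services.pubsub.Pubsub and SDK options/util imports are assumed):

    // Sketch only: any PipelineOptions can be narrowed to PubsubOptions via as(),
    // so Pubsub consumers no longer need the Dataflow-specific options type.
    static Pubsub newPubsubClientFrom(PipelineOptions options) {
      PubsubOptions pubsubOptions = options.as(PubsubOptions.class);
      // getPubsubRootUrl() defaults to "https://pubsub.googleapis.com".
      return Transport.newPubsubClient(pubsubOptions).build();
    }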
--- .../cloud/dataflow/sdk/io/PubsubIO.java | 6 ++-- .../options/DataflowPipelineDebugOptions.java | 8 ----- .../sdk/options/DataflowPipelineOptions.java | 2 +- .../dataflow/sdk/options/PubsubOptions.java | 35 +++++++++++++++++++ .../cloud/dataflow/sdk/util/Transport.java | 3 +- 5 files changed, 41 insertions(+), 13 deletions(-) create mode 100644 sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java index 653b31f059e4..63b7756abdde 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java @@ -32,7 +32,7 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PubsubOptions; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; @@ -722,7 +722,7 @@ private class PubsubReader extends DoFn { @Override public void processElement(ProcessContext c) throws IOException { Pubsub pubsubClient = - Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class)) + Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class)) .build(); String subscription; @@ -998,7 +998,7 @@ private class PubsubWriter extends DoFn { public void startBundle(Context c) { this.output = new ArrayList<>(); this.pubsubClient = - Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class)) + Transport.newPubsubClient(c.getPipelineOptions().as(PubsubOptions.class)) .build(); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java index cadc011d135d..8a229818c0e8 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java @@ -162,14 +162,6 @@ public Dataflow create(PipelineOptions options) { } } - /** - * Root URL for use with the Pubsub API. - */ - @Description("Root URL for use with the Pubsub API") - @Default.String("https://pubsub.googleapis.com") - String getPubsubRootUrl(); - void setPubsubRootUrl(String value); - /** * Whether to update the currently running pipeline with the same name as this one. 
 *
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
index 6794032a97ef..35874c4b6767 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
@@ -32,7 +32,7 @@
 public interface DataflowPipelineOptions extends
     PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
     DataflowPipelineWorkerPoolOptions, BigQueryOptions,
     GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
-    DataflowProfilingOptions {
+    DataflowProfilingOptions, PubsubOptions {
 
   @Description("Project id. Required when running a Dataflow in the cloud. "
       + "See https://cloud.google.com/storage/docs/projects for further details.")
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java
new file mode 100644
index 000000000000..51cf62cdbbd3
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.options;
+
+/**
+ * Properties that can be set when using Pubsub with the Dataflow SDK.
+ */
+@Description("Options that are used to configure Google Cloud Pub/Sub. See "
+    + "https://cloud.google.com/pubsub/docs for details on Cloud Pub/Sub.")
+public interface PubsubOptions extends ApplicationNameOptions, GcpOptions,
+    PipelineOptions, StreamingOptions {
+
+  /**
+   * Root URL for use with the Pubsub API.
+   */
+  @Description("Root URL for use with the Pubsub API")
+  @Default.String("https://pubsub.googleapis.com")
+  @Hidden
+  String getPubsubRootUrl();
+  void setPubsubRootUrl(String value);
+}
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
index 15fe2863395a..8376dd66a065 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
@@ -31,6 +31,7 @@
 import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
 import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.GcsOptions;
+import com.google.cloud.dataflow.sdk.options.PubsubOptions;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
 import com.google.common.collect.ImmutableList;
 
@@ -114,7 +115,7 @@ private static ApiComponents apiComponentsFromUrl(String urlString) {
    * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option.
   */
  public static Pubsub.Builder
-     newPubsubClient(DataflowPipelineOptions options) {
+     newPubsubClient(PubsubOptions options) {
    return new Pubsub.Builder(getTransport(),
        getJsonFactory(),
        chainHttpRequestInitializer(
            options.getGcpCredential(),

From aa405d2591105172c1ba9b84060d17c262ba6179 Mon Sep 17 00:00:00 2001
From: Luke Cwik
Date: Mon, 28 Mar 2016 13:09:33 -0700
Subject: [PATCH 06/11] [BEAM-151] Break out Dataflow transport creation to another file

This allows for moving DataflowPipelineOptions into a Dataflow runner maven module.
---
 .../options/DataflowPipelineDebugOptions.java |   5 +-
 .../sdk/runners/DataflowPipelineRunner.java   |   6 +-
 .../dataflow/sdk/util/DataflowTransport.java  | 111 ++++++++++++++++++
 .../cloud/dataflow/sdk/util/Transport.java    |  46 --------
 .../options/GoogleApiDebugOptionsTest.java    |  93 ++++++++-------
 5 files changed, 163 insertions(+), 98 deletions(-)
 create mode 100644 sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java

diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
index 8a229818c0e8..50174cdfb586 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
@@ -19,11 +19,11 @@
 import com.google.api.services.dataflow.Dataflow;
 import com.google.cloud.dataflow.sdk.annotations.Experimental;
 import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
+import com.google.cloud.dataflow.sdk.util.DataflowTransport;
 import com.google.cloud.dataflow.sdk.util.GcsStager;
 import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
 import com.google.cloud.dataflow.sdk.util.PathValidator;
 import com.google.cloud.dataflow.sdk.util.Stager;
-import com.google.cloud.dataflow.sdk.util.Transport;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
@@ -158,7 +158,8 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
   public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
     @Override
     public Dataflow create(PipelineOptions options) {
-      return Transport.newDataflowClient(options.as(DataflowPipelineOptions.class)).build();
+      return DataflowTransport.newDataflowClient(
+          options.as(DataflowPipelineOptions.class)).build();
     }
   }
 
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 032e93d7e59c..8c0abf993eb8 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -98,6 +98,7 @@
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.cloud.dataflow.sdk.util.CoderUtils;
 import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
+import com.google.cloud.dataflow.sdk.util.DataflowTransport;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
 import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
 import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
@@ -106,7 +107,6 @@
 import com.google.cloud.dataflow.sdk.util.PropertyNames;
 import com.google.cloud.dataflow.sdk.util.Reshuffle;
 import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
-import com.google.cloud.dataflow.sdk.util.Transport; import com.google.cloud.dataflow.sdk.util.ValueWithRecordId; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder; @@ -443,7 +443,7 @@ private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniqu throw new RuntimeException("Should not specify the debuggee"); } - Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build(); + Clouddebugger debuggerClient = DataflowTransport.newClouddebuggerClient(options).build(); Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier); options.setDebuggee(debuggee); @@ -599,7 +599,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { // regularly and need not be retried automatically. DataflowPipelineJob dataflowPipelineJob = new DataflowPipelineJob(options.getProject(), jobResult.getId(), - Transport.newRawDataflowClient(options).build(), aggregatorTransforms); + DataflowTransport.newRawDataflowClient(options).build(), aggregatorTransforms); // If the service returned client request id, the SDK needs to compare it // with the original id generated in the request, if they are not the same diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java new file mode 100644 index 000000000000..8de358cf6ba3 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java @@ -0,0 +1,111 @@ +/******************************************************************************* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + ******************************************************************************/ + +package com.google.cloud.dataflow.sdk.util; + +import static com.google.cloud.dataflow.sdk.util.Transport.getJsonFactory; +import static com.google.cloud.dataflow.sdk.util.Transport.getTransport; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.services.clouddebugger.v2.Clouddebugger; +import com.google.api.services.dataflow.Dataflow; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; +import com.google.common.collect.ImmutableList; + +import java.net.MalformedURLException; +import java.net.URL; + +/** + * Helpers for cloud communication. + */ +public class DataflowTransport { + + + private static class ApiComponents { + public String rootUrl; + public String servicePath; + + public ApiComponents(String root, String path) { + this.rootUrl = root; + this.servicePath = path; + } + } + + private static ApiComponents apiComponentsFromUrl(String urlString) { + try { + URL url = new URL(urlString); + String rootUrl = url.getProtocol() + "://" + url.getHost() + + (url.getPort() > 0 ? 
":" + url.getPort() : ""); + return new ApiComponents(rootUrl, url.getPath()); + } catch (MalformedURLException e) { + throw new RuntimeException("Invalid URL: " + urlString); + } + } + + /** + * Returns a Google Cloud Dataflow client builder. + */ + public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) { + String servicePath = options.getDataflowEndpoint(); + ApiComponents components; + if (servicePath.contains("://")) { + components = apiComponentsFromUrl(servicePath); + } else { + components = new ApiComponents(options.getApiRootUrl(), servicePath); + } + + return new Dataflow.Builder(getTransport(), + getJsonFactory(), + chainHttpRequestInitializer( + options.getGcpCredential(), + // Do not log 404. It clutters the output and is possibly even required by the caller. + new RetryHttpRequestInitializer(ImmutableList.of(404)))) + .setApplicationName(options.getAppName()) + .setRootUrl(components.rootUrl) + .setServicePath(components.servicePath) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + } + + public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) { + return new Clouddebugger.Builder(getTransport(), + getJsonFactory(), + chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer())) + .setApplicationName(options.getAppName()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + } + + /** + * Returns a Dataflow client that does not automatically retry failed + * requests. + */ + public static Dataflow.Builder + newRawDataflowClient(DataflowPipelineOptions options) { + return newDataflowClient(options) + .setHttpRequestInitializer(options.getGcpCredential()) + .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); + } + + private static HttpRequestInitializer chainHttpRequestInitializer( + Credential credential, HttpRequestInitializer httpRequestInitializer) { + if (credential == null) { + return httpRequestInitializer; + } else { + return new ChainingHttpRequestInitializer(credential, httpRequestInitializer); + } + } +} diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java index 8376dd66a065..195313785b8c 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java @@ -23,13 +23,10 @@ import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.clouddebugger.v2.Clouddebugger; -import com.google.api.services.dataflow.Dataflow; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.storage.Storage; import com.google.cloud.dataflow.sdk.options.BigQueryOptions; import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.GcsOptions; import com.google.cloud.dataflow.sdk.options.PubsubOptions; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; @@ -126,49 +123,6 @@ private static ApiComponents apiComponentsFromUrl(String urlString) { .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); } - /** - * Returns a Google Cloud Dataflow client builder. 
- */ - public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) { - String servicePath = options.getDataflowEndpoint(); - ApiComponents components; - if (servicePath.contains("://")) { - components = apiComponentsFromUrl(servicePath); - } else { - components = new ApiComponents(options.getApiRootUrl(), servicePath); - } - - return new Dataflow.Builder(getTransport(), - getJsonFactory(), - chainHttpRequestInitializer( - options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. - new RetryHttpRequestInitializer(ImmutableList.of(404)))) - .setApplicationName(options.getAppName()) - .setRootUrl(components.rootUrl) - .setServicePath(components.servicePath) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - - public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) { - return new Clouddebugger.Builder(getTransport(), - getJsonFactory(), - chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer())) - .setApplicationName(options.getAppName()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - - /** - * Returns a Dataflow client that does not automatically retry failed - * requests. - */ - public static Dataflow.Builder - newRawDataflowClient(DataflowPipelineOptions options) { - return newDataflowClient(options) - .setHttpRequestInitializer(options.getGcpCredential()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - /** * Returns a Cloud Storage client builder. * diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java index 3a16cf5dee25..3914bac94978 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java @@ -21,8 +21,7 @@ import static org.junit.Assert.assertNull; import com.google.api.services.bigquery.Bigquery.Datasets.Delete; -import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Create; -import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get; +import com.google.api.services.storage.Storage; import com.google.cloud.dataflow.sdk.options.GoogleApiDebugOptions.GoogleApiTracer; import com.google.cloud.dataflow.sdk.util.TestCredential; import com.google.cloud.dataflow.sdk.util.Transport; @@ -36,104 +35,104 @@ /** Tests for {@link GoogleApiDebugOptions}. 
*/ @RunWith(JUnit4.class) public class GoogleApiDebugOptionsTest { + private static final String STORAGE_GET_TRACE = + "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}"; + private static final String STORAGE_GET_AND_LIST_TRACE = + "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"," + + "\"Objects.List\":\"ListTraceDestination\"}"; + private static final String STORAGE_TRACE = "--googleApiTrace={\"Storage\":\"TraceDestination\"}"; + @Test public void testWhenTracingMatches() throws Exception { - String[] args = - new String[] {"--googleApiTrace={\"Projects.Jobs.Get\":\"GetTraceDestination\"}"}; - DataflowPipelineOptions options = - PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class); + String[] args = new String[] {STORAGE_GET_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); options.setGcpCredential(new TestCredential()); - assertNotNull(options.getGoogleApiTrace()); - Get request = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get request = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("GetTraceDestination", request.get("$trace")); } @Test public void testWhenTracingDoesNotMatch() throws Exception { - String[] args = new String[] {"--googleApiTrace={\"Projects.Jobs.Create\":\"testToken\"}"}; - DataflowPipelineOptions options = - PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class); + String[] args = new String[] {STORAGE_GET_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); options.setGcpCredential(new TestCredential()); assertNotNull(options.getGoogleApiTrace()); - Get request = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.List request = + Transport.newStorageClient(options).build().objects().list("testProjectId"); assertNull(request.get("$trace")); } @Test public void testWithMultipleTraces() throws Exception { - String[] args = new String[] { - "--googleApiTrace={\"Projects.Jobs.Create\":\"CreateTraceDestination\"," - + "\"Projects.Jobs.Get\":\"GetTraceDestination\"}"}; - DataflowPipelineOptions options = - PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class); + String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); options.setGcpCredential(new TestCredential()); assertNotNull(options.getGoogleApiTrace()); - Get getRequest = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("GetTraceDestination", getRequest.get("$trace")); - Create createRequest = - options.getDataflowClient().projects().jobs().create("testProjectId", null); - assertEquals("CreateTraceDestination", createRequest.get("$trace")); + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertEquals("ListTraceDestination", listRequest.get("$trace")); } @Test - public void testMatchingAllDataflowCalls() throws Exception { - String[] args = new String[] {"--googleApiTrace={\"Dataflow\":\"TraceDestination\"}"}; - DataflowPipelineOptions options = - PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class); + public void testMatchingAllCalls() throws Exception { + 
String[] args = new String[] {STORAGE_TRACE}; + GcsOptions options = + PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); options.setGcpCredential(new TestCredential()); assertNotNull(options.getGoogleApiTrace()); - Get getRequest = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("TraceDestination", getRequest.get("$trace")); - Create createRequest = - options.getDataflowClient().projects().jobs().create("testProjectId", null); - assertEquals("TraceDestination", createRequest.get("$trace")); + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertEquals("TraceDestination", listRequest.get("$trace")); } @Test public void testMatchingAgainstClient() throws Exception { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); options.setGcpCredential(new TestCredential()); options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( - Transport.newDataflowClient(options).build(), "TraceDestination")); + Transport.newStorageClient(options).build(), "TraceDestination")); - Get getRequest = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("TraceDestination", getRequest.get("$trace")); - Delete deleteRequest = Transport.newBigQueryClient(options).build().datasets() - .delete("testProjectId", "testDatasetId"); + Delete deleteRequest = Transport.newBigQueryClient(options.as(BigQueryOptions.class)) + .build().datasets().delete("testProjectId", "testDatasetId"); assertNull(deleteRequest.get("$trace")); } @Test public void testMatchingAgainstRequestType() throws Exception { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); options.setGcpCredential(new TestCredential()); options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( - Transport.newDataflowClient(options).build().projects().jobs() - .get("aProjectId", "aJobId"), "TraceDestination")); + Transport.newStorageClient(options).build().objects() + .get("aProjectId", "aObjectId"), "TraceDestination")); - Get getRequest = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("TraceDestination", getRequest.get("$trace")); - Create createRequest = - options.getDataflowClient().projects().jobs().create("testProjectId", null); - assertNull(createRequest.get("$trace")); + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertNull(listRequest.get("$trace")); } @Test From e88059f46183a3cd67a9da7fd1e1d493b86724f8 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Mon, 28 Mar 2016 13:11:28 -0700 Subject: [PATCH 07/11] [BEAM-151] Remove dependence on num workers in DatastoreIO This prevents moving DataflowPipelineOptions into a separate maven module. 
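For context, the fallback that remains after this change, consolidated from the hunk below (the enclosing DatastoreIO.Source#splitIntoBundles method and its numSplits local are as in the diff; this is a sketch, not the full method):

    long numSplits;
    try {
      // Preferred path: derive the split count from the estimated size.
      numSplits = Math.round(((double) getEstimatedSizeBytes(options)) / desiredBundleSizeBytes);
    } catch (Exception e) {
      // Estimation failed; use a fixed default instead of reading
      // DataflowPipelineWorkerPoolOptions#getNumWorkers(), which is 0
      // for runners other than the Dataflow service.
      numSplits = 12;
    }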
--- .../google/cloud/dataflow/sdk/io/DatastoreIO.java | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java index f618bc9d63bc..619b17d28eac 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java @@ -53,7 +53,6 @@ import com.google.cloud.dataflow.sdk.coders.SerializableCoder; import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation; import com.google.cloud.dataflow.sdk.io.Sink.Writer; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions; import com.google.cloud.dataflow.sdk.options.GcpOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff; @@ -306,18 +305,7 @@ public List splitIntoBundles(long desiredBundleSizeBytes, PipelineOption numSplits = Math.round(((double) getEstimatedSizeBytes(options)) / desiredBundleSizeBytes); } catch (Exception e) { // Fallback in case estimated size is unavailable. TODO: fix this, it's horrible. - - // 1. Try Dataflow's numWorkers, which will be 0 for other workers. - DataflowPipelineWorkerPoolOptions poolOptions = - options.as(DataflowPipelineWorkerPoolOptions.class); - if (poolOptions.getNumWorkers() > 0) { - LOG.warn("Estimated size of unavailable, using the number of workers {}", - poolOptions.getNumWorkers(), e); - numSplits = poolOptions.getNumWorkers(); - } else { - // 2. Default to 12 in the unknown case. - numSplits = 12; - } + numSplits = 12; } // If the desiredBundleSize or number of workers results in 1 split, simply return From d4fa5f7abce953272c31da355b0c00c0be2bdd09 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Mon, 28 Mar 2016 13:19:53 -0700 Subject: [PATCH 08/11] [BEAM-151] Remove references to DataflowPipelineOptions Removed DatastoreIO dependence on numWorkers from DataflowPipelineWorkerHarnessOptions. This prevented moving DataflowPipelineOptions to a separate maven module. --- .../cloud/dataflow/sdk/io/PubsubIO.java | 8 +-- .../google/cloud/dataflow/sdk/io/TextIO.java | 10 ++- .../dataflow/sdk/io/DatastoreIOTest.java | 66 ++++--------------- 3 files changed, 18 insertions(+), 66 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java index 63b7756abdde..8ce424373dad 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java @@ -33,7 +33,6 @@ import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; import com.google.cloud.dataflow.sdk.options.PubsubOptions; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -393,10 +392,9 @@ public String asPath() { * the stream. * *
<p>
When running with a {@link PipelineRunner} that only supports bounded - * {@link PCollection PCollections} (such as {@link DirectPipelineRunner} or - * {@link DataflowPipelineRunner} without {@code --streaming}), only a bounded portion of the - * input Pub/Sub stream can be processed. As such, either {@link Bound#maxNumRecords(int)} or - * {@link Bound#maxReadTime(Duration)} must be set. + * {@link PCollection PCollections} (such as {@link DirectPipelineRunner}), + * only a bounded portion of the input Pub/Sub stream can be processed. As such, either + * {@link Bound#maxNumRecords(int)} or {@link Bound#maxReadTime(Duration)} must be set. */ public static class Read { /** diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java index d342f250b2e8..e629e309ca75 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java @@ -24,7 +24,6 @@ import com.google.cloud.dataflow.sdk.coders.VoidCoder; import com.google.cloud.dataflow.sdk.io.Read.Bounded; import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; @@ -112,11 +111,10 @@ *
<h3>Permissions</h3>
*
<p>
When run using the {@link DirectPipelineRunner}, your pipeline can read and write text files * on your local drive and remote text files on Google Cloud Storage that you have access to using - * your {@code gcloud} credentials. When running in the Dataflow service using - * {@link DataflowPipelineRunner}, the pipeline can only read and write files from GCS. For more - * information about permissions, see the Cloud Dataflow documentation on - * Security and - * Permissions. + * your {@code gcloud} credentials. When running in the Dataflow service, the pipeline can only + * read and write files from GCS. For more information about permissions, see the Cloud Dataflow + * documentation on Security + * and Permissions. */ public class TextIO { /** The default coder, which returns each line of the input file as a string. */ diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java index 4cc3ace1b578..629816680ace 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java @@ -50,7 +50,7 @@ import com.google.api.services.datastore.client.QuerySplitter; import com.google.cloud.dataflow.sdk.io.DatastoreIO.DatastoreReader; import com.google.cloud.dataflow.sdk.io.DatastoreIO.DatastoreWriter; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.GcpOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; @@ -68,14 +68,11 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; -import javax.annotation.Nullable; - /** * Tests for {@link DatastoreIO}. */ @@ -111,12 +108,9 @@ public void setUp() { /** * Helper function to create a test {@code DataflowPipelineOptions}. 
*/ - static final DataflowPipelineOptions testPipelineOptions(@Nullable Integer numWorkers) { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + static final GcpOptions testPipelineOptions() { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); options.setGcpCredential(new TestCredential()); - if (numWorkers != null) { - options.setNumWorkers(numWorkers); - } return options; } @@ -216,13 +210,13 @@ public void testSinkValidationFailsWithNoDataset() throws Exception { thrown.expect(NullPointerException.class); thrown.expectMessage("Dataset"); - sink.validate(testPipelineOptions(null)); + sink.validate(testPipelineOptions()); } @Test public void testSinkValidationSucceedsWithDataset() throws Exception { DatastoreIO.Sink sink = DatastoreIO.sink().withDataset(DATASET); - sink.validate(testPipelineOptions(null)); + sink.validate(testPipelineOptions()); } @Test @@ -253,7 +247,7 @@ public void testQuerySplitBasic() throws Exception { .withMockSplitter(splitter) .withMockEstimateSizeBytes(8 * 1024L); - List bundles = io.splitIntoBundles(1024, testPipelineOptions(null)); + List bundles = io.splitIntoBundles(1024, testPipelineOptions()); assertEquals(8, bundles.size()); for (int i = 0; i < 8; ++i) { DatastoreIO.Source bundle = bundles.get(i); @@ -273,7 +267,7 @@ public void testSourceWithNamespace() throws Exception { .withMockSplitter(splitter) .withMockEstimateSizeBytes(8 * 1024L); - io.splitIntoBundles(1024, testPipelineOptions(null)); + io.splitIntoBundles(1024, testPipelineOptions()); PartitionId partition = PartitionId.newBuilder().setNamespace(NAMESPACE).build(); verify(splitter).getSplits(eq(QUERY), eq(partition), eq(8), any(Datastore.class)); @@ -299,7 +293,7 @@ public void testQuerySplitWithZeroSize() throws Exception { .withMockSplitter(splitter) .withMockEstimateSizeBytes(0L); - List bundles = io.splitIntoBundles(1024, testPipelineOptions(null)); + List bundles = io.splitIntoBundles(1024, testPipelineOptions()); assertEquals(1, bundles.size()); verify(splitter, never()) .getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class)); @@ -327,7 +321,7 @@ public void testQueryDoesNotSplitWithLimitSet() throws Exception { initialSource .withQuery(query) .withMockSplitter(splitter) - .splitIntoBundles(1024, testPipelineOptions(null)); + .splitIntoBundles(1024, testPipelineOptions()); assertEquals(1, bundles.size()); assertEquals(query, bundles.get(0).getQuery()); @@ -355,7 +349,7 @@ public void testQuerySplitterThrows() throws Exception { .withQuery(query) .withMockSplitter(splitter) .withMockEstimateSizeBytes(10240L) - .splitIntoBundles(1024, testPipelineOptions(null)); + .splitIntoBundles(1024, testPipelineOptions()); assertEquals(1, bundles.size()); assertEquals(query, bundles.get(0).getQuery()); @@ -370,44 +364,6 @@ public void testQuerySplitSizeUnavailable() throws Exception { KindExpression mykind = KindExpression.newBuilder().setName("mykind").build(); Query query = Query.newBuilder().addKind(mykind).build(); - List mockSplits = new ArrayList<>(); - for (int i = 0; i < 2; i++) { - mockSplits.add( - Query.newBuilder() - .addKind(mykind) - .setFilter( - DatastoreHelper.makeFilter("foo", PropertyFilter.Operator.EQUAL, - Value.newBuilder().setIntegerValue(i).build())) - .build()); - } - - QuerySplitter splitter = mock(QuerySplitter.class); - when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(2), any(Datastore.class))) - .thenReturn(mockSplits); - - DatastoreIO.Source io = initialSource - 
.withQuery(query) - .withMockSplitter(splitter) - .withMockEstimateSizeBytes(8 * 1024L); - - DatastoreIO.Source spiedIo = spy(io); - when(spiedIo.getEstimatedSizeBytes(any(PipelineOptions.class))).thenThrow(new IOException()); - - List bundles = spiedIo.splitIntoBundles(1024, testPipelineOptions(2)); - assertEquals(2, bundles.size()); - for (int i = 0; i < 2; ++i) { - DatastoreIO.Source bundle = bundles.get(i); - Query bundleQuery = bundle.getQuery(); - assertEquals("mykind", bundleQuery.getKind(0).getName()); - assertEquals(i, bundleQuery.getFilter().getPropertyFilter().getValue().getIntegerValue()); - } - } - - @Test - public void testQuerySplitNoWorkers() throws Exception { - KindExpression mykind = KindExpression.newBuilder().setName("mykind").build(); - Query query = Query.newBuilder().addKind(mykind).build(); - List mockSplits = Lists.newArrayList(Query.newBuilder().addKind(mykind).build()); QuerySplitter splitter = mock(QuerySplitter.class); @@ -423,7 +379,7 @@ public void testQuerySplitNoWorkers() throws Exception { when(spiedIo.getEstimatedSizeBytes(any(PipelineOptions.class))) .thenThrow(new NoSuchElementException()); - List bundles = spiedIo.splitIntoBundles(1024, testPipelineOptions(0)); + List bundles = spiedIo.splitIntoBundles(1024, testPipelineOptions()); assertEquals(1, bundles.size()); verify(splitter, never()) .getSplits(any(Query.class), any(PartitionId.class), eq(1), any(Datastore.class)); From 52a4ecbeb4f2aab3db4023e10ddbced2ac70b690 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Mon, 28 Mar 2016 13:26:19 -0700 Subject: [PATCH 09/11] [BEAM-151] Remove references to Dataflow runners This prevents moving DataflowPipelineRunner and subclasses to runner specific maven module. --- .../sdk/options/PipelineOptionsFactoryTest.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java index 8b5dff539745..c3ed8ec1ed34 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java @@ -28,8 +28,6 @@ import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; @@ -828,20 +826,21 @@ public void testSetASingularAttributeUsingAListIsIgnoredWithoutStrictParsing() { @Test public void testSettingRunner() { - String[] args = new String[] {"--runner=BlockingDataflowPipelineRunner"}; + String[] args = new String[] {"--runner=DirectPipelineRunner"}; PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); - assertEquals(BlockingDataflowPipelineRunner.class, options.getRunner()); + assertEquals(DirectPipelineRunner.class, options.getRunner()); } @Test public void testSettingRunnerFullName() { String[] args = - new String[] {String.format("--runner=%s", DataflowPipelineRunner.class.getName())}; + new String[] {String.format("--runner=%s", DirectPipelineRunner.class.getName())}; PipelineOptions opts = 
PipelineOptionsFactory.fromArgs(args).create(); - assertEquals(opts.getRunner(), DataflowPipelineRunner.class); + assertEquals(opts.getRunner(), DirectPipelineRunner.class); } + @Test public void testSettingUnknownRunner() { String[] args = new String[] {"--runner=UnknownRunner"}; From ab8e54d383716cc82d49b318f0c1012157a68607 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Mon, 28 Mar 2016 14:16:34 -0700 Subject: [PATCH 10/11] [BEAM-151] Remove dependence on DataflowPipelineOptions This prevents moving DataflowPipelineOptions to its own runner specifc maven module. --- .../cloud/dataflow/sdk/util/Transport.java | 16 ++-------- .../sdk/testing/TestPipelineTest.java | 32 ++++--------------- 2 files changed, 10 insertions(+), 38 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java index 195313785b8c..1490e24b9453 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java @@ -26,7 +26,6 @@ import com.google.api.services.pubsub.Pubsub; import com.google.api.services.storage.Storage; import com.google.cloud.dataflow.sdk.options.BigQueryOptions; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions; import com.google.cloud.dataflow.sdk.options.GcsOptions; import com.google.cloud.dataflow.sdk.options.PubsubOptions; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; @@ -89,10 +88,7 @@ private static ApiComponents apiComponentsFromUrl(String urlString) { } /** - * Returns a BigQuery client builder. - * - *
<p>
Note: this client's endpoint is not modified by the - * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option. + * Returns a BigQuery client builder using the specified {@link BigQueryOptions}. */ public static Bigquery.Builder newBigQueryClient(BigQueryOptions options) { @@ -106,10 +102,7 @@ private static ApiComponents apiComponentsFromUrl(String urlString) { } /** - * Returns a Pubsub client builder. - * - *
<p>
Note: this client's endpoint is not modified by the - * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option. + * Returns a Pubsub client builder using the specified {@link PubsubOptions}. */ public static Pubsub.Builder newPubsubClient(PubsubOptions options) { @@ -124,10 +117,7 @@ private static ApiComponents apiComponentsFromUrl(String urlString) { } /** - * Returns a Cloud Storage client builder. - * - *
<p>
Note: this client's endpoint is not modified by the - * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option. + * Returns a Cloud Storage client builder using the specified {@link GcsOptions}. */ public static Storage.Builder newStorageClient(GcsOptions options) { diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java index 397920a1dbb7..3996bdcf7f50 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/testing/TestPipelineTest.java @@ -22,10 +22,9 @@ import static org.junit.Assert.assertThat; import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.GcpOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.fasterxml.jackson.databind.ObjectMapper; @@ -49,31 +48,14 @@ public void testCreationUsingDefaults() { public void testCreationOfPipelineOptions() throws Exception { ObjectMapper mapper = new ObjectMapper(); String stringOptions = mapper.writeValueAsString(new String[]{ - "--runner=DataflowPipelineRunner", - "--project=testProject", - "--apiRootUrl=testApiRootUrl", - "--dataflowEndpoint=testDataflowEndpoint", - "--tempLocation=testTempLocation", - "--serviceAccountName=testServiceAccountName", - "--serviceAccountKeyfile=testServiceAccountKeyfile", - "--zone=testZone", - "--numWorkers=1", - "--diskSizeGb=2" + "--runner=DirectPipelineRunner", + "--project=testProject" }); System.getProperties().put("dataflowOptions", stringOptions); - DataflowPipelineOptions options = - TestPipeline.testingPipelineOptions().as(DataflowPipelineOptions.class); - assertEquals(DataflowPipelineRunner.class, options.getRunner()); - assertThat(options.getJobName(), startsWith("testpipelinetest0testcreationofpipelineoptions-")); - assertEquals("testProject", options.as(GcpOptions.class).getProject()); - assertEquals("testApiRootUrl", options.getApiRootUrl()); - assertEquals("testDataflowEndpoint", options.getDataflowEndpoint()); - assertEquals("testTempLocation", options.getTempLocation()); - assertEquals("testServiceAccountName", options.getServiceAccountName()); - assertEquals( - "testServiceAccountKeyfile", options.as(GcpOptions.class).getServiceAccountKeyfile()); - assertEquals("testZone", options.getZone()); - assertEquals(2, options.getDiskSizeGb()); + GcpOptions options = + TestPipeline.testingPipelineOptions().as(GcpOptions.class); + assertEquals(DirectPipelineRunner.class, options.getRunner()); + assertEquals(options.getProject(), "testProject"); } @Test From 00d5da0af15480c65625ff9d097409e6a014a2f8 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Thu, 31 Mar 2016 09:56:48 -0700 Subject: [PATCH 11/11] [BEAM-151] Update ASF license at top of new files. Also fix minor comment. 
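A note while touching this file: the @Description left unchanged in the PubsubOptions context lines below still describes BigQuery, a pre-existing copy/paste slip not introduced here. If fixed in a follow-up, a Pub/Sub-specific annotation might read as follows (wording hypothetical):

    @Description("Options that are used to configure Google Cloud Pub/Sub. See "
        + "https://cloud.google.com/pubsub/docs for details on Google Cloud Pub/Sub.")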
--- .../dataflow/sdk/options/PubsubOptions.java | 25 ++++++++--------- .../dataflow/sdk/util/DataflowTransport.java | 27 ++++++++++--------- .../dataflow/sdk/io/DataflowTextIOTest.java | 23 ++++++++-------- .../transforms/DataflowGroupByKeyTest.java | 23 ++++++++-------- .../sdk/transforms/DataflowViewTest.java | 23 ++++++++-------- 5 files changed, 63 insertions(+), 58 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java index 51cf62cdbbd3..deb19e909f5c 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java @@ -1,23 +1,24 @@ /* - * Copyright (C) 2016 Google Inc. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - package com.google.cloud.dataflow.sdk.options; /** - * Properties that can be set when using Pubsub with the Dataflow SDK. + * Properties that can be set when using Pubsub with the Beam SDK. */ @Description("Options that are used to configure BigQuery. See " + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.") diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java index 8de358cf6ba3..18e66549df0a 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java @@ -1,19 +1,20 @@ -/******************************************************************************* - * Copyright (C) 2015 Google Inc. +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - ******************************************************************************/ - + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.google.cloud.dataflow.sdk.util; import static com.google.cloud.dataflow.sdk.util.Transport.getJsonFactory; diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java index f6d68ec41b46..1bd8a85437d1 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DataflowTextIOTest.java @@ -1,19 +1,20 @@ /* - * Copyright (C) 2016 Google Inc. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - package com.google.cloud.dataflow.sdk.io; import com.google.cloud.dataflow.sdk.Pipeline; diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java index b05e7a264efa..b0b011d142a9 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java @@ -1,19 +1,20 @@ /* - * Copyright (C) 2016 Google Inc. 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - package com.google.cloud.dataflow.sdk.transforms; import com.google.cloud.dataflow.sdk.Pipeline; diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java index b67bfcf3de21..c2a427344318 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java @@ -1,19 +1,20 @@ /* - * Copyright (C) 2015 Google Inc. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - package com.google.cloud.dataflow.sdk.transforms; import com.google.cloud.dataflow.sdk.Pipeline;
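Taken together, the series leaves shared SDK code and tests depending only on core options interfaces (GcpOptions, GcsOptions, BigQueryOptions, PubsubOptions), with Dataflow-specific options and runners free to move to their own maven module. A minimal sketch of the resulting runner-agnostic pattern, assembled from classes used in the diffs above (bucket and object names are placeholders, and TestCredential stands in for a real credential):

    import com.google.api.services.storage.Storage;
    import com.google.cloud.dataflow.sdk.options.GcsOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.util.TestCredential;
    import com.google.cloud.dataflow.sdk.util.Transport;

    public class RunnerAgnosticTraceExample {
      public static void main(String[] args) throws Exception {
        // Core-SDK options only; nothing here references DataflowPipelineOptions
        // or the Dataflow runner, so this compiles against sdks/java/core alone.
        String[] flags = {"--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}"};
        GcsOptions options = PipelineOptionsFactory.fromArgs(flags).as(GcsOptions.class);
        options.setGcpCredential(new TestCredential());

        // Build a Storage request; the tracer tags matching requests with $trace.
        Storage.Objects.Get request =
            Transport.newStorageClient(options).build().objects().get("someBucket", "someObject");
        System.out.println(request.get("$trace"));  // expect GetTraceDestination
      }
    }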