diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index a1d48ce2e4d2..f7f1d8076f43 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -287,11 +287,21 @@
       <artifactId>java-sdk-all</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.google.api-client</groupId>
       <artifactId>google-api-client</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-datastore-protobuf</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.google.oauth-client</groupId>
       <artifactId>google-oauth-client</artifactId>
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
new file mode 100644
index 000000000000..614affb30f62
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.io;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+
+import org.apache.avro.Schema;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Set;
+
+/**
+ * {@link DataflowPipelineRunner} specific tests for {@link AvroIO} transforms.
+ */
+@RunWith(JUnit4.class)
+public class DataflowAvroIOTest {
+  @Test
+  public void testPrimitiveWriteDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+
+    AvroIO.Write.Bound write = AvroIO.Write
+        .to("foo")
+        .withSchema(Schema.create(Schema.Type.STRING))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("AvroIO.Write should include the file pattern in its primitive transform",
+        displayData, hasItem(hasDisplayItem("fileNamePattern")));
+  }
+
+  @Test
+  public void testPrimitiveReadDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+
+    AvroIO.Read.Bound read = AvroIO.Read.from("foo.*")
+        .withSchema(Schema.create(Schema.Type.STRING))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("AvroIO.Read should include the file pattern in its primitive transform",
+        displayData, hasItem(hasDisplayItem("filePattern")));
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
index 619da047d6d9..2b13b9c5cac6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.dataflow.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertThat;
@@ -39,6 +38,30 @@
  * Unit tests for Dataflow usage of {@link BigQueryIO} transforms.
  */
 public class DataflowBigQueryIOTest {
+  @Test
+  public void testTableSourcePrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+    BigQueryIO.Read.Bound read = BigQueryIO.Read
+        .from("project:dataset.tableId")
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("BigQueryIO.Read should include the table spec in its primitive display data",
+        displayData, hasItem(hasDisplayItem("table")));
+  }
+
+  @Test
+  public void testQuerySourcePrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+    BigQueryIO.Read.Bound read = BigQueryIO.Read
+        .fromQuery("foobar")
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("BigQueryIO.Read should include the query in its primitive display data",
+        displayData, hasItem(hasDisplayItem("query")));
+  }
+
   @Test
   public void testBatchSinkPrimitiveDisplayData() {
     DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions();
@@ -63,9 +86,9 @@ private void testSinkPrimitiveDisplayData(DataflowPipelineOptions options) {
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("BigQueryIO.Write should include the table spec in its primitive display data",
-        displayData, hasItem(hasDisplayItem(hasKey("tableSpec"))));
+        displayData, hasItem(hasDisplayItem("tableSpec")));
 
     assertThat("BigQueryIO.Write should include the table schema in its primitive display data",
-        displayData, hasItem(hasDisplayItem(hasKey("schema"))));
+        displayData, hasItem(hasDisplayItem("schema")));
   }
 }
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
new file mode 100644
index 000000000000..42a0b99aa2b1
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.io;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
+import org.apache.beam.sdk.io.DatastoreIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+
+import com.google.api.services.datastore.DatastoreV1;
+
+import org.junit.Test;
+
+import java.util.Set;
+
+/**
+ * Unit tests for Dataflow usage of {@link DatastoreIO} transforms.
+ */
+public class DataflowDatastoreIOTest {
+  @Test
+  public void testSourcePrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+    PTransform<PInput, ?> read = DatastoreIO.readFrom(
+        "myDataset", DatastoreV1.Query.newBuilder().build());
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("DatastoreIO read should include the dataset in its primitive display data",
+        displayData, hasItem(hasDisplayItem("dataset")));
+  }
+
+  @Test
+  public void testSinkPrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+    PTransform<PCollection<DatastoreV1.Entity>, ?> write = DatastoreIO.writeTo("myDataset");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("DatastoreIO write should include the dataset in its primitive display data",
+        displayData, hasItem(hasDisplayItem("dataset")));
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
new file mode 100644
index 000000000000..4874877d73fd
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.io;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
+import org.apache.beam.sdk.io.PubsubIO;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Set;
+
+/**
+ * {@link DataflowPipelineRunner} specific tests for {@link PubsubIO} transforms.
+ */
+@RunWith(JUnit4.class)
+public class DataflowPubsubIOTest {
+  @Test
+  public void testPrimitiveWriteDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+    PubsubIO.Write.Bound write = PubsubIO.Write
+        .topic("projects/project/topics/topic");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("PubsubIO.Write should include the topic in its primitive display data",
+        displayData, hasItem(hasDisplayItem("topic")));
+  }
+
+  @Test
+  public void testPrimitiveReadDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+    PubsubIO.Read.Bound read = PubsubIO.Read.topic("projects/project/topics/topic");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("PubsubIO.Read should include the topic in its primitive display data",
+        displayData, hasItem(hasDisplayItem("topic")));
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
index 8ff7d0e5679b..0d7c1cbff5ce 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
@@ -128,7 +128,7 @@ private void applyRead(Pipeline pipeline, String path) {
   }
 
   @Test
-  public void testPrimitiveDisplayData() {
+  public void testPrimitiveWriteDisplayData() {
     DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
 
     TextIO.Write.Bound write = TextIO.Write.to("foobar");
@@ -137,4 +137,17 @@ public void testPrimitiveDisplayData() {
     assertThat("TextIO.Write should include the file prefix in its primitive display data",
         displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
   }
+
+  @Test
+  public void testPrimitiveReadDisplayData() {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+
+    TextIO.Read.Bound read = TextIO.Read
+        .from("foobar")
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("TextIO.Read should include the file prefix in its primitive display data",
+        displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar")))));
+  }
 }
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
index db42c771052b..0b865c3dab92 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java
@@ -43,6 +43,7 @@ private DataflowDisplayDataEvaluator() {}
 
   public static DataflowPipelineOptions getDefaultOptions() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
     options.setProject("foobar");
     options.setTempLocation("gs://bucket/tmpLocation");
     options.setFilesToStage(Lists.newArrayList());
@@ -66,6 +67,6 @@ public static DisplayDataEvaluator create() {
    * the {@link DataflowPipelineRunner} with the specified {@code options}.
    */
   public static DisplayDataEvaluator create(DataflowPipelineOptions options) {
-    return DisplayDataEvaluator.forRunner(DataflowPipelineRunner.class, options);
+    return DisplayDataEvaluator.create(options);
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index a5ef39f62b6e..3f22648bdc60 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -802,6 +802,12 @@ public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws E
     protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
       // Do nothing.
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("table", jsonTable));
+    }
   }
 
   /**
@@ -893,6 +899,11 @@ protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
       tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
     }
 
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("query", query));
+    }
     private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
         throws InterruptedException, IOException {
       if (dryRunJobStats.get() == null) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index fa867c24bd22..6a1447775f68 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -1030,6 +1030,11 @@ private void publish() throws IOException {
 
       checkState(n == output.size());
       output.clear();
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      Bound.this.populateDisplayData(builder);
+    }
   }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index d00914c536e9..6849018d03f6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -20,7 +20,7 @@
 import static org.apache.beam.sdk.io.BigQueryIO.fromJsonString;
 import static org.apache.beam.sdk.io.BigQueryIO.toJsonString;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -710,8 +710,8 @@ public void testBuildSinkDisplayData() {
 
     DisplayData displayData = DisplayData.from(write);
 
-    assertThat(displayData, hasDisplayItem(hasKey("table")));
-    assertThat(displayData, hasDisplayItem(hasKey("schema")));
+    assertThat(displayData, hasDisplayItem("table"));
+    assertThat(displayData, hasDisplayItem("schema"));
     assertThat(displayData,
         hasDisplayItem("createDisposition", CreateDisposition.CREATE_IF_NEEDED.toString()));
     assertThat(displayData,
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 76e547f84752..542e7341d36e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
 
 import static org.hamcrest.Matchers.containsString;
@@ -353,7 +352,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
     DisplayData compressedSourceDisplayData = DisplayData.from(compressedSource);
     DisplayData gzipDisplayData = DisplayData.from(gzipSource);
 
-    assertThat(compressedSourceDisplayData, hasDisplayItem(hasKey("compressionMode")));
+    assertThat(compressedSourceDisplayData, hasDisplayItem("compressionMode"));
     assertThat(gzipDisplayData, hasDisplayItem("compressionMode", CompressionMode.GZIP.toString()));
     assertThat(compressedSourceDisplayData, hasDisplayItem("source", inputSource.getClass()));
     assertThat(compressedSourceDisplayData, includesDisplayDataFrom(inputSource));
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 228a6ba092a1..6fc970015d3c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -834,7 +834,7 @@ public void testDisplayDataExcludesDefaultValues() {
     PipelineOptions options = PipelineOptionsFactory.as(HasDefaults.class);
     DisplayData data = DisplayData.from(options);
 
-    assertThat(data, not(hasDisplayItem(hasKey("foo"))));
+    assertThat(data, not(hasDisplayItem("foo")));
   }
 
   interface HasDefaults extends PipelineOptions {
@@ -849,7 +849,7 @@ public void testDisplayDataExcludesValuesAccessedButNeverSet() {
     assertEquals("bar", options.getFoo());
 
     DisplayData data = DisplayData.from(options);
-    assertThat(data, not(hasDisplayItem(hasKey("foo"))));
+    assertThat(data, not(hasDisplayItem("foo")));
   }
 
   @Test
@@ -958,6 +958,6 @@ public void testDisplayDataExcludesJsonIgnoreOptions() {
     options.setValue("foobar");
 
     DisplayData data = DisplayData.from(options);
-    assertThat(data, not(hasDisplayItem(hasKey("value"))));
+    assertThat(data, not(hasDisplayItem("value")));
   }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index c94c9f1fdd9a..4f00ed4beea2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -305,6 +304,6 @@ public void testDisplayData() {
     DisplayData maxErrorDisplayData = DisplayData.from(specifiedMaxError);
     assertThat(maxErrorDisplayData, hasDisplayItem("maximumEstimationError", 0.1234));
     assertThat("calculated sampleSize should be included", maxErrorDisplayData,
-        hasDisplayItem(hasKey("sampleSize")));
+        hasDisplayItem("sampleSize"));
   }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
index 67590039b916..a17e06fdda5c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
@@ -20,9 +20,9 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -40,25 +40,21 @@ public class DisplayDataEvaluator {
   private final PipelineOptions options;
 
   /**
-   * Create a new {@link DisplayDataEvaluator} using the specified {@link PipelineRunner} and
-   * default {@link PipelineOptions}.
+   * Create a new {@link DisplayDataEvaluator} using {@link TestPipeline#testingPipelineOptions()}.
    */
-  public static DisplayDataEvaluator forRunner(Class<? extends PipelineRunner<?>> pipelineRunner) {
-    return forRunner(pipelineRunner, PipelineOptionsFactory.create());
+  public static DisplayDataEvaluator create() {
+    return create(TestPipeline.testingPipelineOptions());
   }
 
   /**
-   * Create a new {@link DisplayDataEvaluator} using the specified {@link PipelineRunner} and
-   * {@link PipelineOptions}.
+   * Create a new {@link DisplayDataEvaluator} using the specified {@link PipelineOptions}.
    */
-  public static DisplayDataEvaluator forRunner(
-      Class<? extends PipelineRunner<?>> pipelineRunner, PipelineOptions pipelineOptions) {
-    return new DisplayDataEvaluator(pipelineRunner, pipelineOptions);
+  public static DisplayDataEvaluator create(PipelineOptions pipelineOptions) {
+    return new DisplayDataEvaluator(pipelineOptions);
   }
 
-  private DisplayDataEvaluator(Class<? extends PipelineRunner<?>> runner, PipelineOptions options) {
+  private DisplayDataEvaluator(PipelineOptions options) {
     this.options = options;
-    this.options.setRunner(runner);
   }
 
   /**
@@ -109,7 +105,7 @@ private static class PrimitiveDisplayDataPTransformVisitor extends
       Pipeline.PipelineVisitor.Defaults {
     private final PTransform<?, ?> root;
     private final Set<DisplayData> displayData;
-    private boolean shouldRecord = false;
+    private boolean inCompositeRoot = false;
 
     PrimitiveDisplayDataPTransformVisitor(PTransform<?, ?> root) {
       this.root = root;
@@ -123,7 +119,7 @@ Set<DisplayData> getPrimitivesDisplayData() {
     @Override
     public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
       if (Objects.equals(root, node.getTransform())) {
-        shouldRecord = true;
+        inCompositeRoot = true;
       }
 
       return CompositeBehavior.ENTER_TRANSFORM;
@@ -132,13 +128,13 @@ public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
     @Override
     public void leaveCompositeTransform(TransformTreeNode node) {
       if (Objects.equals(root, node.getTransform())) {
-        shouldRecord = false;
+        inCompositeRoot = false;
       }
     }
 
     @Override
     public void visitPrimitiveTransform(TransformTreeNode node) {
-      if (shouldRecord) {
+      if (inCompositeRoot || Objects.equals(root, node.getTransform())) {
         displayData.add(DisplayData.from(node.getTransform()));
       }
     }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index f24c1337a8e9..7b1dc79e6bc9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -23,7 +23,6 @@
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -41,7 +40,7 @@
 public class DisplayDataEvaluatorTest implements Serializable {
 
   @Test
-  public void testDisplayDataForPrimitiveTransforms() {
+  public void testCompositeTransform() {
     PTransform<? super PCollection<String>, ? super POutput> myTransform =
        new PTransform<PCollection<String>, POutput> () {
       @Override
@@ -65,10 +64,29 @@ public void populateDisplayData(DisplayData.Builder builder) {
       }
     };
 
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.forRunner(DirectPipelineRunner.class);
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(myTransform);
 
     assertThat(displayData, not(hasItem(hasDisplayItem("compositeKey", "compositeValue"))));
     assertThat(displayData, hasItem(hasDisplayItem("primitiveKey", "primitiveValue")));
   }
+
+  @Test
+  public void testPrimitiveTransform() {
+    PTransform<? super PCollection<String>, ? super PCollection<String>> myTransform = ParDo.of(
+        new DoFn<String, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {}
+
+          @Override
+          public void populateDisplayData(DisplayData.Builder builder) {
+            builder.add(DisplayData.item("foo", "bar"));
+          }
+        });
+
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(myTransform);
+
+    assertThat(displayData, hasItem(hasDisplayItem("foo")));
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
index e3721b8f6ef4..4207624feb6b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchers.java
@@ -56,6 +56,14 @@ protected Collection<DisplayData.Item> featureValueOf(DisplayData actual) {
     };
   }
 
+  /**
+   * Creates a matcher that matches if the examined {@link DisplayData} contains an item with the
+   * specified key.
+   */
+  public static Matcher<DisplayData> hasDisplayItem(String key) {
+    return hasDisplayItem(hasKey(key));
+  }
+
   /**
    * Create a matcher that matches if the examined {@link DisplayData} contains an item with the
    * specified key and String value.
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
index f848c5ed3082..f9f29110f242 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms.display;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
@@ -67,7 +66,7 @@ public void testHasDisplayItemDescription() {
 
   @Test
   public void testHasKey() {
-    Matcher<DisplayData> matcher = hasDisplayItem(hasKey("foo"));
+    Matcher<DisplayData> matcher = hasDisplayItem("foo");
 
     assertFalse(matcher.matches(createDisplayDataWithItem("fooz", "bar")));
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index cd5eb2d4f521..c858f32d2663 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -292,7 +292,7 @@ public void testDisplayDataExcludesDefaults() {
         .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 
     DisplayData data = DisplayData.from(window);
-    assertThat(data, not(hasDisplayItem(hasKey("trigger"))));
-    assertThat(data, not(hasDisplayItem(hasKey("allowedLateness"))));
+    assertThat(data, not(hasDisplayItem("trigger")));
+    assertThat(data, not(hasDisplayItem("allowedLateness")));
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 403ad9d44a2d..357ab440735c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -20,10 +20,8 @@
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
-import static org.apache.beam.sdk.testing.SourceTestUtils
-    .assertSplitAtFractionSucceedsAndConsistent;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verifyNotNull;
@@ -428,7 +426,7 @@ public void testReadingDisplayData() {
     assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
 
     // BigtableIO adds user-agent to options; assert only on key and not value.
-    assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
+    assertThat(displayData, hasDisplayItem("bigtableOptions"));
   }
 
   /** Tests that a record gets written to the service and messages are logged. */
@@ -494,10 +492,7 @@ public void testWritingDisplayData() {
         .withBigtableOptions(BIGTABLE_OPTIONS);
 
     DisplayData displayData = DisplayData.from(write);
 
-    assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
-    // BigtableIO adds user-agent to options; assert only on key and not value.
-    assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
   }
 
   //////////////////////////////////////////////////////////////////////////////////////////
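
Usage sketch (illustrative only, not part of the patch): with the runner-parameterized DisplayDataEvaluator.forRunner(...) factories replaced by DisplayDataEvaluator.create() and the key-only hasDisplayItem(String) matcher overload added in DisplayDataMatchers, a primitive display data test takes roughly the shape below. The class name and the "foo"/"bar" display item are hypothetical, mirroring the testPrimitiveTransform case added to DisplayDataEvaluatorTest above.

import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertThat;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;

import org.junit.Test;

import java.util.Set;

/** Hypothetical example class; mirrors the pattern used by the tests in this change. */
public class DisplayDataEvaluatorUsageSketch {
  @Test
  public void primitiveDisplayDataIsSurfaced() {
    // create() now picks up TestPipeline.testingPipelineOptions() instead of taking a runner class.
    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();

    // Any primitive transform that registers display data works; an anonymous DoFn is used here.
    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(
        ParDo.of(new DoFn<String, String>() {
          @Override
          public void processElement(ProcessContext c) throws Exception {}

          @Override
          public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(DisplayData.item("foo", "bar"));
          }
        }));

    // hasDisplayItem(String) matches on the key alone, replacing hasDisplayItem(hasKey(...)).
    assertThat(displayData, hasItem(hasDisplayItem("foo")));
  }
}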