From dbe8fda519ada5da4c09a306ab9a49e15f66759c Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 3 May 2016 09:28:21 -0700 Subject: [PATCH 1/8] Test utility for display data in a pipeline runner DisplayDataEvaluator is useful for validating how PTransform display data is surfaced in the context of a Pipeline and runner. --- .../DataflowDisplayDataEvaluator.java | 71 +++++++++ .../display/DisplayDataEvaluator.java | 146 ++++++++++++++++++ .../display/DisplayDataEvaluatorTest.java | 74 +++++++++ 3 files changed, 291 insertions(+) create mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java new file mode 100644 index 000000000000..db42c771052b --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.transforms; + +import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.sdk.options.GcpOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.util.NoopCredentialFactory; +import org.apache.beam.sdk.util.NoopPathValidator; + +import com.google.common.collect.Lists; + +/** + * Factory methods for creating {@link DisplayDataEvaluator} instances against the + * {@link DataflowPipelineRunner}. + */ +public final class DataflowDisplayDataEvaluator { + /** Do not instantiate. */ + private DataflowDisplayDataEvaluator() {} + + /** + * Retrieve a set of default {@link DataflowPipelineOptions} which can be used to build + * dataflow pipelines for evaluating display data. + */ + public static DataflowPipelineOptions getDefaultOptions() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + + options.setProject("foobar"); + options.setTempLocation("gs://bucket/tmpLocation"); + options.setFilesToStage(Lists.newArrayList()); + + options.as(DataflowPipelineDebugOptions.class).setPathValidatorClass(NoopPathValidator.class); + options.as(GcpOptions.class).setCredentialFactoryClass(NoopCredentialFactory.class); + + return options; + } + + /** + * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against + * the {@link DataflowPipelineRunner}. + */ + public static DisplayDataEvaluator create() { + return create(getDefaultOptions()); + } + + /** + * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against + * the {@link DataflowPipelineRunner} with the specified {@code options}. + */ + public static DisplayDataEvaluator create(DataflowPipelineOptions options) { + return DisplayDataEvaluator.forRunner(DataflowPipelineRunner.class, options); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java new file mode 100644 index 000000000000..67590039b916 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.display; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; + +import com.google.common.collect.Sets; + +import java.util.Objects; +import java.util.Set; + +/** + * Test utilities to evaluate the {@link DisplayData} in the context of a {@link PipelineRunner}. + */ +public class DisplayDataEvaluator { + private final PipelineOptions options; + + /** + * Create a new {@link DisplayDataEvaluator} using the specified {@link PipelineRunner} and + * default {@link PipelineOptions}. + */ + public static DisplayDataEvaluator forRunner(Class> pipelineRunner) { + return forRunner(pipelineRunner, PipelineOptionsFactory.create()); + } + + /** + * Create a new {@link DisplayDataEvaluator} using the specified {@link PipelineRunner} and + * {@link PipelineOptions}. + */ + public static DisplayDataEvaluator forRunner( + Class> pipelineRunner, PipelineOptions pipelineOptions) { + return new DisplayDataEvaluator(pipelineRunner, pipelineOptions); + } + + private DisplayDataEvaluator(Class> runner, PipelineOptions options) { + this.options = options; + this.options.setRunner(runner); + } + + /** + * Traverse the specified {@link PTransform}, collecting {@link DisplayData} registered on the + * inner primitive {@link PTransform PTransforms}. + * + * @return the set of {@link DisplayData} for primitive {@link PTransform PTransforms}. + */ + public Set displayDataForPrimitiveTransforms( + final PTransform, ? extends POutput> root) { + return displayDataForPrimitiveTransforms(root, null); + } + + /** + * Traverse the specified {@link PTransform}, collecting {@link DisplayData} registered on the + * inner primitive {@link PTransform PTransforms}. + * + * @param root The root {@link PTransform} to traverse + * @param inputCoder The coder to set for the {@link PTransform} input, or null to infer the + * default coder. + * + * @return the set of {@link DisplayData} for primitive {@link PTransform PTransforms}. + */ + public Set displayDataForPrimitiveTransforms( + final PTransform, ? extends POutput> root, + Coder inputCoder) { + + Create.Values input = Create.of(); + if (inputCoder != null) { + input = input.withCoder(inputCoder); + } + + Pipeline pipeline = Pipeline.create(options); + pipeline + .apply(input) + .apply(root); + + PrimitiveDisplayDataPTransformVisitor visitor = new PrimitiveDisplayDataPTransformVisitor(root); + pipeline.traverseTopologically(visitor); + return visitor.getPrimitivesDisplayData(); + } + + /** + * Visits {@link PTransform PTransforms} in a pipeline, and collects {@link DisplayData} for + * each primitive transform under a given composite root. + */ + private static class PrimitiveDisplayDataPTransformVisitor + extends Pipeline.PipelineVisitor.Defaults { + private final PTransform root; + private final Set displayData; + private boolean shouldRecord = false; + + PrimitiveDisplayDataPTransformVisitor(PTransform root) { + this.root = root; + this.displayData = Sets.newHashSet(); + } + + Set getPrimitivesDisplayData() { + return displayData; + } + + @Override + public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { + if (Objects.equals(root, node.getTransform())) { + shouldRecord = true; + } + + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + if (Objects.equals(root, node.getTransform())) { + shouldRecord = false; + } + } + + @Override + public void visitPrimitiveTransform(TransformTreeNode node) { + if (shouldRecord) { + displayData.add(DisplayData.from(node.getTransform())); + } + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java new file mode 100644 index 000000000000..f24c1337a8e9 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.display; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.runners.DirectPipelineRunner; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; + +import org.junit.Test; + +import java.io.Serializable; +import java.util.Set; + +/** + * Unit tests for {@link DisplayDataEvaluator}. + */ +public class DisplayDataEvaluatorTest implements Serializable { + + @Test + public void testDisplayDataForPrimitiveTransforms() { + PTransform, ? super POutput> myTransform = + new PTransform, POutput> () { + @Override + public PCollection apply(PCollection input) { + return input.apply(ParDo.of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("primitiveKey", "primitiveValue")); + } + })); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("compositeKey", "compositeValue")); + } + }; + + DisplayDataEvaluator evaluator = DisplayDataEvaluator.forRunner(DirectPipelineRunner.class); + Set displayData = evaluator.displayDataForPrimitiveTransforms(myTransform); + + assertThat(displayData, not(hasItem(hasDisplayItem("compositeKey", "compositeValue")))); + assertThat(displayData, hasItem(hasDisplayItem("primitiveKey", "primitiveValue"))); + } +} From bb688b1dccb4f06ed251efcd971063aa79660132 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 3 May 2016 09:33:55 -0700 Subject: [PATCH 2/8] Fix Combine transform primitive display data --- .../transforms/DataflowCombineTest.java | 58 +++++++++++++++++++ .../apache/beam/sdk/transforms/Combine.java | 2 +- 2 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java new file mode 100644 index 000000000000..3af0caec9528 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.transforms; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + +import static org.hamcrest.Matchers.hasItem; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.CombineTest; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; + +import org.junit.Test; + +import java.util.Set; + +/** + * Unit tests for Dataflow usage of {@link Combine} transforms. + */ +public class DataflowCombineTest { + @Test + public void testCombinePerKeyPrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); + + CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts(); + PTransform>, ? extends POutput> combine = + Combine.perKey(combineFn); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(combine, + KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); + + assertThat("Combine.perKey should include the combineFn in its primitive transform", + displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 8a39c98a16e7..ffbaafa2533a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -1781,7 +1781,7 @@ public List> getSideInputs() { public PCollection> apply(PCollection> input) { return input .apply(GroupByKey.create(fewKeys)) - .apply(Combine.groupedValues(fn).withSideInputs(sideInputs)); + .apply(Combine.groupedValues(fn, fnDisplayData).withSideInputs(sideInputs)); } @Override From 7010cd951ff67be5422b6b24b5627456394d7879 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 3 May 2016 11:16:51 -0700 Subject: [PATCH 3/8] Add display data for MapElements transform --- .../transforms/DataflowMapElementsTest.java | 55 +++++++++++++++++++ .../beam/sdk/transforms/MapElements.java | 12 ++++ .../beam/sdk/transforms/MapElementsTest.java | 36 +++++++++++- 3 files changed, 101 insertions(+), 2 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java new file mode 100644 index 000000000000..8a5e67d5cad5 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.transforms; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + +import static org.hamcrest.Matchers.hasItem; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; + +import org.junit.Test; + +import java.io.Serializable; +import java.util.Set; + +/** + * Unit tests for Dataflow usage of {@link MapElements} transforms. + */ +public class DataflowMapElementsTest implements Serializable { + @Test + public void testPrimitiveDisplayData() { + SimpleFunction mapFn = new SimpleFunction() { + @Override + public Integer apply(Integer input) { + return input; + } + }; + + MapElements map = MapElements.via(mapFn); + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(map); + assertThat("MapElements should include the mapFn in its primitive display data", + displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass()))); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java index 47c2f5d495d5..29d1dde8e2fc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -108,6 +109,17 @@ public PCollection apply(PCollection input) { public void processElement(ProcessContext c) { c.output(fn.apply(c.element())); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + MapElements.this.populateDisplayData(builder); + } })).setTypeDescriptorInternal(outputType); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("mapFn", fn.getClass())); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java index 54465129e27c..1e2c826eca8b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java @@ -17,12 +17,15 @@ */ package org.apache.beam.sdk.transforms; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -103,7 +106,7 @@ public String apply(String input) { assertThat(pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor()), equalTo(pipeline.getCoderRegistry().getDefaultCoder(new TypeDescriptor() {}))); - // Make sure the pipelien runs too + // Make sure the pipeline runs too pipeline.run(); } @@ -118,12 +121,41 @@ public void testVoidValues() throws Exception { pipeline.run(); } + @Test + public void testSerializableFunctionDisplayData() { + SerializableFunction serializableFn = + new SerializableFunction() { + @Override + public Integer apply(Integer input) { + return input; + } + }; + + MapElements serializableMap = MapElements.via(serializableFn) + .withOutputType(TypeDescriptor.of(Integer.class)); + assertThat(DisplayData.from(serializableMap), + hasDisplayItem("mapFn", serializableFn.getClass())); + } + + @Test + public void testSimpleFunctionDisplayData() { + SimpleFunction simpleFn = new SimpleFunction() { + @Override + public Integer apply(Integer input) { + return input; + } + }; + + MapElements simpleMap = MapElements.via(simpleFn); + assertThat(DisplayData.from(simpleMap), hasDisplayItem("mapFn", simpleFn.getClass())); + } + static class VoidValues extends PTransform>, PCollection>> { @Override public PCollection> apply(PCollection> input) { - return input.apply(MapElements., KV>via( + return input.apply(MapElements.via( new SimpleFunction, KV>() { @Override public KV apply(KV input) { From 1882691db1f7a618b04d2d41788f895a7a8e5534 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 3 May 2016 14:10:59 -0700 Subject: [PATCH 4/8] Exclude JsonIgnore PipelineOptions from DisplayData --- .../beam/sdk/options/PipelineOptionSpec.java | 61 +++++-------------- .../sdk/options/ProxyInvocationHandler.java | 10 +++ .../options/PipelineOptionsReflectorTest.java | 32 ++++++++++ .../options/ProxyInvocationHandlerTest.java | 9 +++ 4 files changed, 65 insertions(+), 47 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java index 71f9d462bbf7..9a88f7076232 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionSpec.java @@ -17,73 +17,40 @@ */ package org.apache.beam.sdk.options; -import com.google.common.base.MoreObjects; -import com.google.common.base.Objects; +import com.google.auto.value.AutoValue; + +import com.fasterxml.jackson.annotation.JsonIgnore; import java.lang.reflect.Method; /** * For internal use. Specification for an option defined in a {@link PipelineOptions} interface. */ -class PipelineOptionSpec { - private final Class clazz; - private final String name; - private final Method getter; - +@AutoValue +abstract class PipelineOptionSpec { static PipelineOptionSpec of(Class clazz, String name, Method getter) { - return new PipelineOptionSpec(clazz, name, getter); - } - - private PipelineOptionSpec(Class clazz, String name, Method getter) { - this.clazz = clazz; - this.name = name; - this.getter = getter; + return new AutoValue_PipelineOptionSpec(clazz, name, getter); } /** * The {@link PipelineOptions} interface which defines this {@link PipelineOptionSpec}. */ - Class getDefiningInterface() { - return clazz; - } + abstract Class getDefiningInterface(); /** * Name of the property. */ - String getName() { - return name; - } + abstract String getName(); /** * The getter method for this property. */ - Method getGetterMethod() { - return getter; - } + abstract Method getGetterMethod(); - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("definingInterface", getDefiningInterface()) - .add("name", getName()) - .add("getterMethod", getGetterMethod()) - .toString(); - } - - @Override - public int hashCode() { - return Objects.hashCode(getDefiningInterface(), getName(), getGetterMethod()); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof PipelineOptionSpec)) { - return false; - } - - PipelineOptionSpec that = (PipelineOptionSpec) obj; - return Objects.equal(this.getDefiningInterface(), that.getDefiningInterface()) - && Objects.equal(this.getName(), that.getName()) - && Objects.equal(this.getGetterMethod(), that.getGetterMethod()); + /** + * Whether the option should be serialized. Uses the {@link JsonIgnore} annotation. + */ + boolean shouldSerialize() { + return !getGetterMethod().isAnnotationPresent(JsonIgnore.class); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java index 745549f4cd60..159eb5bb6c89 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java @@ -281,6 +281,12 @@ private void populateDisplayData(DisplayData.Builder builder) { HashSet specs = new HashSet<>(optionsMap.get(option.getKey())); for (PipelineOptionSpec optionSpec : specs) { + if (!optionSpec.shouldSerialize()) { + // Options that are excluded for serialization (i.e. those with @JsonIgnore) are also + // excluded from display data. These options are generally not useful for display. + continue; + } + Class pipelineInterface = optionSpec.getDefiningInterface(); if (type != null) { builder.add(DisplayData.item(option.getKey(), type, value) @@ -304,6 +310,10 @@ private void populateDisplayData(DisplayData.Builder builder) { .withNamespace(UnknownPipelineOptions.class)); } else { for (PipelineOptionSpec spec : specs) { + if (!spec.shouldSerialize()) { + continue; + } + Object value = getValueFromJson(jsonOption.getKey(), spec.getGetterMethod()); DisplayData.Type type = DisplayData.inferType(value); if (type != null) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsReflectorTest.java index 82f032963a7d..8f801c79688a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsReflectorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsReflectorTest.java @@ -20,12 +20,15 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.not; import com.google.common.collect.ImmutableSet; + +import com.fasterxml.jackson.annotation.JsonIgnore; import org.hamcrest.FeatureMatcher; import org.hamcrest.Matcher; import org.hamcrest.Matchers; @@ -129,6 +132,24 @@ interface HiddenOptions extends PipelineOptions { void setFoo(String value); } + @Test + public void testShouldSerialize() { + Set properties = + PipelineOptionsReflector.getOptionSpecs(JsonIgnoreOptions.class); + + assertThat(properties, hasItem(allOf(hasName("notIgnored"), shouldSerialize()))); + assertThat(properties, hasItem(allOf(hasName("ignored"), not(shouldSerialize())))); + } + + interface JsonIgnoreOptions extends PipelineOptions { + String getNotIgnored(); + void setNotIgnored(String value); + + @JsonIgnore + String getIgnored(); + void setIgnored(String value); + } + @Test public void testMultipleInputInterfaces() { Set> interfaces = @@ -193,4 +214,15 @@ protected String featureValueOf(PipelineOptionSpec actual) { } }; } + + private static Matcher shouldSerialize() { + return new FeatureMatcher(equalTo(true), + "should serialize", "shouldSerialize") { + + @Override + protected Boolean featureValueOf(PipelineOptionSpec actual) { + return actual.shouldSerialize(); + } + }; + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java index b53000dba353..228a6ba092a1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java @@ -951,4 +951,13 @@ private T serializeDeserialize(Class kls, Pipelin String value = MAPPER.writeValueAsString(options); return MAPPER.readValue(value, PipelineOptions.class).as(kls); } + + @Test + public void testDisplayDataExcludesJsonIgnoreOptions() { + IgnoredProperty options = PipelineOptionsFactory.as(IgnoredProperty.class); + options.setValue("foobar"); + + DisplayData data = DisplayData.from(options); + assertThat(data, not(hasDisplayItem(hasKey("value")))); + } } From c8588f8839994708f07dddd751884ad190775d4b Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 3 May 2016 14:13:00 -0700 Subject: [PATCH 5/8] Exclude OffsetBasedSource minBundleSize default --- .../src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java index 126535a5b52d..1c033f8b4eaa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java @@ -207,7 +207,7 @@ public boolean allowsDynamicSplitting() { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder - .add(DisplayData.item("minBundleSize", minBundleSize)) + .addIfNotDefault(DisplayData.item("minBundleSize", minBundleSize), 1L) .addIfNotDefault(DisplayData.item("startOffset", startOffset), 0L) .addIfNotDefault(DisplayData.item("endOffset", endOffset), Long.MAX_VALUE); } From ba95c0224b25b3317f3aefa3104d00e4a444aadb Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 6 May 2016 11:40:47 -0700 Subject: [PATCH 6/8] Fix Write transform primitive display data --- .../dataflow/io/DataflowTextIOTest.java | 23 ++++++++++++++++++- .../java/org/apache/beam/sdk/io/Write.java | 5 ++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java index bfc99e8f06cc..8ff7d0e5679b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java @@ -17,11 +17,21 @@ */ package org.apache.beam.runners.dataflow.io; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; + +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertThat; + import org.apache.beam.runners.dataflow.DataflowPipelineRunner; import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; +import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.TestCredential; import org.apache.beam.sdk.util.gcsfs.GcsPath; @@ -41,13 +51,13 @@ import java.nio.file.Files; import java.nio.file.StandardOpenOption; import java.util.List; +import java.util.Set; /** * {@link DataflowPipelineRunner} specific tests for TextIO Read and Write transforms. */ @RunWith(JUnit4.class) public class DataflowTextIOTest { - private TestDataflowPipelineOptions buildTestPipelineOptions() { TestDataflowPipelineOptions options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); @@ -116,4 +126,15 @@ public void testGoodWildcards() throws Exception { private void applyRead(Pipeline pipeline, String path) { pipeline.apply("Read(" + path + ")", TextIO.Read.from(path)); } + + @Test + public void testPrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); + + TextIO.Write.Bound write = TextIO.Write.to("foobar"); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("TextIO.Write should include the file prefix in its primitive display data", + displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java index 9cb026a9516c..0f2dbf87174c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java @@ -204,6 +204,11 @@ public void finishBundle(Context c) throws Exception { c.output(result); } } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Write.Bound.this.populateDisplayData(builder); + } }).withSideInputs(writeOperationView)) .setCoder(writeOperation.getWriterResultCoder()); From 90affe6416b92774d7f2227e5f8e062581760ca0 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 6 May 2016 16:03:12 -0700 Subject: [PATCH 7/8] Exclude default Window.allowedLateness display data --- .../org/apache/beam/sdk/transforms/windowing/Window.java | 9 +++++++-- .../apache/beam/sdk/transforms/windowing/WindowTest.java | 6 ++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index e9b34374740e..b9dd451fe2b7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -601,8 +601,13 @@ public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder .add(DisplayData.item("windowFn", windowFn.getClass())) - .include(windowFn) - .addIfNotNull(DisplayData.item("allowedLateness", allowedLateness)); + .include(windowFn); + + if (allowedLateness != null) { + builder.addIfNotDefault(DisplayData.item("allowedLateness", allowedLateness), + Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); + + } if (trigger != null && !(trigger instanceof DefaultTrigger)) { builder.add(DisplayData.item("trigger", trigger.toString())); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 8ad590d21753..885f549d8d90 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -285,11 +285,13 @@ public void testDisplayDataExcludesUnspecifiedProperties() { } @Test - public void testDisplayDataExcludesDefaultTrigger() { + public void testDisplayDataExcludesDefaults() { Window.Bound window = Window.into(new GlobalWindows()) - .triggering(DefaultTrigger.of()); + .triggering(DefaultTrigger.of()) + .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); DisplayData data = DisplayData.from(window); assertThat(data, not(hasDisplayItem(hasKey("trigger")))); + assertThat(data, not(hasDisplayItem(hasKey("allowedLateness")))); } } From 3ea62f83a3f4cd872df805b0e6ff18e72fe691be Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Fri, 6 May 2016 16:46:40 -0700 Subject: [PATCH 8/8] Fix BigQuery sink display data --- .../dataflow/io/DataflowBigQueryIOTest.java | 71 +++++++++++++++++++ .../org/apache/beam/sdk/io/BigQueryIO.java | 26 +++++++ 2 files changed, 97 insertions(+) create mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java new file mode 100644 index 000000000000..619da047d6d9 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.io; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; + +import static org.hamcrest.Matchers.hasItem; +import static org.junit.Assert.assertThat; + +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; +import org.apache.beam.sdk.io.BigQueryIO; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; + +import com.google.api.services.bigquery.model.TableSchema; + +import org.junit.Test; + +import java.util.Set; + +/** + * Unit tests for Dataflow usage of {@link BigQueryIO} transforms. + */ +public class DataflowBigQueryIOTest { + @Test + public void testBatchSinkPrimitiveDisplayData() { + DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions(); + options.setStreaming(false); + testSinkPrimitiveDisplayData(options); + } + + @Test + public void testStreamingSinkPrimitiveDisplayData() { + DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions(); + options.setStreaming(true); + testSinkPrimitiveDisplayData(options); + } + + private void testSinkPrimitiveDisplayData(DataflowPipelineOptions options) { + DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(options); + + BigQueryIO.Write.Bound write = BigQueryIO.Write + .to("project:dataset.table") + .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2")) + .withoutValidation(); + + Set displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("BigQueryIO.Write should include the table spec in its primitive display data", + displayData, hasItem(hasDisplayItem(hasKey("tableSpec")))); + + assertThat("BigQueryIO.Write should include the table schema in its primitive display data", + displayData, hasItem(hasDisplayItem(hasKey("schema")))); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index 77852989dd97..15872e50f5f7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -1166,6 +1166,15 @@ public FileBasedSink.FileBasedWriteOperation createWriteOperation( return new BigQueryWriteOperation(this); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder + .addIfNotNull(DisplayData.item("schema", jsonSchema)) + .addIfNotNull(DisplayData.item("tableSpec", jsonTable)); + } + private static class BigQueryWriteOperation extends FileBasedWriteOperation { // The maximum number of retry load jobs. private static final int MAX_RETRY_LOAD_JOBS = 3; @@ -1396,6 +1405,13 @@ public void finishBundle(Context context) throws Exception { uniqueIdsForTableRows.clear(); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder.addIfNotNull(DisplayData.item("schema", jsonTableSchema)); + } + public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec) throws IOException { TableReference tableReference = parseTableSpec(tableSpec); @@ -1603,6 +1619,16 @@ public void processElement(ProcessContext context) throws IOException { new TableRowInfo(context.element(), uniqueId))); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder.addIfNotNull(DisplayData.item("tableSpec", tableSpec)); + if (tableRefFunction != null) { + builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())); + } + } + private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) { if (tableSpec != null) { return tableSpec;