diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java index 0feae957f86b..155c454b38e3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java @@ -59,6 +59,7 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; import com.google.cloud.dataflow.sdk.util.AppliedCombineFn; @@ -79,6 +80,7 @@ import com.google.cloud.dataflow.sdk.values.TypedPValue; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -548,6 +550,7 @@ public void addStep(PTransform transform, String type) { currentStep.setKind(type); steps.add(currentStep); addInput(PropertyNames.USER_NAME, getFullName(transform)); + addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform)); } @Override @@ -725,6 +728,15 @@ private void addOutput(String name, PValue value, Coder valueCoder) { outputInfoList.add(outputInfo); } + private void addDisplayData(String name, DisplayData displayData) { + List> serializedItems = Lists.newArrayList(); + for (DisplayData.Item item : displayData.items()) { + serializedItems.add(MAPPER.convertValue(item, Map.class)); + } + + addList(getProperties(), name, serializedItems); + } + @Override public OutputReference asOutputReference(PValue value) { AppliedPTransform transform = diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java index 05fa7c78816d..dadc7309da31 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java @@ -26,6 +26,9 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonInclude; + import org.apache.avro.reflect.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -214,10 +217,12 @@ private Item( this.label = label; } + @JsonGetter("namespace") public String getNamespace() { return ns; } + @JsonGetter("key") public String getKey() { return key; } @@ -226,6 +231,7 @@ public String getKey() { * Retrieve the {@link DisplayData.Type} of display metadata. All metadata conforms to a * predefined set of allowed types. */ + @JsonGetter("type") public Type getType() { return type; } @@ -233,6 +239,7 @@ public Type getType() { /** * Retrieve the value of the metadata item. */ + @JsonGetter("value") public String getValue() { return value; } @@ -244,6 +251,8 @@ public String getValue() { *

Some display data types will not provide a short value, in which case the return value * will be null. */ + @JsonGetter("shortValue") + @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable public String getShortValue() { return shortValue; @@ -255,6 +264,8 @@ public String getShortValue() { * *

If no label was specified, this will return {@code null}. */ + @JsonGetter("label") + @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable public String getLabel() { return label; @@ -266,8 +277,10 @@ public String getLabel() { * *

If no URL was specified, this will return {@code null}. */ + @JsonGetter("linkUrl") + @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable - public String getUrl() { + public String getLinkUrl() { return url; } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java index ec6518976b0e..81572ea2070e 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java @@ -103,4 +103,5 @@ public class PropertyNames { public static final String VALIDATE_SINK = "validate_sink"; public static final String VALIDATE_SOURCE = "validate_source"; public static final String VALUE = "value"; + public static final String DISPLAY_DATA = "display_data"; } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java index 497552f90124..65f8dde26664 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java @@ -20,9 +20,11 @@ import static com.google.cloud.dataflow.sdk.util.Structs.getDictionary; import static com.google.cloud.dataflow.sdk.util.Structs.getString; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -53,6 +55,7 @@ import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.Sum; import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.GcsUtil; import com.google.cloud.dataflow.sdk.util.OutputReference; import com.google.cloud.dataflow.sdk.util.PropertyNames; @@ -81,6 +84,8 @@ import org.mockito.stubbing.Answer; import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -91,9 +96,9 @@ * Tests for DataflowPipelineTranslator. */ @RunWith(JUnit4.class) -public class DataflowPipelineTranslatorTest { +public class DataflowPipelineTranslatorTest implements Serializable { - @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); // A Custom Mockito matcher for an initial Job that checks that all // expected fields are set. @@ -496,7 +501,7 @@ private static Step createPredefinedStep() throws Exception { return step; } - private static class NoOpFn extends DoFn{ + private static class NoOpFn extends DoFn { @Override public void processElement(ProcessContext c) throws Exception { c.output(c.element()); } @@ -796,4 +801,89 @@ public void testToIterableTranslationWithIsmSideInput() throws Exception { Step collectionToSingletonStep = steps.get(2); assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); } + + @Test + public void testStepDisplayData() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + DataflowPipeline pipeline = DataflowPipeline.create(options); + + DoFn fn1 = new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("foo", "bar") + .add("foo2", DataflowPipelineTranslatorTest.class) + .withLabel("Test Class") + .withLinkUrl("http://www.google.com"); + } + }; + + DoFn fn2 = new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo3", "barge"); + } + }; + + pipeline + .apply(Create.of(1, 2, 3)) + .apply(ParDo.of(fn1)) + .apply(ParDo.of(fn2)); + + Job job = translator.translate( + pipeline, pipeline.getRunner(), Collections.emptyList()).getJob(); + + List steps = job.getSteps(); + assertEquals(3, steps.size()); + + Map parDo1Properties = steps.get(1).getProperties(); + Map parDo2Properties = steps.get(2).getProperties(); + assertThat(parDo1Properties, hasKey("display_data")); + + Collection> fn1displayData = + (Collection>) parDo1Properties.get("display_data"); + Collection> fn2displayData = + (Collection>) parDo2Properties.get("display_data"); + + ImmutableList expectedFn1DisplayData = ImmutableList.of( + ImmutableMap.builder() + .put("namespace", fn1.getClass().getName()) + .put("key", "foo") + .put("type", "STRING") + .put("value", "bar") + .build(), + ImmutableMap.builder() + .put("namespace", fn1.getClass().getName()) + .put("key", "foo2") + .put("type", "JAVA_CLASS") + .put("value", DataflowPipelineTranslatorTest.class.getName()) + .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName()) + .put("label", "Test Class") + .put("linkUrl", "http://www.google.com") + .build() + ); + + ImmutableList expectedFn2DisplayData = ImmutableList.of( + ImmutableMap.builder() + .put("namespace", fn2.getClass().getName()) + .put("key", "foo3") + .put("type", "STRING") + .put("value", "barge") + .build() + ); + + assertEquals(expectedFn1DisplayData, fn1displayData); + assertEquals(expectedFn2DisplayData, fn2displayData); + } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java index 13dd6186571a..dfc8c38f52fb 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java @@ -606,7 +606,7 @@ private static Matcher hasUrl(Matcher urlMatcher) { urlMatcher, "display item with url", "URL") { @Override protected String featureValueOf(DisplayData.Item actual) { - return actual.getUrl(); + return actual.getLinkUrl(); } }; }