Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
Expand All @@ -79,6 +80,7 @@
import com.google.cloud.dataflow.sdk.values.TypedPValue;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -548,6 +550,7 @@ public void addStep(PTransform<?, ?> transform, String type) {
currentStep.setKind(type);
steps.add(currentStep);
addInput(PropertyNames.USER_NAME, getFullName(transform));
addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform));
}

@Override
Expand Down Expand Up @@ -725,6 +728,15 @@ private void addOutput(String name, PValue value, Coder<?> valueCoder) {
outputInfoList.add(outputInfo);
}

private void addDisplayData(String name, DisplayData displayData) {
List<Map<String, Object>> serializedItems = Lists.newArrayList();
for (DisplayData.Item item : displayData.items()) {
serializedItems.add(MAPPER.convertValue(item, Map.class));
}

addList(getProperties(), name, serializedItems);
}

@Override
public OutputReference asOutputReference(PValue value) {
AppliedPTransform<?, ?, ?> transform =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonInclude;

import org.apache.avro.reflect.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -214,10 +217,12 @@ private Item(
this.label = label;
}

@JsonGetter("namespace")
public String getNamespace() {
return ns;
}

@JsonGetter("key")
public String getKey() {
return key;
}
Expand All @@ -226,13 +231,15 @@ public String getKey() {
* Retrieve the {@link DisplayData.Type} of display metadata. All metadata conforms to a
* predefined set of allowed types.
*/
@JsonGetter("type")
public Type getType() {
return type;
}

/**
* Retrieve the value of the metadata item.
*/
@JsonGetter("value")
public String getValue() {
return value;
}
Expand All @@ -244,6 +251,8 @@ public String getValue() {
* <p>Some display data types will not provide a short value, in which case the return value
* will be null.
*/
@JsonGetter("shortValue")
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public String getShortValue() {
return shortValue;
Expand All @@ -255,6 +264,8 @@ public String getShortValue() {
*
* <p>If no label was specified, this will return {@code null}.
*/
@JsonGetter("label")
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public String getLabel() {
return label;
Expand All @@ -266,8 +277,10 @@ public String getLabel() {
*
* <p>If no URL was specified, this will return {@code null}.
*/
@JsonGetter("linkUrl")
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public String getUrl() {
public String getLinkUrl() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be worth using an explicit @JsonGetter("link_url") on this and the other getters to ensure stability of the JSON names (and make it clear that these getters are being used by Jackson).

return url;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,5 @@ public class PropertyNames {
public static final String VALIDATE_SINK = "validate_sink";
public static final String VALIDATE_SOURCE = "validate_source";
public static final String VALUE = "value";
public static final String DISPLAY_DATA = "display_data";
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import static com.google.cloud.dataflow.sdk.util.Structs.getDictionary;
import static com.google.cloud.dataflow.sdk.util.Structs.getString;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
Expand Down Expand Up @@ -53,6 +55,7 @@
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.OutputReference;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
Expand Down Expand Up @@ -81,6 +84,8 @@
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
Expand All @@ -91,9 +96,9 @@
* Tests for DataflowPipelineTranslator.
*/
@RunWith(JUnit4.class)
public class DataflowPipelineTranslatorTest {
public class DataflowPipelineTranslatorTest implements Serializable {

@Rule public ExpectedException thrown = ExpectedException.none();
@Rule public transient ExpectedException thrown = ExpectedException.none();

// A Custom Mockito matcher for an initial Job that checks that all
// expected fields are set.
Expand Down Expand Up @@ -496,7 +501,7 @@ private static Step createPredefinedStep() throws Exception {
return step;
}

private static class NoOpFn extends DoFn<String, String>{
private static class NoOpFn extends DoFn<String, String> {
@Override public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
Expand Down Expand Up @@ -796,4 +801,89 @@ public void testToIterableTranslationWithIsmSideInput() throws Exception {
Step collectionToSingletonStep = steps.get(2);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}

@Test
public void testStepDisplayData() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
DataflowPipeline pipeline = DataflowPipeline.create(options);

DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.add("foo", "bar")
.add("foo2", DataflowPipelineTranslatorTest.class)
.withLabel("Test Class")
.withLinkUrl("http://www.google.com");
}
};

DoFn<Integer, Integer> fn2 = new DoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add("foo3", "barge");
}
};

pipeline
.apply(Create.of(1, 2, 3))
.apply(ParDo.of(fn1))
.apply(ParDo.of(fn2));

Job job = translator.translate(
pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();

List<Step> steps = job.getSteps();
assertEquals(3, steps.size());

Map<String, Object> parDo1Properties = steps.get(1).getProperties();
Map<String, Object> parDo2Properties = steps.get(2).getProperties();
assertThat(parDo1Properties, hasKey("display_data"));

Collection<Map<String, String>> fn1displayData =
(Collection<Map<String, String>>) parDo1Properties.get("display_data");
Collection<Map<String, String>> fn2displayData =
(Collection<Map<String, String>>) parDo2Properties.get("display_data");

ImmutableList expectedFn1DisplayData = ImmutableList.of(
ImmutableMap.<String, String>builder()
.put("namespace", fn1.getClass().getName())
.put("key", "foo")
.put("type", "STRING")
.put("value", "bar")
.build(),
ImmutableMap.<String, String>builder()
.put("namespace", fn1.getClass().getName())
.put("key", "foo2")
.put("type", "JAVA_CLASS")
.put("value", DataflowPipelineTranslatorTest.class.getName())
.put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
.put("label", "Test Class")
.put("linkUrl", "http://www.google.com")
.build()
);

ImmutableList expectedFn2DisplayData = ImmutableList.of(
ImmutableMap.<String, String>builder()
.put("namespace", fn2.getClass().getName())
.put("key", "foo3")
.put("type", "STRING")
.put("value", "barge")
.build()
);

assertEquals(expectedFn1DisplayData, fn1displayData);
assertEquals(expectedFn2DisplayData, fn2displayData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ private static Matcher<DisplayData.Item> hasUrl(Matcher<String> urlMatcher) {
urlMatcher, "display item with url", "URL") {
@Override
protected String featureValueOf(DisplayData.Item actual) {
return actual.getUrl();
return actual.getLinkUrl();
}
};
}
Expand Down