Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.apache.beam.sdk.util.Structs.getString;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.beam.runners.dataflow.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
import org.apache.beam.runners.dataflow.internal.BigQueryIOTranslator;
Expand All @@ -55,6 +56,7 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
Expand Down Expand Up @@ -89,6 +91,8 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -547,7 +551,7 @@ public void addStep(PTransform<?, ?> transform, String type) {
currentStep.setKind(type);
steps.add(currentStep);
addInput(PropertyNames.USER_NAME, getFullName(transform));
addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform));
addDisplayData(stepName, transform);
}

@Override
Expand Down Expand Up @@ -725,9 +729,21 @@ private void addOutput(String name, PValue value, Coder<?> valueCoder) {
outputInfoList.add(outputInfo);
}

private void addDisplayData(String name, DisplayData displayData) {
private void addDisplayData(String stepName, HasDisplayData hasDisplayData) {
DisplayData displayData;
try {
displayData = DisplayData.from(hasDisplayData);
} catch (Exception e) {
String msg = String.format("Exception thrown while collecting display data for step: %s. "
+ "Display data will be not be available for this step.", stepName);
DisplayDataException displayDataException = new DisplayDataException(msg, e);
LOG.warn(msg, displayDataException);

displayData = displayDataException.asDisplayData();
}

List<Map<String, Object>> list = MAPPER.convertValue(displayData, List.class);
addList(getProperties(), name, list);
addList(getProperties(), PropertyNames.DISPLAY_DATA, list);
}

@Override
Expand Down Expand Up @@ -1053,4 +1069,38 @@ private static void translateOutputs(
context.addOutput(tag.getId(), output);
}
}

/**
* Wraps exceptions thrown while collecting {@link DisplayData} for the Dataflow pipeline runner.
*/
static class DisplayDataException extends Exception implements HasDisplayData {
public DisplayDataException(String message, Throwable cause) {
super(checkNotNull(message), checkNotNull(cause));
}

/**
* Retrieve a display data representation of the exception, which can be submitted to
* the service in place of the actual display data.
*/
public DisplayData asDisplayData() {
return DisplayData.from(this);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
Throwable cause = getCause();
builder
.add(DisplayData.item("exceptionMessage", getMessage()))
.add(DisplayData.item("exceptionType", cause.getClass()))
.add(DisplayData.item("exceptionCause", cause.getMessage()))
.add(DisplayData.item("stackTrace", stackTraceToString()));
}

private String stackTraceToString() {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter);
printStackTrace(printWriter);
return stringWriter.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import static org.apache.beam.sdk.util.Structs.getDictionary;
import static org.apache.beam.sdk.util.Structs.getString;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
Expand All @@ -47,6 +50,7 @@
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.RecordingPipelineVisitor;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -104,6 +108,7 @@
public class DataflowPipelineTranslatorTest implements Serializable {

@Rule public transient ExpectedException thrown = ExpectedException.none();
@Rule public transient ExpectedLogs logs = ExpectedLogs.none(DataflowPipelineTranslator.class);

// A Custom Mockito matcher for an initial Job that checks that all
// expected fields are set.
Expand Down Expand Up @@ -720,7 +725,7 @@ public void testBadWildcardRecursive() throws Exception {
pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));

// Check that translation does fail.
thrown.expectCause(Matchers.allOf(
thrown.expectCause(allOf(
instanceOf(IllegalArgumentException.class),
ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
t.translate(
Expand Down Expand Up @@ -966,4 +971,61 @@ public void populateDisplayData(DisplayData.Builder builder) {
assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
}

@Test
public void testCapturesDisplayDataExceptions() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
Pipeline pipeline = Pipeline.create(options);

final RuntimeException displayDataException = new RuntimeException("foobar");
pipeline
.apply(Create.of(1, 2, 3))
.apply(ParDo.of(new DoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
throw displayDataException;
}
}));

Job job = translator.translate(
pipeline,
(DataflowPipelineRunner) pipeline.getRunner(),
Collections.<DataflowPackage>emptyList()).getJob();

String expectedMessage = "Display data will be not be available for this step";
logs.verifyWarn(expectedMessage);

List<Step> steps = job.getSteps();
assertEquals("Job should have 2 steps", 2, steps.size());

@SuppressWarnings("unchecked")
Iterable<Map<String, String>> displayData = (Collection<Map<String, String>>) steps.get(1)
.getProperties().get("display_data");

String namespace = DataflowPipelineTranslator.DisplayDataException.class.getName();
Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
hasEntry("namespace", namespace),
hasEntry("key", "exceptionType"),
hasEntry("value", RuntimeException.class.getName()))));

Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
hasEntry("namespace", namespace),
hasEntry("key", "exceptionMessage"),
hasEntry(is("value"), Matchers.containsString(expectedMessage)))));

Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
hasEntry("namespace", namespace),
hasEntry("key", "exceptionCause"),
hasEntry("value", "foobar"))));

Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
hasEntry("namespace", namespace),
hasEntry("key", "stackTrace"))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public static DisplayData none() {
* Collect the {@link DisplayData} from a component. This will traverse all subcomponents
* specified via {@link Builder#include} in the given component. Data in this component will be in
* a namespace derived from the component.
*
* <p>Pipeline runners should call this method in order to collect display data. While it should
* be safe to call {@code DisplayData.from} on any component which implements it, runners should
* be resilient to exceptions thrown while collecting display data.
*/
public static DisplayData from(HasDisplayData component) {
checkNotNull(component, "component argument cannot be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void testSourceValidationSucceedsNamespace() throws Exception {
}

@Test
public void testSourceDipslayData() {
public void testSourceDisplayData() {
DatastoreIO.Source source = DatastoreIO.source()
.withDataset(DATASET)
.withQuery(QUERY)
Expand Down Expand Up @@ -242,7 +242,7 @@ public void testSinkValidationSucceedsWithDataset() throws Exception {
}

@Test
public void testSinkDipslayData() {
public void testSinkDisplayData() {
DatastoreIO.Sink sink = DatastoreIO.sink()
.withDataset(DATASET)
.withHost(HOST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,14 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.DisplayData.Item;
import org.apache.beam.sdk.values.PCollection;
Expand All @@ -62,11 +69,13 @@
import org.joda.time.format.ISODateTimeFormat;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.regex.Pattern;
Expand All @@ -75,8 +84,8 @@
* Tests for {@link DisplayData} class.
*/
@RunWith(JUnit4.class)
public class DisplayDataTest {
@Rule public ExpectedException thrown = ExpectedException.none();
public class DisplayDataTest implements Serializable {
@Rule public transient ExpectedException thrown = ExpectedException.none();
private static final DateTimeFormatter ISO_FORMATTER = ISODateTimeFormat.dateTime();
private static final ObjectMapper MAPPER = new ObjectMapper();

Expand Down Expand Up @@ -958,6 +967,38 @@ public void populateDisplayData(Builder builder) {
quoted("DisplayDataTest"), "baz", "http://abc"));
}

/**
* Validate that all runners are resilient to exceptions thrown while retrieving display data.
*/
@Test
@Category(RunnableOnService.class)
public void testRunnersResilientToDisplayDataExceptions() {
Pipeline p = TestPipeline.create();
PCollection<Integer> pCol = p
.apply(Create.of(1, 2, 3))
.apply(new IdentityTransform<Integer>() {
@Override
public void populateDisplayData(Builder builder) {
throw new RuntimeException("bug!");
}
});

PAssert.that(pCol).containsInAnyOrder(1, 2, 3);
p.run();
}

private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> apply(PCollection<T> input) {
return input.apply(ParDo.of(new DoFn<T, T>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}
}));
}
}

private String quoted(Object obj) {
return String.format("\"%s\"", obj);
}
Expand Down