diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 05879d9dd8f0..f58ceff22c0c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -29,6 +29,7 @@ import static org.apache.beam.sdk.util.Structs.getString; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.runners.dataflow.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly; import org.apache.beam.runners.dataflow.internal.BigQueryIOTranslator; @@ -55,6 +56,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.AppliedCombineFn; @@ -89,6 +91,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -547,7 +551,7 @@ public void addStep(PTransform transform, String type) { currentStep.setKind(type); steps.add(currentStep); addInput(PropertyNames.USER_NAME, getFullName(transform)); - addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform)); + addDisplayData(stepName, transform); } @Override @@ -725,9 +729,21 @@ private void addOutput(String name, PValue value, Coder valueCoder) { outputInfoList.add(outputInfo); } - private void addDisplayData(String name, DisplayData displayData) { + private void addDisplayData(String stepName, HasDisplayData hasDisplayData) { + DisplayData displayData; + try { + displayData = DisplayData.from(hasDisplayData); + } catch (Exception e) { + String msg = String.format("Exception thrown while collecting display data for step: %s. " + + "Display data will be not be available for this step.", stepName); + DisplayDataException displayDataException = new DisplayDataException(msg, e); + LOG.warn(msg, displayDataException); + + displayData = displayDataException.asDisplayData(); + } + List> list = MAPPER.convertValue(displayData, List.class); - addList(getProperties(), name, list); + addList(getProperties(), PropertyNames.DISPLAY_DATA, list); } @Override @@ -1053,4 +1069,38 @@ private static void translateOutputs( context.addOutput(tag.getId(), output); } } + + /** + * Wraps exceptions thrown while collecting {@link DisplayData} for the Dataflow pipeline runner. + */ + static class DisplayDataException extends Exception implements HasDisplayData { + public DisplayDataException(String message, Throwable cause) { + super(checkNotNull(message), checkNotNull(cause)); + } + + /** + * Retrieve a display data representation of the exception, which can be submitted to + * the service in place of the actual display data. + */ + public DisplayData asDisplayData() { + return DisplayData.from(this); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Throwable cause = getCause(); + builder + .add(DisplayData.item("exceptionMessage", getMessage())) + .add(DisplayData.item("exceptionType", cause.getClass())) + .add(DisplayData.item("exceptionCause", cause.getMessage())) + .add(DisplayData.item("stackTrace", stackTraceToString())); + } + + private String stackTraceToString() { + StringWriter stringWriter = new StringWriter(); + PrintWriter printWriter = new PrintWriter(stringWriter); + printStackTrace(printWriter); + return stringWriter.toString(); + } + } } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 8e7ed96899ec..ed7e67d49f8e 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -21,8 +21,11 @@ import static org.apache.beam.sdk.util.Structs.getDictionary; import static org.apache.beam.sdk.util.Structs.getString; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -47,6 +50,7 @@ import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.RecordingPipelineVisitor; +import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -104,6 +108,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Rule public transient ExpectedLogs logs = ExpectedLogs.none(DataflowPipelineTranslator.class); // A Custom Mockito matcher for an initial Job that checks that all // expected fields are set. @@ -720,7 +725,7 @@ public void testBadWildcardRecursive() throws Exception { pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz")); // Check that translation does fail. - thrown.expectCause(Matchers.allOf( + thrown.expectCause(allOf( instanceOf(IllegalArgumentException.class), ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage")))); t.translate( @@ -966,4 +971,61 @@ public void populateDisplayData(DisplayData.Builder builder) { assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData)); assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData)); } + + @Test + public void testCapturesDisplayDataExceptions() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + Pipeline pipeline = Pipeline.create(options); + + final RuntimeException displayDataException = new RuntimeException("foobar"); + pipeline + .apply(Create.of(1, 2, 3)) + .apply(ParDo.of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + throw displayDataException; + } + })); + + Job job = translator.translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.emptyList()).getJob(); + + String expectedMessage = "Display data will be not be available for this step"; + logs.verifyWarn(expectedMessage); + + List steps = job.getSteps(); + assertEquals("Job should have 2 steps", 2, steps.size()); + + @SuppressWarnings("unchecked") + Iterable> displayData = (Collection>) steps.get(1) + .getProperties().get("display_data"); + + String namespace = DataflowPipelineTranslator.DisplayDataException.class.getName(); + Assert.assertThat(displayData, Matchers.>hasItem(allOf( + hasEntry("namespace", namespace), + hasEntry("key", "exceptionType"), + hasEntry("value", RuntimeException.class.getName())))); + + Assert.assertThat(displayData, Matchers.>hasItem(allOf( + hasEntry("namespace", namespace), + hasEntry("key", "exceptionMessage"), + hasEntry(is("value"), Matchers.containsString(expectedMessage))))); + + Assert.assertThat(displayData, Matchers.>hasItem(allOf( + hasEntry("namespace", namespace), + hasEntry("key", "exceptionCause"), + hasEntry("value", "foobar")))); + + Assert.assertThat(displayData, Matchers.>hasItem(allOf( + hasEntry("namespace", namespace), + hasEntry("key", "stackTrace")))); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index fa8c0e9f4a84..dc6e381d0929 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -72,6 +72,10 @@ public static DisplayData none() { * Collect the {@link DisplayData} from a component. This will traverse all subcomponents * specified via {@link Builder#include} in the given component. Data in this component will be in * a namespace derived from the component. + * + *

Pipeline runners should call this method in order to collect display data. While it should + * be safe to call {@code DisplayData.from} on any component which implements it, runners should + * be resilient to exceptions thrown while collecting display data. */ public static DisplayData from(HasDisplayData component) { checkNotNull(component, "component argument cannot be null"); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java index 85920aa27279..622abb2e925d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java @@ -194,7 +194,7 @@ public void testSourceValidationSucceedsNamespace() throws Exception { } @Test - public void testSourceDipslayData() { + public void testSourceDisplayData() { DatastoreIO.Source source = DatastoreIO.source() .withDataset(DATASET) .withQuery(QUERY) @@ -242,7 +242,7 @@ public void testSinkValidationSucceedsWithDataset() throws Exception { } @Test - public void testSinkDipslayData() { + public void testSinkDisplayData() { DatastoreIO.Sink sink = DatastoreIO.sink() .withDataset(DATASET) .withHost(HOST); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java index 851769a03e60..21b2e3388a5e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java @@ -39,7 +39,14 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.DisplayData.Item; import org.apache.beam.sdk.values.PCollection; @@ -62,11 +69,13 @@ import org.joda.time.format.ISODateTimeFormat; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.IOException; +import java.io.Serializable; import java.util.Collection; import java.util.Map; import java.util.regex.Pattern; @@ -75,8 +84,8 @@ * Tests for {@link DisplayData} class. */ @RunWith(JUnit4.class) -public class DisplayDataTest { - @Rule public ExpectedException thrown = ExpectedException.none(); +public class DisplayDataTest implements Serializable { + @Rule public transient ExpectedException thrown = ExpectedException.none(); private static final DateTimeFormatter ISO_FORMATTER = ISODateTimeFormat.dateTime(); private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -958,6 +967,38 @@ public void populateDisplayData(Builder builder) { quoted("DisplayDataTest"), "baz", "http://abc")); } + /** + * Validate that all runners are resilient to exceptions thrown while retrieving display data. + */ + @Test + @Category(RunnableOnService.class) + public void testRunnersResilientToDisplayDataExceptions() { + Pipeline p = TestPipeline.create(); + PCollection pCol = p + .apply(Create.of(1, 2, 3)) + .apply(new IdentityTransform() { + @Override + public void populateDisplayData(Builder builder) { + throw new RuntimeException("bug!"); + } + }); + + PAssert.that(pCol).containsInAnyOrder(1, 2, 3); + p.run(); + } + + private static class IdentityTransform extends PTransform, PCollection> { + @Override + public PCollection apply(PCollection input) { + return input.apply(ParDo.of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + })); + } + } + private String quoted(Object obj) { return String.format("\"%s\"", obj); }