From d6c5025937cbd0016d0a83d08e53902ae4d4519b Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 10 May 2016 11:19:14 -0700 Subject: [PATCH 1/3] Runners should be resilient to DisplayData failure Display data is collected from PTransforms at Pipeline construction time. Collecting display data runs user code from provided transforms and fn's. These components should be designed not to throw during pipeline construction, however we also shouldn't fail a pipeline if this code does fail. This PR adds resiliency to the DataflowPipelineTranslator, where we collect display data for the Dataflow runner, and also a RunnableOnService test to verify that all runners are resilient to display data failures. Other runners are not yet using display data, but will get this validation for free when they do. --- .../dataflow/DataflowPipelineTranslator.java | 17 +++++-- .../DataflowPipelineTranslatorTest.java | 31 +++++++++++++ .../sdk/transforms/display/DisplayData.java | 4 ++ .../transforms/display/DisplayDataTest.java | 45 ++++++++++++++++++- 4 files changed, 92 insertions(+), 5 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 05879d9dd8f0..89ed39ac66ab 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -55,6 +55,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.AppliedCombineFn; @@ -547,7 +548,7 @@ public void addStep(PTransform transform, String type) { currentStep.setKind(type); steps.add(currentStep); addInput(PropertyNames.USER_NAME, getFullName(transform)); - addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform)); + addDisplayData(stepName, transform); } @Override @@ -725,9 +726,19 @@ private void addOutput(String name, PValue value, Coder valueCoder) { outputInfoList.add(outputInfo); } - private void addDisplayData(String name, DisplayData displayData) { + private void addDisplayData(String stepName, HasDisplayData hasDisplayData) { + DisplayData displayData; + try { + displayData = DisplayData.from(hasDisplayData); + } catch (Exception e) { + String msg = String.format("Exception thrown while collecting display data for step: %s. " + + "Display data will be not be available for this step.", stepName); + LOG.warn(msg, e); + return; + } + List> list = MAPPER.convertValue(displayData, List.class); - addList(getProperties(), name, list); + addList(getProperties(), PropertyNames.DISPLAY_DATA, list); } @Override diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 8e7ed96899ec..02455f9a6bdb 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.RecordingPipelineVisitor; +import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -104,6 +105,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Rule public transient ExpectedLogs logs = ExpectedLogs.none(DataflowPipelineTranslator.class); // A Custom Mockito matcher for an initial Job that checks that all // expected fields are set. @@ -966,4 +968,33 @@ public void populateDisplayData(DisplayData.Builder builder) { assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData)); assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData)); } + + @Test + public void testResilientToDisplayDataException() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + Pipeline pipeline = Pipeline.create(options); + + final RuntimeException displayDataException = new RuntimeException("foobar"); + pipeline + .apply(Create.of(1, 2, 3)) + .apply(ParDo.of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + throw displayDataException; + } + })); + + translator.translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.emptyList()); + + logs.verifyWarn("Display data will be not be available for this step", displayDataException); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index fa8c0e9f4a84..dc6e381d0929 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -72,6 +72,10 @@ public static DisplayData none() { * Collect the {@link DisplayData} from a component. This will traverse all subcomponents * specified via {@link Builder#include} in the given component. Data in this component will be in * a namespace derived from the component. + * + *

Pipeline runners should call this method in order to collect display data. While it should + * be safe to call {@code DisplayData.from} on any component which implements it, runners should + * be resilient to exceptions thrown while collecting display data. */ public static DisplayData from(HasDisplayData component) { checkNotNull(component, "component argument cannot be null"); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java index 851769a03e60..a646150e8342 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java @@ -39,7 +39,14 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.DisplayData.Item; import org.apache.beam.sdk.values.PCollection; @@ -62,11 +69,13 @@ import org.joda.time.format.ISODateTimeFormat; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.IOException; +import java.io.Serializable; import java.util.Collection; import java.util.Map; import java.util.regex.Pattern; @@ -75,8 +84,8 @@ * Tests for {@link DisplayData} class. */ @RunWith(JUnit4.class) -public class DisplayDataTest { - @Rule public ExpectedException thrown = ExpectedException.none(); +public class DisplayDataTest implements Serializable { + @Rule public transient ExpectedException thrown = ExpectedException.none(); private static final DateTimeFormatter ISO_FORMATTER = ISODateTimeFormat.dateTime(); private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -958,6 +967,38 @@ public void populateDisplayData(Builder builder) { quoted("DisplayDataTest"), "baz", "http://abc")); } + /** + * Validate that all runners are resilient to exceptions thrown while retrieving display data. + */ + @Test + @Category(RunnableOnService.class) + public void testRunnersResilientToDispalyDataExceptions() { + Pipeline p = TestPipeline.create(); + PCollection pCol = p + .apply(Create.of(1, 2, 3)) + .apply(new IdentityTransform() { + @Override + public void populateDisplayData(Builder builder) { + throw new RuntimeException("bug!"); + } + }); + + PAssert.that(pCol).containsInAnyOrder(1, 2, 3); + p.run(); + } + + private static class IdentityTransform extends PTransform, PCollection> { + @Override + public PCollection apply(PCollection input) { + return input.apply(ParDo.of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + })); + } + } + private String quoted(Object obj) { return String.format("\"%s\"", obj); } From 72a1a6bff5852e42341c4d7383f71da188c96545 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 10 May 2016 14:59:13 -0700 Subject: [PATCH 2/3] Fix DisplayData typos --- .../src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java | 4 ++-- .../apache/beam/sdk/transforms/display/DisplayDataTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java index 85920aa27279..622abb2e925d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java @@ -194,7 +194,7 @@ public void testSourceValidationSucceedsNamespace() throws Exception { } @Test - public void testSourceDipslayData() { + public void testSourceDisplayData() { DatastoreIO.Source source = DatastoreIO.source() .withDataset(DATASET) .withQuery(QUERY) @@ -242,7 +242,7 @@ public void testSinkValidationSucceedsWithDataset() throws Exception { } @Test - public void testSinkDipslayData() { + public void testSinkDisplayData() { DatastoreIO.Sink sink = DatastoreIO.sink() .withDataset(DATASET) .withHost(HOST); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java index a646150e8342..21b2e3388a5e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java @@ -972,7 +972,7 @@ public void populateDisplayData(Builder builder) { */ @Test @Category(RunnableOnService.class) - public void testRunnersResilientToDispalyDataExceptions() { + public void testRunnersResilientToDisplayDataExceptions() { Pipeline p = TestPipeline.create(); PCollection pCol = p .apply(Create.of(1, 2, 3)) From 6928c0b67f0b1ee1c19f74ae87be1ee4ce2c4d8b Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Wed, 11 May 2016 09:18:53 -0700 Subject: [PATCH 3/3] When display data throws exception, use it for display --- .../dataflow/DataflowPipelineTranslator.java | 43 ++++++++++++++++++- .../DataflowPipelineTranslatorTest.java | 41 +++++++++++++++--- 2 files changed, 77 insertions(+), 7 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 89ed39ac66ab..f58ceff22c0c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -29,6 +29,7 @@ import static org.apache.beam.sdk.util.Structs.getString; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.runners.dataflow.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly; import org.apache.beam.runners.dataflow.internal.BigQueryIOTranslator; @@ -90,6 +91,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -733,8 +736,10 @@ private void addDisplayData(String stepName, HasDisplayData hasDisplayData) { } catch (Exception e) { String msg = String.format("Exception thrown while collecting display data for step: %s. " + "Display data will be not be available for this step.", stepName); - LOG.warn(msg, e); - return; + DisplayDataException displayDataException = new DisplayDataException(msg, e); + LOG.warn(msg, displayDataException); + + displayData = displayDataException.asDisplayData(); } List> list = MAPPER.convertValue(displayData, List.class); @@ -1064,4 +1069,38 @@ private static void translateOutputs( context.addOutput(tag.getId(), output); } } + + /** + * Wraps exceptions thrown while collecting {@link DisplayData} for the Dataflow pipeline runner. + */ + static class DisplayDataException extends Exception implements HasDisplayData { + public DisplayDataException(String message, Throwable cause) { + super(checkNotNull(message), checkNotNull(cause)); + } + + /** + * Retrieve a display data representation of the exception, which can be submitted to + * the service in place of the actual display data. + */ + public DisplayData asDisplayData() { + return DisplayData.from(this); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Throwable cause = getCause(); + builder + .add(DisplayData.item("exceptionMessage", getMessage())) + .add(DisplayData.item("exceptionType", cause.getClass())) + .add(DisplayData.item("exceptionCause", cause.getMessage())) + .add(DisplayData.item("stackTrace", stackTraceToString())); + } + + private String stackTraceToString() { + StringWriter stringWriter = new StringWriter(); + PrintWriter printWriter = new PrintWriter(stringWriter); + printStackTrace(printWriter); + return stringWriter.toString(); + } + } } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 02455f9a6bdb..ed7e67d49f8e 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -21,8 +21,11 @@ import static org.apache.beam.sdk.util.Structs.getDictionary; import static org.apache.beam.sdk.util.Structs.getString; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -722,7 +725,7 @@ public void testBadWildcardRecursive() throws Exception { pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz")); // Check that translation does fail. - thrown.expectCause(Matchers.allOf( + thrown.expectCause(allOf( instanceOf(IllegalArgumentException.class), ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage")))); t.translate( @@ -970,7 +973,7 @@ public void populateDisplayData(DisplayData.Builder builder) { } @Test - public void testResilientToDisplayDataException() throws IOException { + public void testCapturesDisplayDataExceptions() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); Pipeline pipeline = Pipeline.create(options); @@ -990,11 +993,39 @@ public void populateDisplayData(DisplayData.Builder builder) { } })); - translator.translate( + Job job = translator.translate( pipeline, (DataflowPipelineRunner) pipeline.getRunner(), - Collections.emptyList()); + Collections.emptyList()).getJob(); + + String expectedMessage = "Display data will be not be available for this step"; + logs.verifyWarn(expectedMessage); - logs.verifyWarn("Display data will be not be available for this step", displayDataException); + List steps = job.getSteps(); + assertEquals("Job should have 2 steps", 2, steps.size()); + + @SuppressWarnings("unchecked") + Iterable> displayData = (Collection>) steps.get(1) + .getProperties().get("display_data"); + + String namespace = DataflowPipelineTranslator.DisplayDataException.class.getName(); + Assert.assertThat(displayData, Matchers.>hasItem(allOf( + hasEntry("namespace", namespace), + hasEntry("key", "exceptionType"), + hasEntry("value", RuntimeException.class.getName())))); + + Assert.assertThat(displayData, Matchers.>hasItem(allOf( + hasEntry("namespace", namespace), + hasEntry("key", "exceptionMessage"), + hasEntry(is("value"), Matchers.containsString(expectedMessage))))); + + Assert.assertThat(displayData, Matchers.>hasItem(allOf( + hasEntry("namespace", namespace), + hasEntry("key", "exceptionCause"), + hasEntry("value", "foobar")))); + + Assert.assertThat(displayData, Matchers.>hasItem(allOf( + hasEntry("namespace", namespace), + hasEntry("key", "stackTrace")))); } }