From 740242c330ce99916ed76af7c6e68638d1e3c0e3 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 4 Apr 2016 16:48:15 -0700 Subject: [PATCH 1/2] Apply ModelEnforcement in the InProcessPipelineRunner This ensures that user code does not violate the model. Add a flag to control application of immutability enforcement. This flag is enabled by default. --- .../inprocess/InProcessPipelineOptions.java | 9 +++++++ .../inprocess/InProcessPipelineRunner.java | 25 ++++++++++++++++--- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java index ae5b49bb7799..d44ea78dbffa 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java @@ -89,4 +89,13 @@ public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNa boolean isBlockOnRun(); void setBlockOnRun(boolean b); + + @Default.Boolean(true) + @Description( + "Controls whether the runner should ensure that all of the elements of every " + + "PCollection are not mutated. PTransforms are not permitted to mutate input elements " + + "at any point, or output elements after they are output.") + boolean isTestImmutability(); + + void setTestImmutability(boolean test); } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 4fb01b7ce48c..f5b7f3c466a2 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -33,6 +33,7 @@ import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView; import com.google.cloud.dataflow.sdk.util.InstanceBuilder; import com.google.cloud.dataflow.sdk.util.MapAggregatorValues; @@ -48,13 +49,13 @@ import com.google.cloud.dataflow.sdk.values.POutput; import com.google.cloud.dataflow.sdk.values.PValue; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.joda.time.Instant; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -269,11 +270,29 @@ public InProcessPipelineResult run(Pipeline pipeline) { private Map, Collection> defaultModelEnforcements(InProcessPipelineOptions options) { - return Collections.emptyMap(); + ImmutableMap.Builder, Collection> + enforcements = ImmutableMap.builder(); + Collection parDoEnforcements = createParDoEnforcements(options); + enforcements.put(ParDo.Bound.class, parDoEnforcements); + enforcements.put(ParDo.BoundMulti.class, parDoEnforcements); + return enforcements.build(); + } + + private Collection createParDoEnforcements( + InProcessPipelineOptions options) { + ImmutableList.Builder enforcements = ImmutableList.builder(); + if (options.isTestImmutability()) { + enforcements.add(ImmutabilityEnforcementFactory.create()); + } + return enforcements.build(); } private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) { - return InProcessBundleFactory.create(); + BundleFactory bundleFactory = InProcessBundleFactory.create(); + if (pipelineOptions.isTestImmutability()) { + bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory); + } + return bundleFactory; } /** From 150eac594a265a700771e07f07a54b802c5c4776 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 5 Apr 2016 13:19:24 -0700 Subject: [PATCH 2/2] Improve ImmutabilityEnforcement Check per-element, to catch failures within a call to ProcessElement more quickly. Move wrapping of exceptions over the course of calls to ProcessElement to ParDoInProcessEvaluator. --- .../ImmutabilityEnforcementFactory.java | 36 +++++++++++-------- .../inprocess/ParDoInProcessEvaluator.java | 13 +++++-- .../ImmutabilityEnforcementFactoryTest.java | 9 ++--- .../inprocess/TransformExecutorTest.java | 6 ++-- 4 files changed, 38 insertions(+), 26 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java index 2e4c07b68452..8b7ccbaf5139 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java @@ -70,26 +70,34 @@ public void beforeElement(WindowedValue element) { } } + @Override + public void afterElement(WindowedValue element) { + verifyUnmodified(mutationElements.get(element)); + } + @Override public void afterFinish( CommittedBundle input, InProcessTransformResult result, Iterable> outputs) { for (MutationDetector detector : mutationElements.values()) { - try { - detector.verifyUnmodified(); - } catch (IllegalMutationException e) { - throw UserCodeException.wrap( - new IllegalMutationException( - String.format( - "PTransform %s illegaly mutated value %s of class %s." - + " Input values must not be mutated in any way.", - transform.getFullName(), - e.getSavedValue(), - e.getSavedValue().getClass()), - e.getSavedValue(), - e.getNewValue())); - } + verifyUnmodified(detector); + } + } + + private void verifyUnmodified(MutationDetector detector) { + try { + detector.verifyUnmodified(); + } catch (IllegalMutationException e) { + throw new IllegalMutationException( + String.format( + "PTransform %s illegaly mutated value %s of class %s." + + " Input values must not be mutated in any way.", + transform.getFullName(), + e.getSavedValue(), + e.getSavedValue().getClass()), + e.getSavedValue(), + e.getNewValue()); } } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java index 3942bff22ae8..4b4d69915fda 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java @@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.util.DoFnRunner; import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager; +import com.google.cloud.dataflow.sdk.util.UserCodeException; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.common.CounterSet; import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals; @@ -56,12 +57,20 @@ public ParDoInProcessEvaluator( @Override public void processElement(WindowedValue element) { - fnRunner.processElement(element); + try { + fnRunner.processElement(element); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } } @Override public InProcessTransformResult finishBundle() { - fnRunner.finishBundle(); + try { + fnRunner.finishBundle(); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } StepTransformResult.Builder resultBuilder; CopyOnAccessInMemoryStateInternals state = stepContext.commitState(); if (state != null) { diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java index e65b178181ea..ec779c0d766a 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java @@ -17,8 +17,6 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; -import static org.hamcrest.Matchers.isA; - import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; @@ -27,7 +25,6 @@ import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.util.IllegalMutationException; -import com.google.cloud.dataflow.sdk.util.UserCodeException; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -94,8 +91,7 @@ public void mutatedDuringProcessElementThrows() { ModelEnforcement enforcement = factory.forBundle(elements, consumer); enforcement.beforeElement(element); element.getValue()[0] = 'f'; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage(consumer.getFullName()); thrown.expectMessage("illegaly mutated"); thrown.expectMessage("Input values must not be mutated"); @@ -118,8 +114,7 @@ public void mutatedAfterProcessElementFails() { enforcement.afterElement(element); element.getValue()[0] = 'f'; - thrown.expect(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage(consumer.getFullName()); thrown.expectMessage("illegaly mutated"); thrown.expectMessage("Input values must not be mutated"); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java index b029dd35153c..7e8751582575 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java @@ -34,7 +34,7 @@ import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.WithKeys; -import com.google.cloud.dataflow.sdk.util.UserCodeException; +import com.google.cloud.dataflow.sdk.util.IllegalMutationException; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -413,7 +413,7 @@ public InProcessTransformResult finishBundle() throws Exception { fooBytes.getValue()[0] = 'b'; evaluatorLatch.countDown(); - thrown.expectCause(isA(UserCodeException.class)); + thrown.expectCause(isA(IllegalMutationException.class)); task.get(); } @@ -472,7 +472,7 @@ public InProcessTransformResult finishBundle() throws Exception { fooBytes.getValue()[0] = 'b'; evaluatorLatch.countDown(); - thrown.expectCause(isA(UserCodeException.class)); + thrown.expectCause(isA(IllegalMutationException.class)); task.get(); }