From 7dced85eb0aba73d05f291c499179c2fcef1dc4a Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 4 Apr 2016 16:48:15 -0700 Subject: [PATCH 1/2] Apply ModelEnforcement in the InProcessPipelineRunner This ensures that user code does not violate the model. Add a flag to control application of immutability enforcement. This flag is enabled by default. --- .../inprocess/InProcessPipelineOptions.java | 9 +++++++ .../inprocess/InProcessPipelineRunner.java | 25 ++++++++++++++++--- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java index 5ee0e88b5b..9c10510bf9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java @@ -87,4 +87,13 @@ public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNa boolean isBlockOnRun(); void setBlockOnRun(boolean b); + + @Default.Boolean(true) + @Description( + "Controls whether the runner should ensure that all of the elements of every " + + "PCollection are not mutated. PTransforms are not permitted to mutate input elements " + + "at any point, or output elements after they are output.") + boolean isTestImmutability(); + + void setTestImmutability(boolean test); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 9b6db8f730..c0cb1bc7c9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -31,6 +31,7 @@ import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView; import com.google.cloud.dataflow.sdk.util.InstanceBuilder; import com.google.cloud.dataflow.sdk.util.MapAggregatorValues; @@ -46,13 +47,13 @@ import com.google.cloud.dataflow.sdk.values.POutput; import com.google.cloud.dataflow.sdk.values.PValue; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.joda.time.Instant; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -267,11 +268,29 @@ public InProcessPipelineResult run(Pipeline pipeline) { private Map, Collection> defaultModelEnforcements(InProcessPipelineOptions options) { - return Collections.emptyMap(); + ImmutableMap.Builder, Collection> + enforcements = ImmutableMap.builder(); + Collection parDoEnforcements = createParDoEnforcements(options); + enforcements.put(ParDo.Bound.class, parDoEnforcements); + enforcements.put(ParDo.BoundMulti.class, parDoEnforcements); + return enforcements.build(); + } + + private Collection createParDoEnforcements( + InProcessPipelineOptions options) { + ImmutableList.Builder enforcements = ImmutableList.builder(); + if (options.isTestImmutability()) { + enforcements.add(ImmutabilityEnforcementFactory.create()); + } + return enforcements.build(); } private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) { - return InProcessBundleFactory.create(); + BundleFactory bundleFactory = InProcessBundleFactory.create(); + if (pipelineOptions.isTestImmutability()) { + bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory); + } + return bundleFactory; } /** From 55ba18bae28fc6304713c76bf95ced798a94643c Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 5 Apr 2016 13:19:24 -0700 Subject: [PATCH 2/2] Improve ImmutabilityEnforcement Check per-element, to catch failures within a call to ProcessElement more quickly. Move wrapping of exceptions over the course of calls to ProcessElement to ParDoInProcessEvaluator. --- .../ImmutabilityEnforcementFactory.java | 36 +++++++++++-------- .../inprocess/ParDoInProcessEvaluator.java | 13 +++++-- .../ImmutabilityEnforcementFactoryTest.java | 9 ++--- .../inprocess/TransformExecutorTest.java | 6 ++-- 4 files changed, 38 insertions(+), 26 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java index dfc56a9d3c..0f2607ff45 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java @@ -68,26 +68,34 @@ public void beforeElement(WindowedValue element) { } } + @Override + public void afterElement(WindowedValue element) { + verifyUnmodified(mutationElements.get(element)); + } + @Override public void afterFinish( CommittedBundle input, InProcessTransformResult result, Iterable> outputs) { for (MutationDetector detector : mutationElements.values()) { - try { - detector.verifyUnmodified(); - } catch (IllegalMutationException e) { - throw UserCodeException.wrap( - new IllegalMutationException( - String.format( - "PTransform %s illegaly mutated value %s of class %s." - + " Input values must not be mutated in any way.", - transform.getFullName(), - e.getSavedValue(), - e.getSavedValue().getClass()), - e.getSavedValue(), - e.getNewValue())); - } + verifyUnmodified(detector); + } + } + + private void verifyUnmodified(MutationDetector detector) { + try { + detector.verifyUnmodified(); + } catch (IllegalMutationException e) { + throw new IllegalMutationException( + String.format( + "PTransform %s illegaly mutated value %s of class %s." + + " Input values must not be mutated in any way.", + transform.getFullName(), + e.getSavedValue(), + e.getSavedValue().getClass()), + e.getSavedValue(), + e.getNewValue()); } } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java index 2a21e8cbf5..17546f1953 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java @@ -20,6 +20,7 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.util.DoFnRunner; import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager; +import com.google.cloud.dataflow.sdk.util.UserCodeException; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.common.CounterSet; import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals; @@ -54,12 +55,20 @@ public ParDoInProcessEvaluator( @Override public void processElement(WindowedValue element) { - fnRunner.processElement(element); + try { + fnRunner.processElement(element); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } } @Override public InProcessTransformResult finishBundle() { - fnRunner.finishBundle(); + try { + fnRunner.finishBundle(); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } StepTransformResult.Builder resultBuilder; CopyOnAccessInMemoryStateInternals state = stepContext.commitState(); if (state != null) { diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java index 87e12ce55f..0b5de689ec 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java @@ -15,8 +15,6 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; -import static org.hamcrest.Matchers.isA; - import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; @@ -25,7 +23,6 @@ import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.util.IllegalMutationException; -import com.google.cloud.dataflow.sdk.util.UserCodeException; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -92,8 +89,7 @@ public void mutatedDuringProcessElementThrows() { ModelEnforcement enforcement = factory.forBundle(elements, consumer); enforcement.beforeElement(element); element.getValue()[0] = 'f'; - thrown.equals(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage(consumer.getFullName()); thrown.expectMessage("illegaly mutated"); thrown.expectMessage("Input values must not be mutated"); @@ -116,8 +112,7 @@ public void mutatedAfterProcessElementFails() { enforcement.afterElement(element); element.getValue()[0] = 'f'; - thrown.equals(UserCodeException.class); - thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expect(IllegalMutationException.class); thrown.expectMessage(consumer.getFullName()); thrown.expectMessage("illegaly mutated"); thrown.expectMessage("Input values must not be mutated"); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java index b14b34bb24..bbf7c51757 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java @@ -32,7 +32,7 @@ import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.WithKeys; -import com.google.cloud.dataflow.sdk.util.UserCodeException; +import com.google.cloud.dataflow.sdk.util.IllegalMutationException; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -411,7 +411,7 @@ public InProcessTransformResult finishBundle() throws Exception { fooBytes.getValue()[0] = 'b'; evaluatorLatch.countDown(); - thrown.expectCause(isA(UserCodeException.class)); + thrown.expectCause(isA(IllegalMutationException.class)); task.get(); } @@ -470,7 +470,7 @@ public InProcessTransformResult finishBundle() throws Exception { fooBytes.getValue()[0] = 'b'; evaluatorLatch.countDown(); - thrown.expectCause(isA(UserCodeException.class)); + thrown.expectCause(isA(IllegalMutationException.class)); task.get(); }