From 9006c091c96d848f3940a3fca855d2bf02931b57 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 4 Apr 2016 11:10:18 -0700 Subject: [PATCH] Give root transforms step names Fix a bug where steps would only be given step names if they were a non-root node. Use the ConsumerTrackingPipelineVisitor in the InProcessEvaluationContext test to handle runner-expanded transforms --- .../ConsumerTrackingPipelineVisitor.java | 2 +- .../ConsumerTrackingPipelineVisitorTest.java | 37 ++++++++++++++ .../InProcessEvaluationContextTest.java | 50 +++++++------------ 3 files changed, 56 insertions(+), 33 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java index ec4f08bc561e..48836e9c7c2e 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java @@ -76,12 +76,12 @@ public void leaveCompositeTransform(TransformTreeNode node) { public void visitTransform(TransformTreeNode node) { toFinalize.removeAll(node.getInput().expand()); AppliedPTransform appliedTransform = getAppliedTransform(node); + stepNames.put(appliedTransform, genStepName()); if (node.getInput().expand().isEmpty()) { rootTransforms.add(appliedTransform); } else { for (PValue value : node.getInput().expand()) { valueToConsumers.get(value).add(appliedTransform); - stepNames.put(appliedTransform, genStepName()); } } } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java index bea6fe1bd6f5..905f58f8714e 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java @@ -176,6 +176,43 @@ public PDone apply(PInput input) { assertThat(visitor.getUnfinalizedPValues(), emptyIterable()); } + @Test + public void getStepNamesContainsAllTransforms() { + PCollection created = p.apply(Create.of("1", "2", "3")); + PCollection transformed = + created.apply( + ParDo.of( + new DoFn() { + @Override + public void processElement(DoFn.ProcessContext c) + throws Exception { + c.output(Integer.toString(c.element().length())); + } + })); + PDone finished = + transformed.apply( + new PTransform() { + @Override + public PDone apply(PInput input) { + return PDone.in(input.getPipeline()); + } + }); + + p.traverseTopologically(visitor); + assertThat( + visitor.getStepNames(), + Matchers., String>hasEntry( + created.getProducingTransformInternal(), "s0")); + assertThat( + visitor.getStepNames(), + Matchers., String>hasEntry( + transformed.getProducingTransformInternal(), "s1")); + assertThat( + visitor.getStepNames(), + Matchers., String>hasEntry( + finished.getProducingTransformInternal(), "s2")); + } + @Test public void traverseMultipleTimesThrows() { p.apply(Create.of(1, 2, 3)); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java index 564f3f249f45..8fbcc6f75a00 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -73,7 +73,6 @@ import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -90,7 +89,8 @@ public class InProcessEvaluationContextTest { private PCollection> downstream; private PCollectionView> view; private PCollection unbounded; - + private Collection> rootTransforms; + private Map>> valueToConsumers; @Before public void setup() { @@ -103,31 +103,19 @@ public void setup() { downstream = created.apply(WithKeys.of("foo")); view = created.apply(View.asIterable()); unbounded = p.apply(CountingInput.unbounded()); - Collection> rootTransforms = - ImmutableList.>of( - created.getProducingTransformInternal(), unbounded.getProducingTransformInternal()); - Map>> valueToConsumers = new HashMap<>(); - valueToConsumers.put( - created, - ImmutableList.>of( - downstream.getProducingTransformInternal(), view.getProducingTransformInternal())); - valueToConsumers.put(unbounded, ImmutableList.>of()); - valueToConsumers.put(downstream, ImmutableList.>of()); - valueToConsumers.put(view, ImmutableList.>of()); - - Map, String> stepNames = new HashMap<>(); - stepNames.put(created.getProducingTransformInternal(), "s1"); - stepNames.put(downstream.getProducingTransformInternal(), "s2"); - stepNames.put(view.getProducingTransformInternal(), "s3"); - stepNames.put(unbounded.getProducingTransformInternal(), "s4"); - - Collection> views = ImmutableList.>of(view); - context = InProcessEvaluationContext.create( + + ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor(); + p.traverseTopologically(cVis); + rootTransforms = cVis.getRootTransforms(); + valueToConsumers = cVis.getValueToConsumers(); + + context = + InProcessEvaluationContext.create( runner.getPipelineOptions(), rootTransforms, valueToConsumers, - stepNames, - views); + cVis.getStepNames(), + cVis.getViews()); } @Test @@ -492,16 +480,14 @@ public void isDoneWithPartiallyDone() { null, ImmutableList.of(), StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); - context.handleResult( - committedBundle, - ImmutableList.of(), - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); assertThat(context.isDone(), is(false)); - context.handleResult( - committedBundle, - ImmutableList.of(), - StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); + for (AppliedPTransform consumers : valueToConsumers.get(created)) { + context.handleResult( + committedBundle, + ImmutableList.of(), + StepTransformResult.withoutHold(consumers).build()); + } assertThat(context.isDone(), is(true)); }