From 9423fa60450150cf8b59e73078a0bef8fc9eb7de Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 21 Mar 2016 17:00:35 -0700 Subject: [PATCH 1/6] Schedule roots less aggressively The excess scheduling of known-empty bundles can consume excessive resources. --- .../ExecutorServiceParallelExecutor.java | 38 ++++++++++--------- .../inprocess/InProcessEvaluationContext.java | 19 ++++++++++ 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index ae686f297945..c81e185fd36c 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -304,8 +304,9 @@ public void run() { visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get())); } } - fireTimers(); - mightNeedMoreWork(); + if (!fireTimers()) { + mightNeedMoreWork(); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.error("Monitor died due to being interrupted"); @@ -326,8 +327,9 @@ public void run() { } } - private void fireTimers() throws Exception { + private boolean fireTimers() throws Exception { try { + boolean firedTimers = false; for (Map.Entry, Map> transformTimers : evaluationContext.extractFiredTimers().entrySet()) { AppliedPTransform transform = transformTimers.getKey(); @@ -346,9 +348,11 @@ private void fireTimers() throws Exception { .add(WindowedValue.valueInEmptyWindows(work)) .commit(Instant.now()); scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery)); + firedTimers = true; } } } + return firedTimers; } catch (Exception e) { LOG.error("Internal Error while delivering timers", e); throw e; @@ -368,25 +372,25 @@ private boolean shouldShutdown() { } private void mightNeedMoreWork() { - synchronized (scheduledExecutors) { - for (TransformExecutor executor : scheduledExecutors.keySet()) { - Thread thread = executor.getThread(); - if (thread != null) { - switch (thread.getState()) { - case BLOCKED: - case WAITING: - case TERMINATED: - case TIMED_WAITING: - break; - default: - return; - } + for (TransformExecutor executor : scheduledExecutors.keySet()) { + Thread thread = executor.getThread(); + if (thread != null) { + switch (thread.getState()) { + case BLOCKED: + case WAITING: + case TERMINATED: + case TIMED_WAITING: + break; + default: + return; } } } // All current TransformExecutors are blocked; add more work from the roots. for (AppliedPTransform root : rootNodes) { - scheduleConsumption(root, null, defaultCompletionCallback); + if (!evaluationContext.isDone(root)) { + scheduleConsumption(root, null, defaultCompletionCallback); + } } } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java index 2908fba8185a..617baec1da2d 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -356,6 +356,25 @@ public CounterSet getCounters() { return watermarkManager.extractFiredTimers(); } + public boolean isDone(AppliedPTransform transform) { + boolean done = true; + for (PValue output : transform.getOutput().expand()) { + if (output instanceof PCollection) { + IsBounded bounded = ((PCollection) output).isBounded(); + if (bounded.equals(IsBounded.BOUNDED) + || options.isShutdownUnboundedProducersWithMaxWatermark()) { + done &= + watermarkManager + .getWatermarks(transform) + .getOutputWatermark() + .equals(BoundedWindow.TIMESTAMP_MAX_VALUE); + } else { + done = false; + } + } + } + return done; + } /** * Returns true if all steps are done. */ From 550953f82f57e6777c178a2d7cd9a827b1237c0c Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 22 Mar 2016 13:59:57 -0700 Subject: [PATCH 2/6] fixup! Schedule roots less aggressively Add documentation around completion and maybe adding more work. Refactor mightNeedMoreWork to helper function the "can't make progress" check Rename mightNeedMoreWork -> addWorkIfNecessary --- .../ExecutorServiceParallelExecutor.java | 43 +++++++++++++------ .../inprocess/InProcessEvaluationContext.java | 12 ++++++ 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index c81e185fd36c..8c8109999f3b 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -305,7 +305,8 @@ public void run() { } } if (!fireTimers()) { - mightNeedMoreWork(); + // If any timers have fired, they will add more work; We don't need to add more + addWorkIfNecessary(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -327,6 +328,9 @@ public void run() { } } + /** + * Fires any available timers. Returns true if at least one timer was fired. + */ private boolean fireTimers() throws Exception { try { boolean firedTimers = false; @@ -371,19 +375,16 @@ private boolean shouldShutdown() { return false; } - private void mightNeedMoreWork() { + /** + * If all active {@link TransformExecutor TransformExecutors} are in a blocked state, + * add more work from root nodes that may have additional work. This ensures that if a pipeline + * has elements available from the root nodes it will add those elements when necessary. + */ + private void addWorkIfNecessary() { for (TransformExecutor executor : scheduledExecutors.keySet()) { - Thread thread = executor.getThread(); - if (thread != null) { - switch (thread.getState()) { - case BLOCKED: - case WAITING: - case TERMINATED: - case TIMED_WAITING: - break; - default: - return; - } + if (!isExecutorBlocked(executor)) { + // We have at least one executor that can proceed without adding additional work + return; } } // All current TransformExecutors are blocked; add more work from the roots. @@ -393,6 +394,22 @@ private void mightNeedMoreWork() { } } } + + private boolean isExecutorBlocked(TransformExecutor executor) { + Thread thread = executor.getThread(); + if (thread == null) { + return false; + } + switch (thread.getState()) { + case BLOCKED: + case WAITING: + case TERMINATED: + case TIMED_WAITING: + return true; + default: + return false; + } + } } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java index 617baec1da2d..7d3ba4746538 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -356,6 +356,17 @@ public CounterSet getCounters() { return watermarkManager.extractFiredTimers(); } + /** + * Returns true if the step will not produce additional output. + * + *

If the provided transform produces only {@link IsBounded#BOUNDED} + * {@link PCollection PCollections}, returns true if the watermark is at + * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}. + * + *

If the provided transform produces any {@link IsBounded#UNBOUNDED} + * {@link PCollection PCollections}, returns the value of + * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()}. + */ public boolean isDone(AppliedPTransform transform) { boolean done = true; for (PValue output : transform.getOutput().expand()) { @@ -375,6 +386,7 @@ public boolean isDone(AppliedPTransform transform) { } return done; } + /** * Returns true if all steps are done. */ From 3a649bdd7b6faa72c8aad596ccf8e6206bfb2b9d Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 22 Mar 2016 16:03:14 -0700 Subject: [PATCH 3/6] fixup! Schedule roots less aggressively Comment on why we might decide that work can still be done. TODO: test many many times --- .../ExecutorServiceParallelExecutor.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index 8c8109999f3b..31461e1ce331 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -395,18 +395,29 @@ private void addWorkIfNecessary() { } } + /** + * Return true if the provided executor might make more progress if no action is taken. + */ private boolean isExecutorBlocked(TransformExecutor executor) { Thread thread = executor.getThread(); if (thread == null) { return false; } switch (thread.getState()) { - case BLOCKED: - case WAITING: case TERMINATED: + throw new IllegalStateException(String.format( + "Unexpectedly encountered a Terminated TransformExecutor %s", executor)); + case WAITING: case TIMED_WAITING: + // The thread is waiting for some external input. Adding more work may cause the thread + // to stop waiting (e.g. the thread is waiting on an unbounded side input) return true; + case BLOCKED: + // The executor is blocked on acquisition of a java monitor. This usually means it is + // making a call to the EvaluationContext, but not a model-blocking call - and will + // eventually complete, at which point we may reevaluate. default: + // NEW and RUNNABLE threads can make progress return false; } } From 35e823a43735661aeeef092e7d68a4a189b840f9 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 23 Mar 2016 13:10:19 -0700 Subject: [PATCH 4/6] fixup! Schedule roots less aggressively Slightly cleaner method calls --- .../ExecutorServiceParallelExecutor.java | 16 +++++++++----- .../inprocess/InProcessEvaluationContext.java | 21 +++++++++---------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index 31461e1ce331..7b68bdb55094 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -304,10 +304,8 @@ public void run() { visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get())); } } - if (!fireTimers()) { - // If any timers have fired, they will add more work; We don't need to add more - addWorkIfNecessary(); - } + boolean timersFired = fireTimers(); + addWorkIfNecessary(timersFired); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.error("Monitor died due to being interrupted"); @@ -380,7 +378,11 @@ private boolean shouldShutdown() { * add more work from root nodes that may have additional work. This ensures that if a pipeline * has elements available from the root nodes it will add those elements when necessary. */ - private void addWorkIfNecessary() { + private void addWorkIfNecessary(boolean firedTimers) { + // If any timers have fired, they will add more work; We don't need to add more + if (firedTimers) { + return; + } for (TransformExecutor executor : scheduledExecutors.keySet()) { if (!isExecutorBlocked(executor)) { // We have at least one executor that can proceed without adding additional work @@ -397,6 +399,10 @@ private void addWorkIfNecessary() { /** * Return true if the provided executor might make more progress if no action is taken. + * + *

May return false even if all executor threads are currently blocked or cleaning up, as + * these can cause more work to be scheduled. If this does not occur, after these calls + * terminate, future calls will return true if all executors are waiting. */ private boolean isExecutorBlocked(TransformExecutor executor) { Thread thread = executor.getThread(); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java index 7d3ba4746538..5a6a2cbeeab9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -368,23 +368,22 @@ public CounterSet getCounters() { * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()}. */ public boolean isDone(AppliedPTransform transform) { - boolean done = true; + if (!watermarkManager + .getWatermarks(transform) + .getOutputWatermark() + .equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + return false; + } for (PValue output : transform.getOutput().expand()) { if (output instanceof PCollection) { IsBounded bounded = ((PCollection) output).isBounded(); - if (bounded.equals(IsBounded.BOUNDED) - || options.isShutdownUnboundedProducersWithMaxWatermark()) { - done &= - watermarkManager - .getWatermarks(transform) - .getOutputWatermark() - .equals(BoundedWindow.TIMESTAMP_MAX_VALUE); - } else { - done = false; + if (bounded.equals(IsBounded.UNBOUNDED) + && !options.isShutdownUnboundedProducersWithMaxWatermark()) { + return false; } } } - return done; + return true; } /** From caa142dd7f7193ce3570991d1e614237d8ee9f4f Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 23 Mar 2016 13:29:35 -0700 Subject: [PATCH 5/6] fixup! Schedule roots less aggressively add tests for isDone. containsUnboundedPCollection should look at outputs rather than inputs --- .../inprocess/InProcessEvaluationContext.java | 2 +- .../InProcessEvaluationContextTest.java | 110 +++++++++++++++++- 2 files changed, 110 insertions(+), 2 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java index 5a6a2cbeeab9..7ef9b2528450 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -401,7 +401,7 @@ public boolean isDone() { private boolean containsUnboundedPCollection() { for (AppliedPTransform transform : stepNames.keySet()) { - for (PValue value : transform.getInput().expand()) { + for (PValue value : transform.getOutput().expand()) { if (value instanceof PCollection && ((PCollection) value).isBounded().equals(IsBounded.UNBOUNDED)) { return true; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java index 149096040a3d..1a0f5053f49c 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -24,12 +24,14 @@ import static org.junit.Assert.assertThat; import com.google.cloud.dataflow.sdk.coders.VarIntCoder; +import com.google.cloud.dataflow.sdk.io.CountingInput; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -58,6 +60,7 @@ import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.cloud.dataflow.sdk.values.PValue; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import org.hamcrest.Matchers; import org.joda.time.Instant; @@ -80,25 +83,33 @@ public class InProcessEvaluationContextTest { private TestPipeline p; private InProcessEvaluationContext context; + private PCollection created; private PCollection> downstream; private PCollectionView> view; + private PCollection unbounded; + @Before public void setup() { InProcessPipelineRunner runner = InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create()); + p = TestPipeline.create(); + created = p.apply(Create.of(1, 2, 3)); downstream = created.apply(WithKeys.of("foo")); view = created.apply(View.asIterable()); + unbounded = p.apply(CountingInput.unbounded()); Collection> rootTransforms = - ImmutableList.>of(created.getProducingTransformInternal()); + ImmutableList.>of( + created.getProducingTransformInternal(), unbounded.getProducingTransformInternal()); Map>> valueToConsumers = new HashMap<>(); valueToConsumers.put( created, ImmutableList.>of( downstream.getProducingTransformInternal(), view.getProducingTransformInternal())); + valueToConsumers.put(unbounded, ImmutableList.>of()); valueToConsumers.put(downstream, ImmutableList.>of()); valueToConsumers.put(view, ImmutableList.>of()); @@ -106,6 +117,7 @@ public void setup() { stepNames.put(created.getProducingTransformInternal(), "s1"); stepNames.put(downstream.getProducingTransformInternal(), "s2"); stepNames.put(view.getProducingTransformInternal(), "s3"); + stepNames.put(unbounded.getProducingTransformInternal(), "s4"); Collection> views = ImmutableList.>of(view); context = InProcessEvaluationContext.create( @@ -421,6 +433,102 @@ public void createKeyedBundleKeyed() { assertThat(keyedBundle.getKey(), Matchers.equalTo("foo")); } + @Test + public void isDoneWithUnboundedPCollectionAndShutdown() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); + assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); + + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true)); + } + + @Test + public void isDoneWithUnboundedPCollectionAndNotShutdown() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); + assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); + + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); + } + + @Test + public void isDoneWithOnlyBoundedPCollections() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); + assertThat(context.isDone(created.getProducingTransformInternal()), is(false)); + + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); + assertThat(context.isDone(created.getProducingTransformInternal()), is(true)); + } + + @Test + public void isDoneWithPartiallyDone() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); + assertThat(context.isDone(), is(false)); + + UncommittedBundle rootBundle = context.createRootBundle(created); + rootBundle.add(WindowedValue.valueInGlobalWindow(1)); + Iterable> handleResult = + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()) + .addOutput(rootBundle) + .build()); + @SuppressWarnings("unchecked") + CommittedBundle committedBundle = + (CommittedBundle) Iterables.getOnlyElement(handleResult); + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + context.handleResult( + committedBundle, + ImmutableList.of(), + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); + assertThat(context.isDone(), is(false)); + + context.handleResult( + committedBundle, + ImmutableList.of(), + StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); + assertThat(context.isDone(), is(true)); + } + + @Test + public void isDoneWithUnboundedAndNotShutdown() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); + assertThat(context.isDone(), is(false)); + + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + context.handleResult( + context.createRootBundle(created).commit(Instant.now()), + ImmutableList.of(), + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); + assertThat(context.isDone(), is(false)); + + context.handleResult( + context.createRootBundle(created).commit(Instant.now()), + ImmutableList.of(), + StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); + assertThat(context.isDone(), is(false)); + } + private static class TestBoundedWindow extends BoundedWindow { private final Instant ts; From d354ee9503f591fbfd9b300aab84a84eb600c1bc Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 23 Mar 2016 14:36:51 -0700 Subject: [PATCH 6/6] fixup! Schedule roots less aggressively Improve isDone implementation --- .../inprocess/InMemoryWatermarkManager.java | 16 ----------- .../inprocess/InProcessEvaluationContext.java | 28 +++++++------------ 2 files changed, 10 insertions(+), 34 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java index 094526d9622a..a9a62a6aa388 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -861,22 +861,6 @@ private void updatePending( return allTimers; } - /** - * Returns true if, for any {@link TransformWatermarks} returned by - * {@link #getWatermarks(AppliedPTransform)}, the output watermark will be equal to - * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. - */ - public boolean allWatermarksAtPositiveInfinity() { - for (Map.Entry, TransformWatermarks> watermarksEntry : - transformToWatermarks.entrySet()) { - Instant endOfTime = THE_END_OF_TIME.get(); - if (watermarksEntry.getValue().getOutputWatermark().isBefore(endOfTime)) { - return false; - } - } - return true; - } - /** * A (key, Instant) pair that holds the watermark. Holds are per-key, but the watermark is global, * and as such the watermark manager must track holds and the release of holds on a per-key basis. diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java index 7ef9b2528450..4aeb0d3b2a26 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -368,12 +368,15 @@ public CounterSet getCounters() { * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()}. */ public boolean isDone(AppliedPTransform transform) { - if (!watermarkManager + // if the PTransform's watermark isn't at the max value, it isn't done + if (watermarkManager .getWatermarks(transform) .getOutputWatermark() - .equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) { return false; } + // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down, + // the PTransform may produce additional output. It is not done. for (PValue output : transform.getOutput().expand()) { if (output instanceof PCollection) { IsBounded bounded = ((PCollection) output).isBounded(); @@ -383,6 +386,8 @@ public boolean isDone(AppliedPTransform transform) { } } } + // The PTransform's watermark was at positive infinity and all of its outputs are known to be + // done. It is done. return true; } @@ -390,24 +395,11 @@ public boolean isDone(AppliedPTransform transform) { * Returns true if all steps are done. */ public boolean isDone() { - if (!options.isShutdownUnboundedProducersWithMaxWatermark() && containsUnboundedPCollection()) { - return false; - } - if (!watermarkManager.allWatermarksAtPositiveInfinity()) { - return false; - } - return true; - } - - private boolean containsUnboundedPCollection() { for (AppliedPTransform transform : stepNames.keySet()) { - for (PValue value : transform.getOutput().expand()) { - if (value instanceof PCollection - && ((PCollection) value).isBounded().equals(IsBounded.UNBOUNDED)) { - return true; - } + if (!isDone(transform)) { + return false; } } - return false; + return true; } }