diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index c72a1155f5..68a1b8c167 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -304,8 +304,8 @@ public void run() { visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get())); } } - fireTimers(); - mightNeedMoreWork(); + boolean timersFired = fireTimers(); + addWorkIfNecessary(timersFired); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.error("Monitor died due to being interrupted"); @@ -326,8 +326,12 @@ public void run() { } } - private void fireTimers() throws Exception { + /** + * Fires any available timers. Returns true if at least one timer was fired. + */ + private boolean fireTimers() throws Exception { try { + boolean firedTimers = false; for (Map.Entry, Map> transformTimers : evaluationContext.extractFiredTimers().entrySet()) { AppliedPTransform transform = transformTimers.getKey(); @@ -346,9 +350,11 @@ private void fireTimers() throws Exception { .add(WindowedValue.valueInEmptyWindows(work)) .commit(Instant.now()); scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery)); + firedTimers = true; } } } + return firedTimers; } catch (Exception e) { LOG.error("Internal Error while delivering timers", e); throw e; @@ -367,26 +373,58 @@ private boolean shouldShutdown() { return false; } - private void mightNeedMoreWork() { - synchronized (scheduledExecutors) { - for (TransformExecutor executor : scheduledExecutors.keySet()) { - Thread thread = executor.getThread(); - if (thread != null) { - switch (thread.getState()) { - case BLOCKED: - case WAITING: - case TERMINATED: - case TIMED_WAITING: - break; - default: - return; - } - } + /** + * If all active {@link TransformExecutor TransformExecutors} are in a blocked state, + * add more work from root nodes that may have additional work. This ensures that if a pipeline + * has elements available from the root nodes it will add those elements when necessary. + */ + private void addWorkIfNecessary(boolean firedTimers) { + // If any timers have fired, they will add more work; We don't need to add more + if (firedTimers) { + return; + } + for (TransformExecutor executor : scheduledExecutors.keySet()) { + if (!isExecutorBlocked(executor)) { + // We have at least one executor that can proceed without adding additional work + return; } } // All current TransformExecutors are blocked; add more work from the roots. for (AppliedPTransform root : rootNodes) { - scheduleConsumption(root, null, defaultCompletionCallback); + if (!evaluationContext.isDone(root)) { + scheduleConsumption(root, null, defaultCompletionCallback); + } + } + } + + /** + * Return true if the provided executor might make more progress if no action is taken. + * + *

May return false even if all executor threads are currently blocked or cleaning up, as + * these can cause more work to be scheduled. If this does not occur, after these calls + * terminate, future calls will return true if all executors are waiting. + */ + private boolean isExecutorBlocked(TransformExecutor executor) { + Thread thread = executor.getThread(); + if (thread == null) { + return false; + } + switch (thread.getState()) { + case TERMINATED: + throw new IllegalStateException(String.format( + "Unexpectedly encountered a Terminated TransformExecutor %s", executor)); + case WAITING: + case TIMED_WAITING: + // The thread is waiting for some external input. Adding more work may cause the thread + // to stop waiting (e.g. the thread is waiting on an unbounded side input) + return true; + case BLOCKED: + // The executor is blocked on acquisition of a java monitor. This usually means it is + // making a call to the EvaluationContext, but not a model-blocking call - and will + // eventually complete, at which point we may reevaluate. + default: + // NEW and RUNNABLE threads can make progress + return false; } } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java index 094526d962..a9a62a6aa3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -861,22 +861,6 @@ private void updatePending( return allTimers; } - /** - * Returns true if, for any {@link TransformWatermarks} returned by - * {@link #getWatermarks(AppliedPTransform)}, the output watermark will be equal to - * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. - */ - public boolean allWatermarksAtPositiveInfinity() { - for (Map.Entry, TransformWatermarks> watermarksEntry : - transformToWatermarks.entrySet()) { - Instant endOfTime = THE_END_OF_TIME.get(); - if (watermarksEntry.getValue().getOutputWatermark().isBefore(endOfTime)) { - return false; - } - } - return true; - } - /** * A (key, Instant) pair that holds the watermark. Holds are per-key, but the watermark is global, * and as such the watermark manager must track holds and the release of holds on a per-key basis. diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java index 2908fba818..4aeb0d3b2a 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -357,27 +357,49 @@ public CounterSet getCounters() { } /** - * Returns true if all steps are done. + * Returns true if the step will not produce additional output. + * + *

If the provided transform produces only {@link IsBounded#BOUNDED} + * {@link PCollection PCollections}, returns true if the watermark is at + * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}. + * + *

If the provided transform produces any {@link IsBounded#UNBOUNDED} + * {@link PCollection PCollections}, returns the value of + * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()}. */ - public boolean isDone() { - if (!options.isShutdownUnboundedProducersWithMaxWatermark() && containsUnboundedPCollection()) { + public boolean isDone(AppliedPTransform transform) { + // if the PTransform's watermark isn't at the max value, it isn't done + if (watermarkManager + .getWatermarks(transform) + .getOutputWatermark() + .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) { return false; } - if (!watermarkManager.allWatermarksAtPositiveInfinity()) { - return false; + // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down, + // the PTransform may produce additional output. It is not done. + for (PValue output : transform.getOutput().expand()) { + if (output instanceof PCollection) { + IsBounded bounded = ((PCollection) output).isBounded(); + if (bounded.equals(IsBounded.UNBOUNDED) + && !options.isShutdownUnboundedProducersWithMaxWatermark()) { + return false; + } + } } + // The PTransform's watermark was at positive infinity and all of its outputs are known to be + // done. It is done. return true; } - private boolean containsUnboundedPCollection() { + /** + * Returns true if all steps are done. + */ + public boolean isDone() { for (AppliedPTransform transform : stepNames.keySet()) { - for (PValue value : transform.getInput().expand()) { - if (value instanceof PCollection - && ((PCollection) value).isBounded().equals(IsBounded.UNBOUNDED)) { - return true; - } + if (!isDone(transform)) { + return false; } } - return false; + return true; } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java index 149096040a..1a0f5053f4 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -24,12 +24,14 @@ import static org.junit.Assert.assertThat; import com.google.cloud.dataflow.sdk.coders.VarIntCoder; +import com.google.cloud.dataflow.sdk.io.CountingInput; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -58,6 +60,7 @@ import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.cloud.dataflow.sdk.values.PValue; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import org.hamcrest.Matchers; import org.joda.time.Instant; @@ -80,25 +83,33 @@ public class InProcessEvaluationContextTest { private TestPipeline p; private InProcessEvaluationContext context; + private PCollection created; private PCollection> downstream; private PCollectionView> view; + private PCollection unbounded; + @Before public void setup() { InProcessPipelineRunner runner = InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create()); + p = TestPipeline.create(); + created = p.apply(Create.of(1, 2, 3)); downstream = created.apply(WithKeys.of("foo")); view = created.apply(View.asIterable()); + unbounded = p.apply(CountingInput.unbounded()); Collection> rootTransforms = - ImmutableList.>of(created.getProducingTransformInternal()); + ImmutableList.>of( + created.getProducingTransformInternal(), unbounded.getProducingTransformInternal()); Map>> valueToConsumers = new HashMap<>(); valueToConsumers.put( created, ImmutableList.>of( downstream.getProducingTransformInternal(), view.getProducingTransformInternal())); + valueToConsumers.put(unbounded, ImmutableList.>of()); valueToConsumers.put(downstream, ImmutableList.>of()); valueToConsumers.put(view, ImmutableList.>of()); @@ -106,6 +117,7 @@ public void setup() { stepNames.put(created.getProducingTransformInternal(), "s1"); stepNames.put(downstream.getProducingTransformInternal(), "s2"); stepNames.put(view.getProducingTransformInternal(), "s3"); + stepNames.put(unbounded.getProducingTransformInternal(), "s4"); Collection> views = ImmutableList.>of(view); context = InProcessEvaluationContext.create( @@ -421,6 +433,102 @@ public void createKeyedBundleKeyed() { assertThat(keyedBundle.getKey(), Matchers.equalTo("foo")); } + @Test + public void isDoneWithUnboundedPCollectionAndShutdown() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); + assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); + + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true)); + } + + @Test + public void isDoneWithUnboundedPCollectionAndNotShutdown() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); + assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); + + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); + } + + @Test + public void isDoneWithOnlyBoundedPCollections() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); + assertThat(context.isDone(created.getProducingTransformInternal()), is(false)); + + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); + assertThat(context.isDone(created.getProducingTransformInternal()), is(true)); + } + + @Test + public void isDoneWithPartiallyDone() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); + assertThat(context.isDone(), is(false)); + + UncommittedBundle rootBundle = context.createRootBundle(created); + rootBundle.add(WindowedValue.valueInGlobalWindow(1)); + Iterable> handleResult = + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()) + .addOutput(rootBundle) + .build()); + @SuppressWarnings("unchecked") + CommittedBundle committedBundle = + (CommittedBundle) Iterables.getOnlyElement(handleResult); + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + context.handleResult( + committedBundle, + ImmutableList.of(), + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); + assertThat(context.isDone(), is(false)); + + context.handleResult( + committedBundle, + ImmutableList.of(), + StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); + assertThat(context.isDone(), is(true)); + } + + @Test + public void isDoneWithUnboundedAndNotShutdown() { + context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); + assertThat(context.isDone(), is(false)); + + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); + context.handleResult( + null, + ImmutableList.of(), + StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + context.handleResult( + context.createRootBundle(created).commit(Instant.now()), + ImmutableList.of(), + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); + assertThat(context.isDone(), is(false)); + + context.handleResult( + context.createRootBundle(created).commit(Instant.now()), + ImmutableList.of(), + StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); + assertThat(context.isDone(), is(false)); + } + private static class TestBoundedWindow extends BoundedWindow { private final Instant ts;