Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ public void run() {
visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
}
}
fireTimers();
mightNeedMoreWork();
boolean timersFired = fireTimers();
addWorkIfNecessary(timersFired);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Monitor died due to being interrupted");
Expand All @@ -326,8 +326,12 @@ public void run() {
}
}

private void fireTimers() throws Exception {
/**
* Fires any available timers. Returns true if at least one timer was fired.
*/
private boolean fireTimers() throws Exception {
try {
boolean firedTimers = false;
for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> transformTimers :
evaluationContext.extractFiredTimers().entrySet()) {
AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
Expand All @@ -346,9 +350,11 @@ private void fireTimers() throws Exception {
.add(WindowedValue.valueInEmptyWindows(work))
.commit(Instant.now());
scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
firedTimers = true;
}
}
}
return firedTimers;
} catch (Exception e) {
LOG.error("Internal Error while delivering timers", e);
throw e;
Expand All @@ -367,26 +373,58 @@ private boolean shouldShutdown() {
return false;
}

private void mightNeedMoreWork() {
synchronized (scheduledExecutors) {
for (TransformExecutor<?> executor : scheduledExecutors.keySet()) {
Thread thread = executor.getThread();
if (thread != null) {
switch (thread.getState()) {
case BLOCKED:
case WAITING:
case TERMINATED:
case TIMED_WAITING:
break;
default:
return;
}
}
/**
* If all active {@link TransformExecutor TransformExecutors} are in a blocked state,
* add more work from root nodes that may have additional work. This ensures that if a pipeline
* has elements available from the root nodes it will add those elements when necessary.
*/
private void addWorkIfNecessary(boolean firedTimers) {
// If any timers have fired, they will add more work; We don't need to add more
if (firedTimers) {
return;
}
for (TransformExecutor<?> executor : scheduledExecutors.keySet()) {
if (!isExecutorBlocked(executor)) {
// We have at least one executor that can proceed without adding additional work
return;
}
}
// All current TransformExecutors are blocked; add more work from the roots.
for (AppliedPTransform<?, ?, ?> root : rootNodes) {
scheduleConsumption(root, null, defaultCompletionCallback);
if (!evaluationContext.isDone(root)) {
scheduleConsumption(root, null, defaultCompletionCallback);
}
}
}

/**
* Return true if the provided executor might make more progress if no action is taken.
*
* <p>May return false even if all executor threads are currently blocked or cleaning up, as
* these can cause more work to be scheduled. If this does not occur, after these calls
* terminate, future calls will return true if all executors are waiting.
*/
private boolean isExecutorBlocked(TransformExecutor<?> executor) {
Thread thread = executor.getThread();
if (thread == null) {
return false;
}
switch (thread.getState()) {
case TERMINATED:
throw new IllegalStateException(String.format(
"Unexpectedly encountered a Terminated TransformExecutor %s", executor));
case WAITING:
case TIMED_WAITING:
// The thread is waiting for some external input. Adding more work may cause the thread
// to stop waiting (e.g. the thread is waiting on an unbounded side input)
return true;
case BLOCKED:
// The executor is blocked on acquisition of a java monitor. This usually means it is
// making a call to the EvaluationContext, but not a model-blocking call - and will
// eventually complete, at which point we may reevaluate.
default:
// NEW and RUNNABLE threads can make progress
return false;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -861,22 +861,6 @@ private void updatePending(
return allTimers;
}

/**
* Returns true if, for any {@link TransformWatermarks} returned by
* {@link #getWatermarks(AppliedPTransform)}, the output watermark will be equal to
* {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
*/
public boolean allWatermarksAtPositiveInfinity() {
for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
transformToWatermarks.entrySet()) {
Instant endOfTime = THE_END_OF_TIME.get();
if (watermarksEntry.getValue().getOutputWatermark().isBefore(endOfTime)) {
return false;
}
}
return true;
}

/**
* A (key, Instant) pair that holds the watermark. Holds are per-key, but the watermark is global,
* and as such the watermark manager must track holds and the release of holds on a per-key basis.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,27 +357,49 @@ public CounterSet getCounters() {
}

/**
* Returns true if all steps are done.
* Returns true if the step will not produce additional output.
*
* <p>If the provided transform produces only {@link IsBounded#BOUNDED}
* {@link PCollection PCollections}, returns true if the watermark is at
* {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}.
*
* <p>If the provided transform produces any {@link IsBounded#UNBOUNDED}
* {@link PCollection PCollections}, returns the value of
* {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()}.
*/
public boolean isDone() {
if (!options.isShutdownUnboundedProducersWithMaxWatermark() && containsUnboundedPCollection()) {
public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
// if the PTransform's watermark isn't at the max value, it isn't done
if (watermarkManager
.getWatermarks(transform)
.getOutputWatermark()
.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
return false;
}
if (!watermarkManager.allWatermarksAtPositiveInfinity()) {
return false;
// If the PTransform has any unbounded outputs, and unbounded producers should not be shut down,
// the PTransform may produce additional output. It is not done.
for (PValue output : transform.getOutput().expand()) {
if (output instanceof PCollection) {
IsBounded bounded = ((PCollection<?>) output).isBounded();
if (bounded.equals(IsBounded.UNBOUNDED)
&& !options.isShutdownUnboundedProducersWithMaxWatermark()) {
return false;
}
}
}
// The PTransform's watermark was at positive infinity and all of its outputs are known to be
// done. It is done.
return true;
}

private boolean containsUnboundedPCollection() {
/**
* Returns true if all steps are done.
*/
public boolean isDone() {
for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
for (PValue value : transform.getInput().expand()) {
if (value instanceof PCollection
&& ((PCollection<?>) value).isBounded().equals(IsBounded.UNBOUNDED)) {
return true;
}
if (!isDone(transform)) {
return false;
}
}
return false;
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import static org.junit.Assert.assertThat;

import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.io.CountingInput;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.Create;
Expand Down Expand Up @@ -58,6 +60,7 @@
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

import org.hamcrest.Matchers;
import org.joda.time.Instant;
Expand All @@ -80,32 +83,41 @@
public class InProcessEvaluationContextTest {
private TestPipeline p;
private InProcessEvaluationContext context;

private PCollection<Integer> created;
private PCollection<KV<String, Integer>> downstream;
private PCollectionView<Iterable<Integer>> view;
private PCollection<Long> unbounded;


@Before
public void setup() {
InProcessPipelineRunner runner =
InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create());

p = TestPipeline.create();

created = p.apply(Create.of(1, 2, 3));
downstream = created.apply(WithKeys.<String, Integer>of("foo"));
view = created.apply(View.<Integer>asIterable());
unbounded = p.apply(CountingInput.unbounded());
Collection<AppliedPTransform<?, ?, ?>> rootTransforms =
ImmutableList.<AppliedPTransform<?, ?, ?>>of(created.getProducingTransformInternal());
ImmutableList.<AppliedPTransform<?, ?, ?>>of(
created.getProducingTransformInternal(), unbounded.getProducingTransformInternal());
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>();
valueToConsumers.put(
created,
ImmutableList.<AppliedPTransform<?, ?, ?>>of(
downstream.getProducingTransformInternal(), view.getProducingTransformInternal()));
valueToConsumers.put(unbounded, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
valueToConsumers.put(downstream, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
valueToConsumers.put(view, ImmutableList.<AppliedPTransform<?, ?, ?>>of());

Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
stepNames.put(created.getProducingTransformInternal(), "s1");
stepNames.put(downstream.getProducingTransformInternal(), "s2");
stepNames.put(view.getProducingTransformInternal(), "s3");
stepNames.put(unbounded.getProducingTransformInternal(), "s4");

Collection<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(view);
context = InProcessEvaluationContext.create(
Expand Down Expand Up @@ -421,6 +433,102 @@ public void createKeyedBundleKeyed() {
assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
}

@Test
public void isDoneWithUnboundedPCollectionAndShutdown() {
context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));

context.handleResult(
null,
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true));
}

@Test
public void isDoneWithUnboundedPCollectionAndNotShutdown() {
context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));

context.handleResult(
null,
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
}

@Test
public void isDoneWithOnlyBoundedPCollections() {
context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
assertThat(context.isDone(created.getProducingTransformInternal()), is(false));

context.handleResult(
null,
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
assertThat(context.isDone(created.getProducingTransformInternal()), is(true));
}

@Test
public void isDoneWithPartiallyDone() {
context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
assertThat(context.isDone(), is(false));

UncommittedBundle<Integer> rootBundle = context.createRootBundle(created);
rootBundle.add(WindowedValue.valueInGlobalWindow(1));
Iterable<? extends CommittedBundle<?>> handleResult =
context.handleResult(
null,
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(created.getProducingTransformInternal())
.addOutput(rootBundle)
.build());
@SuppressWarnings("unchecked")
CommittedBundle<Integer> committedBundle =
(CommittedBundle<Integer>) Iterables.getOnlyElement(handleResult);
context.handleResult(
null,
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
context.handleResult(
committedBundle,
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
assertThat(context.isDone(), is(false));

context.handleResult(
committedBundle,
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
assertThat(context.isDone(), is(true));
}

@Test
public void isDoneWithUnboundedAndNotShutdown() {
context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
assertThat(context.isDone(), is(false));

context.handleResult(
null,
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
context.handleResult(
null,
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
context.handleResult(
context.createRootBundle(created).commit(Instant.now()),
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
assertThat(context.isDone(), is(false));

context.handleResult(
context.createRootBundle(created).commit(Instant.now()),
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
assertThat(context.isDone(), is(false));
}

private static class TestBoundedWindow extends BoundedWindow {
private final Instant ts;

Expand Down