From ad381f78f6aba359fa542035a492e9ed346f83e1 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 26 Apr 2016 16:40:58 -0700 Subject: [PATCH 1/3] Return a map of CommittedBundle to Consumers from handleResult This allows the executor to be ignorant of the mapping from PValue to Consumers, as well as allowing the TransformExecutor to pass bundles that should only be consumed by specific PTransforms. This can occur if a transform is incapable of processing a bundle. --- .../runners/inprocess/CompletionCallback.java | 9 +- .../ExecutorServiceParallelExecutor.java | 61 ++++-- .../inprocess/InMemoryWatermarkManager.java | 10 +- .../inprocess/InProcessEvaluationContext.java | 25 ++- .../inprocess/InProcessPipelineRunner.java | 1 - .../runners/inprocess/TransformExecutor.java | 6 +- .../InMemoryWatermarkManagerTest.java | 201 +++++++++++++----- .../InProcessEvaluationContextTest.java | 4 +- .../inprocess/TransformExecutorTest.java | 4 +- 9 files changed, 224 insertions(+), 97 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java index 90c488e6a40e..e825382225f3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java @@ -18,15 +18,20 @@ package org.apache.beam.sdk.runners.inprocess; import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; + +import java.util.Collection; +import java.util.Map; /** * A callback for completing a bundle of input. */ interface CompletionCallback { /** - * Handle a successful result, returning the committed outputs of the result. + * Handle a successful result, returning the committed outputs of the result and the transforms + * that should consume those outputs. 
*/ - Iterable> handleResult( + Map, Collection>> handleResult( CommittedBundle inputBundle, InProcessTransformResult result); /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index 3463d081d886..bd5ddf6f4cbf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.runners.inprocess; +import static com.google.common.base.Preconditions.checkArgument; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; @@ -64,7 +66,6 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { private final ExecutorService executorService; - private final Map>> valueToConsumers; private final Set keyedPValues; private final TransformEvaluatorRegistry registry; @SuppressWarnings("rawtypes") @@ -86,26 +87,23 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { public static ExecutorServiceParallelExecutor create( ExecutorService executorService, - Map>> valueToConsumers, Set keyedPValues, TransformEvaluatorRegistry registry, @SuppressWarnings("rawtypes") Map, Collection> transformEnforcements, InProcessEvaluationContext context) { return new ExecutorServiceParallelExecutor( - executorService, valueToConsumers, keyedPValues, registry, transformEnforcements, context); + executorService, keyedPValues, registry, transformEnforcements, context); } private ExecutorServiceParallelExecutor( ExecutorService executorService, - Map>> valueToConsumers, Set keyedPValues, TransformEvaluatorRegistry registry, 
@SuppressWarnings("rawtypes") Map, Collection> transformEnforcements, InProcessEvaluationContext context) { this.executorService = executorService; - this.valueToConsumers = valueToConsumers; this.keyedPValues = keyedPValues; this.registry = registry; this.transformEnforcements = transformEnforcements; @@ -191,10 +189,8 @@ private boolean isKeyed(PValue pvalue) { return keyedPValues.contains(pvalue); } - private void scheduleConsumers(CommittedBundle bundle) { - for (AppliedPTransform consumer : valueToConsumers.get(bundle.getPCollection())) { - scheduleConsumption(consumer, bundle, defaultCompletionCallback); - } + private void scheduleConsumers(CommittedBundle bundle, AppliedPTransform consumer) { + scheduleConsumption(consumer, bundle, defaultCompletionCallback); } @Override @@ -216,12 +212,16 @@ public void awaitCompletion() throws Throwable { */ private class DefaultCompletionCallback implements CompletionCallback { @Override - public Iterable> handleResult( + public Map, Collection>> handleResult( CommittedBundle inputBundle, InProcessTransformResult result) { - Iterable> resultBundles = + Map, Collection>> resultBundles = evaluationContext.handleResult(inputBundle, Collections.emptyList(), result); - for (CommittedBundle outputBundle : resultBundles) { - allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); + for (Map.Entry, Collection>> output + : resultBundles.entrySet()) { + CommittedBundle bundle = output.getKey(); + for (AppliedPTransform consumer : output.getValue()) { + allUpdates.offer(ExecutorUpdate.fromBundle(bundle, consumer)); + } } return resultBundles; } @@ -246,12 +246,16 @@ private TimerCompletionCallback(Iterable timers) { } @Override - public Iterable> handleResult( + public Map, Collection>> handleResult( CommittedBundle inputBundle, InProcessTransformResult result) { - Iterable> resultBundles = + Map, Collection>> resultBundles = evaluationContext.handleResult(inputBundle, timers, result); - for (CommittedBundle outputBundle : 
resultBundles) { - allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); + for (Map.Entry, Collection>> output + : resultBundles.entrySet()) { + CommittedBundle bundle = output.getKey(); + for (AppliedPTransform consumer : output.getValue()) { + allUpdates.offer(ExecutorUpdate.fromBundle(bundle, consumer)); + } } return resultBundles; } @@ -269,18 +273,27 @@ public void handleThrowable(CommittedBundle inputBundle, Throwable t) { */ private static class ExecutorUpdate { private final Optional> bundle; + private final Optional> consumer; private final Optional throwable; - public static ExecutorUpdate fromBundle(CommittedBundle bundle) { - return new ExecutorUpdate(bundle, null); + public static ExecutorUpdate fromBundle( + CommittedBundle bundle, AppliedPTransform consumer) { + return new ExecutorUpdate(bundle, consumer, null); } public static ExecutorUpdate fromThrowable(Throwable t) { - return new ExecutorUpdate(null, t); + return new ExecutorUpdate(null, null, t); } - private ExecutorUpdate(CommittedBundle producedBundle, Throwable throwable) { + private ExecutorUpdate( + CommittedBundle producedBundle, + AppliedPTransform consumer, + Throwable throwable) { + checkArgument((producedBundle == null) == (consumer == null), + "The produced bundle and consuming PTransform must either " + + "both be null or neither be null"); this.bundle = Optional.fromNullable(producedBundle); + this.consumer = Optional.fromNullable(consumer); this.throwable = Optional.fromNullable(throwable); } @@ -288,6 +301,10 @@ public Optional> getBundle() { return bundle; } + public Optional> getConsumer() { + return consumer; + } + public Optional getException() { return throwable; } @@ -344,7 +361,7 @@ public void run() { while (update != null) { LOG.debug("Executor Update: {}", update); if (update.getBundle().isPresent()) { - scheduleConsumers(update.getBundle().get()); + scheduleConsumers(update.getBundle().get(), update.getConsumer().get()); } else if 
(update.getException().isPresent()) { visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get())); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManager.java index d941cc7ee4c9..07d72a424b6c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -808,7 +808,7 @@ public void updateWatermarks( @Nullable CommittedBundle completed, AppliedPTransform transform, TimerUpdate timerUpdate, - Iterable> outputs, + Map, Collection>> outputs, @Nullable Instant earliestHold) { updatePending(completed, transform, timerUpdate, outputs); TransformWatermarks transformWms = transformToWatermarks.get(transform); @@ -841,15 +841,17 @@ private void updatePending( CommittedBundle input, AppliedPTransform transform, TimerUpdate timerUpdate, - Iterable> outputs) { + Map, Collection>> outputs) { TransformWatermarks completedTransform = transformToWatermarks.get(transform); completedTransform.updateTimers(timerUpdate); if (input != null) { completedTransform.removePending(input); } - for (CommittedBundle bundle : outputs) { - for (AppliedPTransform consumer : consumers.get(bundle.getPCollection())) { + for (Map.Entry, Collection>> + outputEntry : outputs.entrySet()) { + CommittedBundle bundle = outputEntry.getKey(); + for (AppliedPTransform consumer : outputEntry.getValue()) { TransformWatermarks watermarks = transformToWatermarks.get(consumer); watermarks.addPending(bundle); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java index 3990f0d04fdb..0da61599a910 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.values.PValue; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import java.util.Collection; @@ -74,6 +75,11 @@ class InProcessEvaluationContext { /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */ private final Map, String> stepNames; + /** + * The mapping from each {@link PValue} contained within the {@link Pipeline} to each + * {@link AppliedPTransform} that consumes it. + */ + private final Map>> valueToConsumers; /** The options that were used to create this {@link Pipeline}. */ private final InProcessPipelineOptions options; @@ -114,7 +120,7 @@ private InProcessEvaluationContext( this.options = checkNotNull(options); this.bundleFactory = checkNotNull(bundleFactory); checkNotNull(rootTransforms); - checkNotNull(valueToConsumers); + this.valueToConsumers = checkNotNull(valueToConsumers); checkNotNull(stepNames); checkNotNull(views); this.stepNames = stepNames; @@ -145,11 +151,11 @@ private InProcessEvaluationContext( * @param result the result of evaluating the input bundle * @return the committed bundles contained within the handled {@code result} */ - public synchronized Iterable> handleResult( + public synchronized Map, Collection>> handleResult( @Nullable CommittedBundle completedBundle, Iterable completedTimers, InProcessTransformResult result) { - Iterable> committedBundles = + Map, Collection>> committedBundles = commitBundles(result.getOutputBundles()); // Update watermarks and timers watermarkManager.updateWatermarks( @@ -179,10 +185,11 @@ public synchronized Iterable> handleResult( return committedBundles; } - private Iterable> commitBundles( - Iterable> bundles) { - 
ImmutableList.Builder> completed = ImmutableList.builder(); - for (UncommittedBundle inProgress : bundles) { + private Map, Collection>> commitBundles( + Iterable> outputBundles) { + ImmutableMap.Builder, Collection>> outputs + = ImmutableMap.builder(); + for (UncommittedBundle inProgress : outputBundles) { AppliedPTransform producing = inProgress.getPCollection().getProducingTransformInternal(); TransformWatermarks watermarks = watermarkManager.getWatermarks(producing); @@ -191,10 +198,10 @@ private Iterable> commitBundles( // Empty bundles don't impact watermarks and shouldn't trigger downstream execution, so // filter them out if (!Iterables.isEmpty(committed.getElements())) { - completed.add(committed); + outputs.put(committed, valueToConsumers.get(committed.getPCollection())); } } - return completed.build(); + return outputs.build(); } private void fireAllAvailableCallbacks() { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java index 7897f2e31caf..539675d0c6a9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -239,7 +239,6 @@ public InProcessPipelineResult run(Pipeline pipeline) { InProcessExecutor executor = ExecutorServiceParallelExecutor.create( executorService, - consumerTrackingVisitor.getValueToConsumers(), keyedPValueVisitor.getKeyedPValues(), TransformEvaluatorRegistry.defaultRegistry(), defaultModelEnforcements(options), diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java index 3a7bedc8d280..22aae708f31d 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; @@ -158,9 +159,10 @@ private InProcessTransformResult finishBundle( TransformEvaluator evaluator, Collection> enforcements) throws Exception { InProcessTransformResult result = evaluator.finishBundle(); - Iterable> outputs = onComplete.handleResult(inputBundle, result); + Map, Collection>> outputs = + onComplete.handleResult(inputBundle, result); for (ModelEnforcement enforcement : enforcements) { - enforcement.afterFinish(inputBundle, result, outputs); + enforcement.afterFinish(inputBundle, result, outputs.keySet()); } return result; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java index 3c4503f9d5eb..cd749f33739d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java @@ -54,6 +54,7 @@ import org.apache.beam.sdk.values.TimestampedValue; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -90,6 +91,8 @@ public class InMemoryWatermarkManagerTest implements Serializable { private transient InMemoryWatermarkManager manager; private transient BundleFactory bundleFactory; + private transient Map>> valueToConsumers; + @Before public void setup() { TestPipeline p = TestPipeline.create(); @@ -133,6 +136,7 @@ public void processElement(DoFn.ProcessContext c) throws 
Excep flattened.getProducingTransformInternal())); consumers.put(flattened, Collections.>emptyList()); + valueToConsumers = consumers; clock = MockClock.fromInstant(new Instant(1000)); manager = InMemoryWatermarkManager.create(clock, rootTransforms, consumers); @@ -160,7 +164,7 @@ public void getWatermarkForUntouchedTransform() { public void getWatermarkForUpdatedSourceTransform() { CommittedBundle output = multiWindowedBundle(createdInts, 1); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(output), new Instant(8000L)); + allConsumers(output), new Instant(8000L)); TransformWatermarks updatedSourceWatermark = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -176,7 +180,7 @@ public void getWatermarkForMultiInputTransform() { CommittedBundle secondPcollectionBundle = multiWindowedBundle(intsToFlatten, -1); manager.updateWatermarks(null, intsToFlatten.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(secondPcollectionBundle), + TimerUpdate.empty(), allConsumers(secondPcollectionBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); // We didn't do anything for the first source, so we shouldn't have progressed the watermark @@ -206,12 +210,12 @@ public void getWatermarkForMultiInputTransform() { // We have finished processing the bundle from the second PCollection, but we haven't consumed // anything from the first PCollection yet; so our watermark shouldn't advance manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(flattenedBundleSecondCreate), + TimerUpdate.empty(), allConsumers(flattenedBundleSecondCreate), null); TransformWatermarks transformAfterProcessing = manager.getWatermarks(flattened.getProducingTransformInternal()); manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(), - TimerUpdate.empty(), 
Collections.>singleton(flattenedBundleSecondCreate), + TimerUpdate.empty(), allConsumers(flattenedBundleSecondCreate), null); assertThat( transformAfterProcessing.getInputWatermark(), @@ -226,7 +230,7 @@ public void getWatermarkForMultiInputTransform() { // the source is done, but elements are still buffered. The source output watermark should be // past the end of the global window manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(firstPcollectionBundle), + allConsumers(firstPcollectionBundle), new Instant(Long.MAX_VALUE)); TransformWatermarks firstSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -254,7 +258,7 @@ public void getWatermarkForMultiInputTransform() { CommittedBundle completedFlattenBundle = bundleFactory.createRootBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(firstPcollectionBundle, flattened.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(completedFlattenBundle), + TimerUpdate.empty(), allConsumers(completedFlattenBundle), null); TransformWatermarks afterConsumingAllInput = manager.getWatermarks(flattened.getProducingTransformInternal()); @@ -276,7 +280,7 @@ public void getWatermarkForMultiConsumedCollection() { TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), new Instant(Long.MAX_VALUE)); + allConsumers(createdBundle), new Instant(Long.MAX_VALUE)); TransformWatermarks createdAfterProducing = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat( @@ -288,7 +292,7 @@ public void getWatermarkForMultiConsumedCollection() { TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new 
Instant(-1000L))); manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(keyBundle), null); + TimerUpdate.empty(), allConsumers(keyBundle), null); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat( @@ -304,7 +308,7 @@ public void getWatermarkForMultiConsumedCollection() { CommittedBundle filteredBundle = timestampedBundle(filtered, TimestampedValue.of(2, new Instant(1234L))); manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(filteredBundle), null); + TimerUpdate.empty(), allConsumers(filteredBundle), null); TransformWatermarks filteredProcessedWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); assertThat( @@ -315,6 +319,41 @@ public void getWatermarkForMultiConsumedCollection() { not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE))); } + /** + * Demonstrates that producing output for only a single {@link AppliedPTransform} only holds + * the watermark for that transform. 
+ */ + @Test + public void getWatermarkForMultiConsumedCollectionWithOnlyOutputToOnceConsumer() { + CommittedBundle createdBundle = timestampedBundle(createdInts, + TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), + TimestampedValue.of(3, new Instant(-1000L))); + Map, Collection>> produced = + new HashMap<>(); + produced.put(createdBundle, + ImmutableList.>of(filtered.getProducingTransformInternal())); + manager.updateWatermarks(null, + createdInts.getProducingTransformInternal(), + TimerUpdate.empty(), + produced, + BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.updateWatermarks(null, + intsToFlatten.getProducingTransformInternal(), + TimerUpdate.empty(), + allConsumers(), + BoundedWindow.TIMESTAMP_MAX_VALUE); + + TransformWatermarks flattendWms = + manager.getWatermarks(flattened.getProducingTransformInternal()); + // We didn't produce anything to be consumed by the flatten + assertThat(flattendWms.getOutputWatermark(), + not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE))); + + TransformWatermarks filteredWms = + manager.getWatermarks(filtered.getProducingTransformInternal()); + assertThat(filteredWms.getInputWatermark(), not(laterThan(new Instant(-1000L)))); + } + /** * Demonstrates that the watermark of an {@link AppliedPTransform} is held to the provided * watermark hold. 
@@ -325,14 +364,14 @@ public void updateWatermarkWithWatermarkHolds() { TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), new Instant(Long.MAX_VALUE)); + allConsumers(createdBundle), new Instant(Long.MAX_VALUE)); CommittedBundle> keyBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(keyBundle), + TimerUpdate.empty(), allConsumers(keyBundle), new Instant(500L)); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -359,12 +398,12 @@ public void updateWatermarkWithKeyedWatermarkHolds() { .commit(clock.now()); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - ImmutableList.of(firstKeyBundle, secondKeyBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); + allConsumers(firstKeyBundle, secondKeyBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(firstKeyBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), new Instant(-1000L)); + TimerUpdate.empty(), allConsumers(), new Instant(-1000L)); manager.updateWatermarks(secondKeyBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), new Instant(1234L)); + TimerUpdate.empty(), allConsumers(), new Instant(1234L)); TransformWatermarks filteredWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -376,7 +415,7 @@ public void updateWatermarkWithKeyedWatermarkHolds() { CommittedBundle 
fauxFirstKeyTimerBundle = bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now()); manager.updateWatermarks(fauxFirstKeyTimerBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), + TimerUpdate.empty(), allConsumers(), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L))); @@ -384,11 +423,11 @@ public void updateWatermarkWithKeyedWatermarkHolds() { CommittedBundle fauxSecondKeyTimerBundle = bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now()); manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), new Instant(5678L)); + TimerUpdate.empty(), allConsumers(), new Instant(5678L)); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L))); manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), + TimerUpdate.empty(), allConsumers(), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat( filteredWatermarks.getOutputWatermark(), @@ -404,7 +443,7 @@ public void updateOutputWatermarkShouldBeMonotonic() { CommittedBundle firstInput = bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(firstInput), new Instant(0L)); + allConsumers(firstInput), new Instant(0L)); TransformWatermarks firstWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L))); @@ -412,7 +451,7 @@ public void updateOutputWatermarkShouldBeMonotonic() { CommittedBundle secondInput = bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(null, 
createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(secondInput), new Instant(-250L)); + allConsumers(secondInput), new Instant(-250L)); TransformWatermarks secondWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(secondWatermarks.getOutputWatermark(), not(earlierThan(new Instant(0L)))); @@ -428,14 +467,14 @@ public void updateWatermarkWithHoldsShouldBeMonotonic() { TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), new Instant(Long.MAX_VALUE)); + allConsumers(createdBundle), new Instant(Long.MAX_VALUE)); CommittedBundle> keyBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(keyBundle), + TimerUpdate.empty(), allConsumers(keyBundle), new Instant(500L)); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -463,7 +502,7 @@ public void updateWatermarkWithLateData() { TimestampedValue.of(1, sourceWatermark), TimestampedValue.of(2, new Instant(1234L))); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), sourceWatermark); + allConsumers(createdBundle), sourceWatermark); CommittedBundle> keyBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), sourceWatermark), @@ -471,7 +510,7 @@ public void updateWatermarkWithLateData() { // Finish processing the on-time data. 
The watermarks should progress to be equal to the source manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(keyBundle), null); + TimerUpdate.empty(), allConsumers(keyBundle), null); TransformWatermarks onTimeWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark)); @@ -482,7 +521,7 @@ public void updateWatermarkWithLateData() { // the late data arrives in a downstream PCollection after its watermark has advanced past it; // we don't advance the watermark past the current watermark until we've consumed the late data manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(lateDataBundle), new Instant(2_000_000L)); + allConsumers(lateDataBundle), new Instant(2_000_000L)); TransformWatermarks bufferedLateWm = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L))); @@ -497,15 +536,57 @@ public void updateWatermarkWithLateData() { CommittedBundle> lateKeyedBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); manager.updateWatermarks(lateDataBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(lateKeyedBundle), null); + TimerUpdate.empty(), allConsumers(lateKeyedBundle), null); } + @Test + public void updateWatermarkWithInputElementsInOutput() { + Instant sourceWatermark = new Instant(1_000_000L); + CommittedBundle createdBundle = + timestampedBundle( + createdInts, + TimestampedValue.of(1, sourceWatermark), + TimestampedValue.of(2, new Instant(1234L))); + + manager.updateWatermarks( + null, + createdInts.getProducingTransformInternal(), + TimerUpdate.empty(), + allConsumers(createdBundle), + sourceWatermark); + + CommittedBundle unprocessedBundle = + 
createdBundle.withElements( + Collections.singleton( + WindowedValue.timestampedValueInGlobalWindow(1, sourceWatermark))); + + // complete the input, but with some elements unprocessed + manager.updateWatermarks( + createdBundle, + keyed.getProducingTransformInternal(), + TimerUpdate.empty(), + allConsumers(unprocessedBundle), + BoundedWindow.TIMESTAMP_MAX_VALUE); + + manager.updateWatermarks( + null, + createdInts.getProducingTransformInternal(), + TimerUpdate.empty(), + allConsumers(), + BoundedWindow.TIMESTAMP_MAX_VALUE); + + TransformWatermarks keyedWms = manager.getWatermarks(keyed.getProducingTransformInternal()); + // the input should be held to the still-pending value 1 at the initial WM + assertThat(keyedWms.getInputWatermark(), not(laterThan(sourceWatermark))); + } + + @Test public void updateWatermarkWithDifferentWindowedValueInstances() { manager.updateWatermarks( null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton( + allConsumers( bundleFactory .createRootBundle(createdInts) .add(WindowedValue.valueInGlobalWindow(1)) @@ -519,7 +600,7 @@ public void updateWatermarkWithDifferentWindowedValueInstances() { .commit(Instant.now()), keyed.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>emptyList(), + allConsumers(), null); TransformWatermarks onTimeWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -534,7 +615,7 @@ public void updateWatermarkWithDifferentWindowedValueInstances() { public void getWatermarksAfterOnlyEmptyOutput() { CommittedBundle emptyCreateOutput = multiWindowedBundle(createdInts); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(emptyCreateOutput), + allConsumers(emptyCreateOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -561,11 +642,11 @@ public void 
getWatermarksAfterOnlyEmptyOutput() { public void getWatermarksAfterHoldAndEmptyOutput() { CommittedBundle firstCreateOutput = multiWindowedBundle(createdInts, 1, 2); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(firstCreateOutput), new Instant(12_000L)); + allConsumers(firstCreateOutput), new Instant(12_000L)); CommittedBundle firstFilterOutput = multiWindowedBundle(filtered); manager.updateWatermarks(firstCreateOutput, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(firstFilterOutput), + TimerUpdate.empty(), allConsumers(firstFilterOutput), new Instant(10_000L)); TransformWatermarks firstFilterWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -574,7 +655,7 @@ public void getWatermarksAfterHoldAndEmptyOutput() { CommittedBundle emptyCreateOutput = multiWindowedBundle(createdInts); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(emptyCreateOutput), + allConsumers(emptyCreateOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -614,7 +695,7 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() { bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); + allConsumers(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks createAfterUpdate = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now())); @@ -640,7 +721,7 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() { CommittedBundle 
filterOutputBundle = bundleFactory.createRootBundle(intsToFlatten).commit(new Instant(1250L)); manager.updateWatermarks(createOutput, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(filterOutputBundle), + TimerUpdate.empty(), allConsumers(filterOutputBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks filterAfterConsumed = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -662,7 +743,7 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() { public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { CommittedBundle createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), new Instant(1248L)); + allConsumers(createdBundle), new Instant(1248L)); TransformWatermarks filteredWms = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -679,7 +760,7 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { TimerUpdate timers = TimerUpdate.builder("key").setTimer(pastTimer).setTimer(futureTimer).build(); manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), timers, - Collections.>singleton(filteredBundle), + allConsumers(filteredBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); Instant startTime = clock.now(); clock.set(startTime.plus(250L)); @@ -716,7 +797,7 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { TimerUpdate.builder("key") .withCompletedTimers(Collections.singleton(pastTimer)) .build(), - Collections.>singleton(filteredTimerResult), + allConsumers(filteredTimerResult), BoundedWindow.TIMESTAMP_MAX_VALUE); clock.set(startTime.plus(500L)); @@ -727,7 +808,7 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { not(earlierThan(filteredTimerResult.getSynchronizedProcessingOutputWatermark()))); 
manager.updateWatermarks(filteredTimerResult, filteredTimesTwo.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), + TimerUpdate.empty(), allConsumers(), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now())); @@ -762,7 +843,7 @@ public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() { bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); + allConsumers(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks createAfterUpdate = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), not(laterThan(clock.now()))); @@ -772,7 +853,7 @@ public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() { CommittedBundle createSecondOutput = bundleFactory.createRootBundle(createdInts).commit(new Instant(750L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createSecondOutput), + allConsumers(createSecondOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now())); @@ -782,7 +863,7 @@ public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() { public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() { CommittedBundle created = multiWindowedBundle(createdInts, 1, 2, 3); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(created), new Instant(40_900L)); + allConsumers(created), new Instant(40_900L)); CommittedBundle filteredBundle = multiWindowedBundle(filtered, 2, 4); Instant upstreamHold = new Instant(2048L); @@ -790,7 +871,7 @@ public 
void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers( TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME); manager.updateWatermarks(created, filtered.getProducingTransformInternal(), TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(), - Collections.>singleton(filteredBundle), + allConsumers(filteredBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks downstreamWms = @@ -810,7 +891,7 @@ public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers( TimerUpdate.builder("key") .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)) .build(), - Collections.>emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE); + allConsumers(), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(earlierThan(clock.now()))); } @@ -822,7 +903,7 @@ public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() { null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(created), + allConsumers(created), new Instant(29_919_235L)); Instant upstreamHold = new Instant(2048L); @@ -832,7 +913,7 @@ public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() { created, filtered.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(filteredBundle), + allConsumers(filteredBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks downstreamWms = @@ -853,7 +934,7 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() { // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle createdBundle = multiWindowedBundle(filtered); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.singleton(createdBundle), new Instant(1500L)); + allConsumers(createdBundle), new Instant(1500L)); TimerData earliestTimer = TimerData.of(StateNamespaces.global(), new Instant(1000), 
TimeDomain.EVENT_TIME); @@ -873,7 +954,7 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() { createdBundle, filtered.getProducingTransformInternal(), update, - Collections.>singleton(multiWindowedBundle(intsToFlatten)), + allConsumers(multiWindowedBundle(intsToFlatten)), new Instant(1000L)); Map, Map> firstTransformFiredTimers = @@ -887,7 +968,7 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() { assertThat(firstFired.getTimers(TimeDomain.EVENT_TIME), contains(earliestTimer)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>emptyList(), new Instant(50_000L)); + allConsumers(), new Instant(50_000L)); Map, Map> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -910,7 +991,7 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() { // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle createdBundle = multiWindowedBundle(filtered); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.singleton(createdBundle), new Instant(1500L)); + allConsumers(createdBundle), new Instant(1500L)); TimerData earliestTimer = TimerData.of(StateNamespaces.global(), new Instant(999L), TimeDomain.PROCESSING_TIME); @@ -930,7 +1011,7 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() { createdBundle, filtered.getProducingTransformInternal(), update, - Collections.>singleton(multiWindowedBundle(intsToFlatten)), + allConsumers(multiWindowedBundle(intsToFlatten)), new Instant(1000L)); Map, Map> firstTransformFiredTimers = @@ -945,7 +1026,7 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() { clock.set(new Instant(50_000L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>emptyList(), new Instant(50_000L)); + allConsumers(), new Instant(50_000L)); Map, Map> 
secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -968,7 +1049,7 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle createdBundle = multiWindowedBundle(filtered); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.singleton(createdBundle), new Instant(1500L)); + allConsumers(createdBundle), new Instant(1500L)); TimerData earliestTimer = TimerData.of( StateNamespaces.global(), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); @@ -988,7 +1069,7 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { createdBundle, filtered.getProducingTransformInternal(), update, - Collections.>singleton(multiWindowedBundle(intsToFlatten)), + allConsumers(multiWindowedBundle(intsToFlatten)), new Instant(1000L)); Map, Map> firstTransformFiredTimers = @@ -1004,7 +1085,7 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { clock.set(new Instant(50_000L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>emptyList(), new Instant(50_000L)); + allConsumers(), new Instant(50_000L)); Map, Map> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -1165,4 +1246,18 @@ private final CommittedBundle multiWindowedBundle(PCollection pc, T... } return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE); } + + /** + * Create a map from each input bundle to all of the PTransform applications which consume + * that bundle. + */ + private Map, Collection>> allConsumers( + CommittedBundle... 
bundles) { + ImmutableMap.Builder, Collection>> consumers = + ImmutableMap.builder(); + for (CommittedBundle bundle : bundles) { + consumers.put(bundle, valueToConsumers.get(bundle.getPCollection())); + } + return consumers.build(); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java index ee56954dbf08..57373b90b5c0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -460,7 +460,7 @@ public void isDoneWithPartiallyDone() { UncommittedBundle rootBundle = context.createRootBundle(created); rootBundle.add(WindowedValue.valueInGlobalWindow(1)); - Iterable> handleResult = + Map, Collection>> handleResult = context.handleResult( null, ImmutableList.of(), @@ -469,7 +469,7 @@ public void isDoneWithPartiallyDone() { .build()); @SuppressWarnings("unchecked") CommittedBundle committedBundle = - (CommittedBundle) Iterables.getOnlyElement(handleResult); + (CommittedBundle) Iterables.getOnlyElement(handleResult.keySet()); context.handleResult( null, ImmutableList.of(), diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java index d3d70e0e3f5d..55454ce6f294 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java @@ -487,11 +487,11 @@ private RegisteringCompletionCallback(CountDownLatch onMethod) { } @Override - public Iterable> handleResult( + public Map, Collection>> handleResult( CommittedBundle inputBundle, InProcessTransformResult result) 
{ handledResult = result; onMethod.countDown(); - return Collections.emptyList(); + return Collections.emptyMap(); } @Override From 860f52c2016a1b73efb92b48f8ab65002069f302 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 27 Apr 2016 14:49:53 -0700 Subject: [PATCH 2/3] fixup! Return a map of CommittedBundle to Consumers from handleResult --- .../inprocess/ExecutorServiceParallelExecutor.java | 9 ++++++--- .../inprocess/InMemoryWatermarkManager.java | 14 +++++++++++++- .../inprocess/InProcessEvaluationContext.java | 1 - 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index bd5ddf6f4cbf..726bf15de412 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -272,8 +272,11 @@ public void handleThrowable(CommittedBundle inputBundle, Throwable t) { * Used to signal when the executor should be shut down (due to an exception). */ private static class ExecutorUpdate { + /** The bundle to be consumed. If present, consumer must also be present. */ private final Optional> bundle; + /** The consumer of the bundle. If present, bundle must also be present. 
*/ private final Optional> consumer; + private final Optional throwable; public static ExecutorUpdate fromBundle( @@ -286,9 +289,9 @@ public static ExecutorUpdate fromThrowable(Throwable t) { } private ExecutorUpdate( - CommittedBundle producedBundle, - AppliedPTransform consumer, - Throwable throwable) { + @Nullable CommittedBundle producedBundle, + @Nullable AppliedPTransform consumer, + @Nullable Throwable throwable) { checkArgument((producedBundle == null) == (consumer == null), "The produced bundle and consuming PTransform must either " + "both be null or neither be null"); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManager.java index 07d72a424b6c..f11e8ab49ea9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -800,7 +800,11 @@ public TransformWatermarks getWatermarks(AppliedPTransform transform) { * * @param completed the input that has completed * @param transform the transform that has completed processing the input - * @param outputs the bundles the transform has output + * @param timerUpdate the timers that fired to produce this update, plus the timers that were + * added or removed as part of processing this update + * @param outputs the CommittedBundles that were output by processing the input bundle, and the + * PTransforms that the bundles will be consumed by. Elements in each output bundle + * become pending on each AppliedPTransform that will consume them * @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there * is no hold */ @@ -836,6 +840,14 @@ private void refreshWatermarks(AppliedPTransform transform) { * and removes all deleted timers. 
Removes all elements consumed by the input bundle from the * {@link PTransform PTransforms} collection of pending elements, and adds all elements produced * by the {@link PTransform} to the pending queue of each consumer. + * + * @param input the CommittedBundle that produced this update + * @param transform the AppliedPTransform that consumed the input to produce the outputs + * @param timerUpdate the timers that fired to produce this update, plus the timers that were + * added or removed as part of processing this update + * @param outputs the CommittedBundles that were output by processing the input bundle, and the + * PTransforms that the bundles will be consumed by. Elements in each output bundle + * become pending on each AppliedPTransform that will consume them */ private void updatePending( CommittedBundle input, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java index 0da61599a910..4957cb193573 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -42,7 +42,6 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; From 0a16cfdfe20bd862c7ba6031e7359834cebd603d Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 28 Apr 2016 09:18:52 -0700 Subject: [PATCH 3/3] fixup! 
Return a map of CommittedBundle to Consumers from handleResult --- .../beam/sdk/runners/inprocess/InProcessEvaluationContext.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java index 4957cb193573..8fdfb1121365 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -148,7 +148,8 @@ private InProcessEvaluationContext( * @param completedTimers the timers that were delivered to produce the {@code completedBundle}, * or an empty iterable if no timers were delivered * @param result the result of evaluating the input bundle - * @return the committed bundles contained within the handled {@code result} + * @return a mapping from the Committed {@link UncommittedBundle bundles} contained within the + * result to each {@link AppliedPTransform} that will consume them */ public synchronized Map, Collection>> handleResult( @Nullable CommittedBundle completedBundle,