diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java index 90c488e6a40e..e825382225f3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java @@ -18,15 +18,20 @@ package org.apache.beam.sdk.runners.inprocess; import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; + +import java.util.Collection; +import java.util.Map; /** * A callback for completing a bundle of input. */ interface CompletionCallback { /** - * Handle a successful result, returning the committed outputs of the result. + * Handle a successful result, returning the committed outputs of the result and the transforms + * that should consume those outputs. */ - Iterable> handleResult( + Map, Collection>> handleResult( CommittedBundle inputBundle, InProcessTransformResult result); /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index 3463d081d886..726bf15de412 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.runners.inprocess; +import static com.google.common.base.Preconditions.checkArgument; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; @@ -64,7 +66,6 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { private final ExecutorService executorService; - private final Map>> valueToConsumers; private final Set keyedPValues; private final TransformEvaluatorRegistry registry; @SuppressWarnings("rawtypes") @@ -86,26 +87,23 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { public static ExecutorServiceParallelExecutor create( ExecutorService executorService, - Map>> valueToConsumers, Set keyedPValues, TransformEvaluatorRegistry registry, @SuppressWarnings("rawtypes") Map, Collection> transformEnforcements, InProcessEvaluationContext context) { return new ExecutorServiceParallelExecutor( - executorService, valueToConsumers, keyedPValues, registry, transformEnforcements, context); + executorService, keyedPValues, registry, transformEnforcements, context); } private ExecutorServiceParallelExecutor( ExecutorService executorService, - Map>> valueToConsumers, Set keyedPValues, TransformEvaluatorRegistry registry, @SuppressWarnings("rawtypes") Map, Collection> transformEnforcements, InProcessEvaluationContext context) { this.executorService = executorService; - this.valueToConsumers = valueToConsumers; this.keyedPValues = keyedPValues; this.registry = registry; this.transformEnforcements = transformEnforcements; @@ -191,10 +189,8 @@ private boolean isKeyed(PValue pvalue) { return keyedPValues.contains(pvalue); } - private void scheduleConsumers(CommittedBundle bundle) { - for (AppliedPTransform consumer : valueToConsumers.get(bundle.getPCollection())) { - scheduleConsumption(consumer, bundle, defaultCompletionCallback); - } + private void scheduleConsumers(CommittedBundle bundle, AppliedPTransform consumer) { + scheduleConsumption(consumer, bundle, defaultCompletionCallback); } @Override @@ -216,12 +212,16 @@ public void awaitCompletion() throws Throwable { */ private class DefaultCompletionCallback implements CompletionCallback { @Override - public Iterable> handleResult( + public Map, Collection>> handleResult( CommittedBundle inputBundle, InProcessTransformResult result) { - Iterable> resultBundles = + Map, Collection>> resultBundles = evaluationContext.handleResult(inputBundle, Collections.emptyList(), result); - for (CommittedBundle outputBundle : resultBundles) { - allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); + for (Map.Entry, Collection>> output + : resultBundles.entrySet()) { + CommittedBundle bundle = output.getKey(); + for (AppliedPTransform consumer : output.getValue()) { + allUpdates.offer(ExecutorUpdate.fromBundle(bundle, consumer)); + } } return resultBundles; } @@ -246,12 +246,16 @@ private TimerCompletionCallback(Iterable timers) { } @Override - public Iterable> handleResult( + public Map, Collection>> handleResult( CommittedBundle inputBundle, InProcessTransformResult result) { - Iterable> resultBundles = + Map, Collection>> resultBundles = evaluationContext.handleResult(inputBundle, timers, result); - for (CommittedBundle outputBundle : resultBundles) { - allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); + for (Map.Entry, Collection>> output + : resultBundles.entrySet()) { + CommittedBundle bundle = output.getKey(); + for (AppliedPTransform consumer : output.getValue()) { + allUpdates.offer(ExecutorUpdate.fromBundle(bundle, consumer)); + } } return resultBundles; } @@ -268,19 +272,31 @@ public void handleThrowable(CommittedBundle inputBundle, Throwable t) { * Used to signal when the executor should be shut down (due to an exception). */ private static class ExecutorUpdate { + /** The bundle to be consumed. If present, consumer must also be present. */ private final Optional> bundle; + /** The consumer of the bundle. If present, bundle must also be present. */ + private final Optional> consumer; + private final Optional throwable; - public static ExecutorUpdate fromBundle(CommittedBundle bundle) { - return new ExecutorUpdate(bundle, null); + public static ExecutorUpdate fromBundle( + CommittedBundle bundle, AppliedPTransform consumer) { + return new ExecutorUpdate(bundle, consumer, null); } public static ExecutorUpdate fromThrowable(Throwable t) { - return new ExecutorUpdate(null, t); + return new ExecutorUpdate(null, null, t); } - private ExecutorUpdate(CommittedBundle producedBundle, Throwable throwable) { + private ExecutorUpdate( + @Nullable CommittedBundle producedBundle, + @Nullable AppliedPTransform consumer, + @Nullable Throwable throwable) { + checkArgument((producedBundle == null) == (consumer == null), + "The produced bundle and consuming PTransform must either " + + "both be null or neither be null"); this.bundle = Optional.fromNullable(producedBundle); + this.consumer = Optional.fromNullable(consumer); this.throwable = Optional.fromNullable(throwable); } @@ -288,6 +304,10 @@ public Optional> getBundle() { return bundle; } + public Optional> getConsumer() { + return consumer; + } + public Optional getException() { return throwable; } @@ -344,7 +364,7 @@ public void run() { while (update != null) { LOG.debug("Executor Update: {}", update); if (update.getBundle().isPresent()) { - scheduleConsumers(update.getBundle().get()); + scheduleConsumers(update.getBundle().get(), update.getConsumer().get()); } else if (update.getException().isPresent()) { visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get())); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManager.java index d941cc7ee4c9..f11e8ab49ea9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -800,7 +800,11 @@ public TransformWatermarks getWatermarks(AppliedPTransform transform) { * * @param completed the input that has completed * @param transform the transform that has completed processing the input - * @param outputs the bundles the transform has output + * @param timerUpdate the timers that fired to produce this update, plus the timers that were + * added or removed as part of processing this update + * @param outputs the CommittedBundles that were output by processing the input bundle, and the + * PTransforms that the bundles will be consumed by. Elements in each output bundle + * become pending on each AppliedPTransform that will consume them * @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there * is no hold */ @@ -808,7 +812,7 @@ public void updateWatermarks( @Nullable CommittedBundle completed, AppliedPTransform transform, TimerUpdate timerUpdate, - Iterable> outputs, + Map, Collection>> outputs, @Nullable Instant earliestHold) { updatePending(completed, transform, timerUpdate, outputs); TransformWatermarks transformWms = transformToWatermarks.get(transform); @@ -836,20 +840,30 @@ private void refreshWatermarks(AppliedPTransform transform) { * and removes all deleted timers. Removes all elements consumed by the input bundle from the * {@link PTransform PTransforms} collection of pending elements, and adds all elements produced * by the {@link PTransform} to the pending queue of each consumer. + * + * @param input the CommittedBundle that produced this update + * @param transform the AppliedPTransform that consumed the input to produce the outputs + * @param timerUpdate the timers that fired to produce this update, plus the timers that were + * added or removed as part of processing this update + * @param outputs the CommittedBundles that were output by processing the input bundle, and the + * PTransforms that the bundles will be consumed by. Elements in each output bundle + * become pending on each AppliedPTransform that will consume them */ private void updatePending( CommittedBundle input, AppliedPTransform transform, TimerUpdate timerUpdate, - Iterable> outputs) { + Map, Collection>> outputs) { TransformWatermarks completedTransform = transformToWatermarks.get(transform); completedTransform.updateTimers(timerUpdate); if (input != null) { completedTransform.removePending(input); } - for (CommittedBundle bundle : outputs) { - for (AppliedPTransform consumer : consumers.get(bundle.getPCollection())) { + for (Map.Entry, Collection>> + outputEntry : outputs.entrySet()) { + CommittedBundle bundle = outputEntry.getKey(); + for (AppliedPTransform consumer : outputEntry.getValue()) { TransformWatermarks watermarks = transformToWatermarks.get(consumer); watermarks.addPending(bundle); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java index 3990f0d04fdb..8fdfb1121365 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -42,7 +42,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import java.util.Collection; @@ -74,6 +74,11 @@ class InProcessEvaluationContext { /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */ private final Map, String> stepNames; + /** + * The mapping from each {@link PValue} contained within the {@link Pipeline} to each + * {@link AppliedPTransform} that consumes it. + */ + private final Map>> valueToConsumers; /** The options that were used to create this {@link Pipeline}. */ private final InProcessPipelineOptions options; @@ -114,7 +119,7 @@ private InProcessEvaluationContext( this.options = checkNotNull(options); this.bundleFactory = checkNotNull(bundleFactory); checkNotNull(rootTransforms); - checkNotNull(valueToConsumers); + this.valueToConsumers = checkNotNull(valueToConsumers); checkNotNull(stepNames); checkNotNull(views); this.stepNames = stepNames; @@ -143,13 +148,14 @@ private InProcessEvaluationContext( * @param completedTimers the timers that were delivered to produce the {@code completedBundle}, * or an empty iterable if no timers were delivered * @param result the result of evaluating the input bundle - * @return the committed bundles contained within the handled {@code result} + * @return a mapping between the Committed {@link UncommittedBundle bundles} contained within the + * result to each {@link AppliedPTransform} that will consume them */ - public synchronized Iterable> handleResult( + public synchronized Map, Collection>> handleResult( @Nullable CommittedBundle completedBundle, Iterable completedTimers, InProcessTransformResult result) { - Iterable> committedBundles = + Map, Collection>> committedBundles = commitBundles(result.getOutputBundles()); // Update watermarks and timers watermarkManager.updateWatermarks( @@ -179,10 +185,11 @@ public synchronized Iterable> handleResult( return committedBundles; } - private Iterable> commitBundles( - Iterable> bundles) { - ImmutableList.Builder> completed = ImmutableList.builder(); - for (UncommittedBundle inProgress : bundles) { + private Map, Collection>> commitBundles( + Iterable> outputBundles) { + ImmutableMap.Builder, Collection>> outputs + = ImmutableMap.builder(); + for (UncommittedBundle inProgress : outputBundles) { AppliedPTransform producing = inProgress.getPCollection().getProducingTransformInternal(); TransformWatermarks watermarks = watermarkManager.getWatermarks(producing); @@ -191,10 +198,10 @@ private Iterable> commitBundles( // Empty bundles don't impact watermarks and shouldn't trigger downstream execution, so // filter them out if (!Iterables.isEmpty(committed.getElements())) { - completed.add(committed); + outputs.put(committed, valueToConsumers.get(committed.getPCollection())); } } - return completed.build(); + return outputs.build(); } private void fireAllAvailableCallbacks() { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java index 7897f2e31caf..539675d0c6a9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -239,7 +239,6 @@ public InProcessPipelineResult run(Pipeline pipeline) { InProcessExecutor executor = ExecutorServiceParallelExecutor.create( executorService, - consumerTrackingVisitor.getValueToConsumers(), keyedPValueVisitor.getKeyedPValues(), TransformEvaluatorRegistry.defaultRegistry(), defaultModelEnforcements(options), diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java index 3a7bedc8d280..22aae708f31d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; @@ -158,9 +159,10 @@ private InProcessTransformResult finishBundle( TransformEvaluator evaluator, Collection> enforcements) throws Exception { InProcessTransformResult result = evaluator.finishBundle(); - Iterable> outputs = onComplete.handleResult(inputBundle, result); + Map, Collection>> outputs = + onComplete.handleResult(inputBundle, result); for (ModelEnforcement enforcement : enforcements) { - enforcement.afterFinish(inputBundle, result, outputs); + enforcement.afterFinish(inputBundle, result, outputs.keySet()); } return result; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java index 3c4503f9d5eb..cd749f33739d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java @@ -54,6 +54,7 @@ import org.apache.beam.sdk.values.TimestampedValue; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -90,6 +91,8 @@ public class InMemoryWatermarkManagerTest implements Serializable { private transient InMemoryWatermarkManager manager; private transient BundleFactory bundleFactory; + private transient Map>> valueToConsumers; + @Before public void setup() { TestPipeline p = TestPipeline.create(); @@ -133,6 +136,7 @@ public void processElement(DoFn.ProcessContext c) throws Excep flattened.getProducingTransformInternal())); consumers.put(flattened, Collections.>emptyList()); + valueToConsumers = consumers; clock = MockClock.fromInstant(new Instant(1000)); manager = InMemoryWatermarkManager.create(clock, rootTransforms, consumers); @@ -160,7 +164,7 @@ public void getWatermarkForUntouchedTransform() { public void getWatermarkForUpdatedSourceTransform() { CommittedBundle output = multiWindowedBundle(createdInts, 1); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(output), new Instant(8000L)); + allConsumers(output), new Instant(8000L)); TransformWatermarks updatedSourceWatermark = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -176,7 +180,7 @@ public void getWatermarkForMultiInputTransform() { CommittedBundle secondPcollectionBundle = multiWindowedBundle(intsToFlatten, -1); manager.updateWatermarks(null, intsToFlatten.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(secondPcollectionBundle), + TimerUpdate.empty(), allConsumers(secondPcollectionBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); // We didn't do anything for the first source, so we shouldn't have progressed the watermark @@ -206,12 +210,12 @@ public void getWatermarkForMultiInputTransform() { // We have finished processing the bundle from the second PCollection, but we haven't consumed // anything from the first PCollection yet; so our watermark shouldn't advance manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(flattenedBundleSecondCreate), + TimerUpdate.empty(), allConsumers(flattenedBundleSecondCreate), null); TransformWatermarks transformAfterProcessing = manager.getWatermarks(flattened.getProducingTransformInternal()); manager.updateWatermarks(secondPcollectionBundle, flattened.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(flattenedBundleSecondCreate), + TimerUpdate.empty(), allConsumers(flattenedBundleSecondCreate), null); assertThat( transformAfterProcessing.getInputWatermark(), @@ -226,7 +230,7 @@ public void getWatermarkForMultiInputTransform() { // the source is done, but elements are still buffered. The source output watermark should be // past the end of the global window manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(firstPcollectionBundle), + allConsumers(firstPcollectionBundle), new Instant(Long.MAX_VALUE)); TransformWatermarks firstSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -254,7 +258,7 @@ public void getWatermarkForMultiInputTransform() { CommittedBundle completedFlattenBundle = bundleFactory.createRootBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(firstPcollectionBundle, flattened.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(completedFlattenBundle), + TimerUpdate.empty(), allConsumers(completedFlattenBundle), null); TransformWatermarks afterConsumingAllInput = manager.getWatermarks(flattened.getProducingTransformInternal()); @@ -276,7 +280,7 @@ public void getWatermarkForMultiConsumedCollection() { TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), new Instant(Long.MAX_VALUE)); + allConsumers(createdBundle), new Instant(Long.MAX_VALUE)); TransformWatermarks createdAfterProducing = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat( @@ -288,7 +292,7 @@ public void getWatermarkForMultiConsumedCollection() { TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(keyBundle), null); + TimerUpdate.empty(), allConsumers(keyBundle), null); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat( @@ -304,7 +308,7 @@ public void getWatermarkForMultiConsumedCollection() { CommittedBundle filteredBundle = timestampedBundle(filtered, TimestampedValue.of(2, new Instant(1234L))); manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(filteredBundle), null); + TimerUpdate.empty(), allConsumers(filteredBundle), null); TransformWatermarks filteredProcessedWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); assertThat( @@ -315,6 +319,41 @@ public void getWatermarkForMultiConsumedCollection() { not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE))); } + /** + * Demonstrates that producing output for only a single {@link AppliedPTransform} only holds + * the watermark for that transform. + */ + @Test + public void getWatermarkForMultiConsumedCollectionWithOnlyOutputToOnceConsumer() { + CommittedBundle createdBundle = timestampedBundle(createdInts, + TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), + TimestampedValue.of(3, new Instant(-1000L))); + Map, Collection>> produced = + new HashMap<>(); + produced.put(createdBundle, + ImmutableList.>of(filtered.getProducingTransformInternal())); + manager.updateWatermarks(null, + createdInts.getProducingTransformInternal(), + TimerUpdate.empty(), + produced, + BoundedWindow.TIMESTAMP_MAX_VALUE); + manager.updateWatermarks(null, + intsToFlatten.getProducingTransformInternal(), + TimerUpdate.empty(), + allConsumers(), + BoundedWindow.TIMESTAMP_MAX_VALUE); + + TransformWatermarks flattendWms = + manager.getWatermarks(flattened.getProducingTransformInternal()); + // We didn't produce anything to be consumed by the flatten + assertThat(flattendWms.getOutputWatermark(), + not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE))); + + TransformWatermarks filteredWms = + manager.getWatermarks(filtered.getProducingTransformInternal()); + assertThat(filteredWms.getInputWatermark(), not(laterThan(new Instant(-1000L)))); + } + /** * Demonstrates that the watermark of an {@link AppliedPTransform} is held to the provided * watermark hold. @@ -325,14 +364,14 @@ public void updateWatermarkWithWatermarkHolds() { TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), new Instant(Long.MAX_VALUE)); + allConsumers(createdBundle), new Instant(Long.MAX_VALUE)); CommittedBundle> keyBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(keyBundle), + TimerUpdate.empty(), allConsumers(keyBundle), new Instant(500L)); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -359,12 +398,12 @@ public void updateWatermarkWithKeyedWatermarkHolds() { .commit(clock.now()); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - ImmutableList.of(firstKeyBundle, secondKeyBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); + allConsumers(firstKeyBundle, secondKeyBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(firstKeyBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), new Instant(-1000L)); + TimerUpdate.empty(), allConsumers(), new Instant(-1000L)); manager.updateWatermarks(secondKeyBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), new Instant(1234L)); + TimerUpdate.empty(), allConsumers(), new Instant(1234L)); TransformWatermarks filteredWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -376,7 +415,7 @@ public void updateWatermarkWithKeyedWatermarkHolds() { CommittedBundle fauxFirstKeyTimerBundle = bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now()); manager.updateWatermarks(fauxFirstKeyTimerBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), + TimerUpdate.empty(), allConsumers(), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L))); @@ -384,11 +423,11 @@ public void updateWatermarkWithKeyedWatermarkHolds() { CommittedBundle fauxSecondKeyTimerBundle = bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now()); manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), new Instant(5678L)); + TimerUpdate.empty(), allConsumers(), new Instant(5678L)); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L))); manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), + TimerUpdate.empty(), allConsumers(), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat( filteredWatermarks.getOutputWatermark(), @@ -404,7 +443,7 @@ public void updateOutputWatermarkShouldBeMonotonic() { CommittedBundle firstInput = bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(firstInput), new Instant(0L)); + allConsumers(firstInput), new Instant(0L)); TransformWatermarks firstWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L))); @@ -412,7 +451,7 @@ public void updateOutputWatermarkShouldBeMonotonic() { CommittedBundle secondInput = bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(secondInput), new Instant(-250L)); + allConsumers(secondInput), new Instant(-250L)); TransformWatermarks secondWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(secondWatermarks.getOutputWatermark(), not(earlierThan(new Instant(0L)))); @@ -428,14 +467,14 @@ public void updateWatermarkWithHoldsShouldBeMonotonic() { TimestampedValue.of(1, new Instant(1_000_000L)), TimestampedValue.of(2, new Instant(1234L)), TimestampedValue.of(3, new Instant(-1000L))); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), new Instant(Long.MAX_VALUE)); + allConsumers(createdBundle), new Instant(Long.MAX_VALUE)); CommittedBundle> keyBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), new Instant(1_000_000L)), TimestampedValue.of(KV.of("MyKey", 2), new Instant(1234L)), TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(keyBundle), + TimerUpdate.empty(), allConsumers(keyBundle), new Instant(500L)); TransformWatermarks keyedWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -463,7 +502,7 @@ public void updateWatermarkWithLateData() { TimestampedValue.of(1, sourceWatermark), TimestampedValue.of(2, new Instant(1234L))); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), sourceWatermark); + allConsumers(createdBundle), sourceWatermark); CommittedBundle> keyBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 1), sourceWatermark), @@ -471,7 +510,7 @@ public void updateWatermarkWithLateData() { // Finish processing the on-time data. The watermarks should progress to be equal to the source manager.updateWatermarks(createdBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(keyBundle), null); + TimerUpdate.empty(), allConsumers(keyBundle), null); TransformWatermarks onTimeWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); assertThat(onTimeWatermarks.getInputWatermark(), equalTo(sourceWatermark)); @@ -482,7 +521,7 @@ public void updateWatermarkWithLateData() { // the late data arrives in a downstream PCollection after its watermark has advanced past it; // we don't advance the watermark past the current watermark until we've consumed the late data manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(lateDataBundle), new Instant(2_000_000L)); + allConsumers(lateDataBundle), new Instant(2_000_000L)); TransformWatermarks bufferedLateWm = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(bufferedLateWm.getOutputWatermark(), equalTo(new Instant(2_000_000L))); @@ -497,15 +536,57 @@ public void updateWatermarkWithLateData() { CommittedBundle> lateKeyedBundle = timestampedBundle(keyed, TimestampedValue.of(KV.of("MyKey", 3), new Instant(-1000L))); manager.updateWatermarks(lateDataBundle, keyed.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(lateKeyedBundle), null); + TimerUpdate.empty(), allConsumers(lateKeyedBundle), null); } + @Test + public void updateWatermarkWithInputElementsInOutput() { + Instant sourceWatermark = new Instant(1_000_000L); + CommittedBundle createdBundle = + timestampedBundle( + createdInts, + TimestampedValue.of(1, sourceWatermark), + TimestampedValue.of(2, new Instant(1234L))); + + manager.updateWatermarks( + null, + createdInts.getProducingTransformInternal(), + TimerUpdate.empty(), + allConsumers(createdBundle), + sourceWatermark); + + CommittedBundle unprocessedBundle = + createdBundle.withElements( + Collections.singleton( + WindowedValue.timestampedValueInGlobalWindow(1, sourceWatermark))); + + // complete the input, but with some elements unprocessed + manager.updateWatermarks( + createdBundle, + keyed.getProducingTransformInternal(), + TimerUpdate.empty(), + allConsumers(unprocessedBundle), + BoundedWindow.TIMESTAMP_MAX_VALUE); + + manager.updateWatermarks( + null, + createdInts.getProducingTransformInternal(), + TimerUpdate.empty(), + allConsumers(), + BoundedWindow.TIMESTAMP_MAX_VALUE); + + TransformWatermarks keyedWms = manager.getWatermarks(keyed.getProducingTransformInternal()); + // the input should be held to the still-pending value 1 at the initial WM + assertThat(keyedWms.getInputWatermark(), not(laterThan(sourceWatermark))); + } + + @Test public void updateWatermarkWithDifferentWindowedValueInstances() { manager.updateWatermarks( null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton( + allConsumers( bundleFactory .createRootBundle(createdInts) .add(WindowedValue.valueInGlobalWindow(1)) @@ -519,7 +600,7 @@ public void updateWatermarkWithDifferentWindowedValueInstances() { .commit(Instant.now()), keyed.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>emptyList(), + allConsumers(), null); TransformWatermarks onTimeWatermarks = manager.getWatermarks(keyed.getProducingTransformInternal()); @@ -534,7 +615,7 @@ public void updateWatermarkWithDifferentWindowedValueInstances() { public void getWatermarksAfterOnlyEmptyOutput() { CommittedBundle emptyCreateOutput = multiWindowedBundle(createdInts); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(emptyCreateOutput), + allConsumers(emptyCreateOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -561,11 +642,11 @@ public void getWatermarksAfterOnlyEmptyOutput() { public void getWatermarksAfterHoldAndEmptyOutput() { CommittedBundle firstCreateOutput = multiWindowedBundle(createdInts, 1, 2); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(firstCreateOutput), new Instant(12_000L)); + allConsumers(firstCreateOutput), new Instant(12_000L)); CommittedBundle firstFilterOutput = multiWindowedBundle(filtered); manager.updateWatermarks(firstCreateOutput, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(firstFilterOutput), + TimerUpdate.empty(), allConsumers(firstFilterOutput), new Instant(10_000L)); TransformWatermarks firstFilterWatermarks = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -574,7 +655,7 @@ public void getWatermarksAfterHoldAndEmptyOutput() { CommittedBundle emptyCreateOutput = multiWindowedBundle(createdInts); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(emptyCreateOutput), + allConsumers(emptyCreateOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks updatedSourceWatermarks = manager.getWatermarks(createdInts.getProducingTransformInternal()); @@ -614,7 +695,7 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() { bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); + allConsumers(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks createAfterUpdate = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), equalTo(clock.now())); @@ -640,7 +721,7 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() { CommittedBundle filterOutputBundle = bundleFactory.createRootBundle(intsToFlatten).commit(new Instant(1250L)); manager.updateWatermarks(createOutput, filtered.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>singleton(filterOutputBundle), + TimerUpdate.empty(), allConsumers(filterOutputBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks filterAfterConsumed = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -662,7 +743,7 @@ public void getSynchronizedProcessingTimeInputWatermarksHeldToPendingBundles() { public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { CommittedBundle createdBundle = multiWindowedBundle(createdInts, 1, 2, 4, 8); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createdBundle), new Instant(1248L)); + allConsumers(createdBundle), new Instant(1248L)); TransformWatermarks filteredWms = manager.getWatermarks(filtered.getProducingTransformInternal()); @@ -679,7 +760,7 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { TimerUpdate timers = TimerUpdate.builder("key").setTimer(pastTimer).setTimer(futureTimer).build(); manager.updateWatermarks(createdBundle, filtered.getProducingTransformInternal(), timers, - Collections.>singleton(filteredBundle), + allConsumers(filteredBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); Instant startTime = clock.now(); clock.set(startTime.plus(250L)); @@ -716,7 +797,7 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { TimerUpdate.builder("key") .withCompletedTimers(Collections.singleton(pastTimer)) .build(), - Collections.>singleton(filteredTimerResult), + allConsumers(filteredTimerResult), BoundedWindow.TIMESTAMP_MAX_VALUE); clock.set(startTime.plus(500L)); @@ -727,7 +808,7 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { not(earlierThan(filteredTimerResult.getSynchronizedProcessingOutputWatermark()))); manager.updateWatermarks(filteredTimerResult, filteredTimesTwo.getProducingTransformInternal(), - TimerUpdate.empty(), Collections.>emptyList(), + TimerUpdate.empty(), allConsumers(), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now())); @@ -762,7 +843,7 @@ public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() { bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); + allConsumers(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks createAfterUpdate = manager.getWatermarks(createdInts.getProducingTransformInternal()); assertThat(createAfterUpdate.getSynchronizedProcessingInputTime(), not(laterThan(clock.now()))); @@ -772,7 +853,7 @@ public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() { CommittedBundle createSecondOutput = bundleFactory.createRootBundle(createdInts).commit(new Instant(750L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(createSecondOutput), + allConsumers(createSecondOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(createAfterUpdate.getSynchronizedProcessingOutputTime(), equalTo(clock.now())); @@ -782,7 +863,7 @@ public void getSynchronizedProcessingTimeOutputTimeIsMonotonic() { public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers() { CommittedBundle created = multiWindowedBundle(createdInts, 1, 2, 3); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(created), new Instant(40_900L)); + allConsumers(created), new Instant(40_900L)); CommittedBundle filteredBundle = multiWindowedBundle(filtered, 2, 4); Instant upstreamHold = new Instant(2048L); @@ -790,7 +871,7 @@ public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers( TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME); manager.updateWatermarks(created, filtered.getProducingTransformInternal(), TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(), - Collections.>singleton(filteredBundle), + allConsumers(filteredBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks downstreamWms = @@ -810,7 +891,7 @@ public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers( TimerUpdate.builder("key") .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)) .build(), - Collections.>emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE); + allConsumers(), BoundedWindow.TIMESTAMP_MAX_VALUE); assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(earlierThan(clock.now()))); } @@ -822,7 +903,7 @@ public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() { null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(created), + allConsumers(created), new Instant(29_919_235L)); Instant upstreamHold = new Instant(2048L); @@ -832,7 +913,7 @@ public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() { created, filtered.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>singleton(filteredBundle), + allConsumers(filteredBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); TransformWatermarks downstreamWms = @@ -853,7 +934,7 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() { // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle createdBundle = multiWindowedBundle(filtered); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.singleton(createdBundle), new Instant(1500L)); + allConsumers(createdBundle), new Instant(1500L)); TimerData earliestTimer = TimerData.of(StateNamespaces.global(), new Instant(1000), TimeDomain.EVENT_TIME); @@ -873,7 +954,7 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() { createdBundle, filtered.getProducingTransformInternal(), update, - Collections.>singleton(multiWindowedBundle(intsToFlatten)), + allConsumers(multiWindowedBundle(intsToFlatten)), new Instant(1000L)); Map, Map> firstTransformFiredTimers = @@ -887,7 +968,7 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() { assertThat(firstFired.getTimers(TimeDomain.EVENT_TIME), contains(earliestTimer)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>emptyList(), new Instant(50_000L)); + allConsumers(), new Instant(50_000L)); Map, Map> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -910,7 +991,7 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() { // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle createdBundle = multiWindowedBundle(filtered); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.singleton(createdBundle), new Instant(1500L)); + allConsumers(createdBundle), new Instant(1500L)); TimerData earliestTimer = TimerData.of(StateNamespaces.global(), new Instant(999L), TimeDomain.PROCESSING_TIME); @@ -930,7 +1011,7 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() { createdBundle, filtered.getProducingTransformInternal(), update, - Collections.>singleton(multiWindowedBundle(intsToFlatten)), + allConsumers(multiWindowedBundle(intsToFlatten)), new Instant(1000L)); Map, Map> firstTransformFiredTimers = @@ -945,7 +1026,7 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() { clock.set(new Instant(50_000L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>emptyList(), new Instant(50_000L)); + allConsumers(), new Instant(50_000L)); Map, Map> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -968,7 +1049,7 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle createdBundle = multiWindowedBundle(filtered); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.singleton(createdBundle), new Instant(1500L)); + allConsumers(createdBundle), new Instant(1500L)); TimerData earliestTimer = TimerData.of( StateNamespaces.global(), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); @@ -988,7 +1069,7 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { createdBundle, filtered.getProducingTransformInternal(), update, - Collections.>singleton(multiWindowedBundle(intsToFlatten)), + allConsumers(multiWindowedBundle(intsToFlatten)), new Instant(1000L)); Map, Map> firstTransformFiredTimers = @@ -1004,7 +1085,7 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { clock.set(new Instant(50_000L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.>emptyList(), new Instant(50_000L)); + allConsumers(), new Instant(50_000L)); Map, Map> secondTransformFiredTimers = manager.extractFiredTimers(); assertThat( @@ -1165,4 +1246,18 @@ private final CommittedBundle multiWindowedBundle(PCollection pc, T... } return bundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE); } + + /** + * Create a map from each input bundle to all of the PTransform applications which consume + * that bundle. + */ + private Map, Collection>> allConsumers( + CommittedBundle... bundles) { + ImmutableMap.Builder, Collection>> consumers = + ImmutableMap.builder(); + for (CommittedBundle bundle : bundles) { + consumers.put(bundle, valueToConsumers.get(bundle.getPCollection())); + } + return consumers.build(); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java index ee56954dbf08..57373b90b5c0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -460,7 +460,7 @@ public void isDoneWithPartiallyDone() { UncommittedBundle rootBundle = context.createRootBundle(created); rootBundle.add(WindowedValue.valueInGlobalWindow(1)); - Iterable> handleResult = + Map, Collection>> handleResult = context.handleResult( null, ImmutableList.of(), @@ -469,7 +469,7 @@ public void isDoneWithPartiallyDone() { .build()); @SuppressWarnings("unchecked") CommittedBundle committedBundle = - (CommittedBundle) Iterables.getOnlyElement(handleResult); + (CommittedBundle) Iterables.getOnlyElement(handleResult.keySet()); context.handleResult( null, ImmutableList.of(), diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java index d3d70e0e3f5d..55454ce6f294 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java @@ -487,11 +487,11 @@ private RegisteringCompletionCallback(CountDownLatch onMethod) { } @Override - public Iterable> handleResult( + public Map, Collection>> handleResult( CommittedBundle inputBundle, InProcessTransformResult result) { handledResult = result; onMethod.countDown(); - return Collections.emptyList(); + return Collections.emptyMap(); } @Override