diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java new file mode 100644 index 000000000000..3ad0ae66b133 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied + * + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.runners.inprocess; + +import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; + +import com.google.auto.value.AutoValue; + +/** + * A {@link InProcessTransformResult} that has been committed. + */ +@AutoValue +abstract class CommittedResult { + /** + * Returns the {@link AppliedPTransform} that produced this result. + */ + public abstract AppliedPTransform getTransform(); + + /** + * Returns the outputs produced by the transform. + */ + public abstract Iterable> getOutputs(); + + public static CommittedResult create( + InProcessTransformResult original, Iterable> outputs) { + return new AutoValue_CommittedResult(original.getTransform(), + outputs); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java index 90c488e6a40e..30a2b92cd10d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java @@ -26,7 +26,7 @@ interface CompletionCallback { /** * Handle a successful result, returning the committed outputs of the result. */ - Iterable> handleResult( + CommittedResult handleResult( CommittedBundle inputBundle, InProcessTransformResult result); /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index 3463d081d886..19bf35de2aa8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -216,14 +216,14 @@ public void awaitCompletion() throws Throwable { */ private class DefaultCompletionCallback implements CompletionCallback { @Override - public Iterable> handleResult( + public CommittedResult handleResult( CommittedBundle inputBundle, InProcessTransformResult result) { - Iterable> resultBundles = + CommittedResult committedResult = evaluationContext.handleResult(inputBundle, Collections.emptyList(), result); - for (CommittedBundle outputBundle : resultBundles) { + for (CommittedBundle outputBundle : committedResult.getOutputs()) { allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); } - return resultBundles; + return committedResult; } @Override @@ -246,14 +246,14 @@ private TimerCompletionCallback(Iterable timers) { } @Override - public Iterable> handleResult( + public CommittedResult handleResult( CommittedBundle inputBundle, InProcessTransformResult result) { - Iterable> resultBundles = + CommittedResult committedResult = evaluationContext.handleResult(inputBundle, timers, result); - for (CommittedBundle outputBundle : resultBundles) { + for (CommittedBundle outputBundle : committedResult.getOutputs()) { allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); } - return resultBundles; + return committedResult; } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java index 3990f0d04fdb..7c0dceeab631 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -145,7 +145,7 @@ private InProcessEvaluationContext( * @param result the result of evaluating the input bundle * @return the committed bundles contained within the handled {@code result} */ - public synchronized Iterable> handleResult( + public synchronized CommittedResult handleResult( @Nullable CommittedBundle completedBundle, Iterable completedTimers, InProcessTransformResult result) { @@ -176,7 +176,7 @@ public synchronized Iterable> handleResult( applicationStateInternals.remove(stepAndKey); } } - return committedBundles; + return CommittedResult.create(result, committedBundles); } private Iterable> commitBundles( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java index 3a7bedc8d280..a93c7b2098f9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java @@ -158,9 +158,9 @@ private InProcessTransformResult finishBundle( TransformEvaluator evaluator, Collection> enforcements) throws Exception { InProcessTransformResult result = evaluator.finishBundle(); - Iterable> outputs = onComplete.handleResult(inputBundle, result); + CommittedResult outputs = onComplete.handleResult(inputBundle, result); for (ModelEnforcement enforcement : enforcements) { - enforcement.afterFinish(inputBundle, result, outputs); + enforcement.afterFinish(inputBundle, result, outputs.getOutputs()); } return result; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java new file mode 100644 index 000000000000..7fad647ca633 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied + * + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.runners.inprocess; + +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +import com.google.common.collect.ImmutableList; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +/** + * Tests for {@link CommittedResult}. + */ +@RunWith(JUnit4.class) +public class CommittedResultTest implements Serializable { + private transient TestPipeline p = TestPipeline.create(); + private transient AppliedPTransform transform = + AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform() { + }); + private transient BundleFactory bundleFactory = InProcessBundleFactory.create(); + + @Test + public void getTransformExtractsFromResult() { + CommittedResult result = + CommittedResult.create(StepTransformResult.withoutHold(transform).build(), + Collections.>emptyList()); + + assertThat(result.getTransform(), Matchers.>equalTo(transform)); + } + + @Test + public void getOutputsEqualInput() { + List> outputs = + ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p, + WindowingStrategy.globalDefault(), + PCollection.IsBounded.BOUNDED)).commit(Instant.now()), + bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p, + WindowingStrategy.globalDefault(), + PCollection.IsBounded.UNBOUNDED)).commit(Instant.now())); + CommittedResult result = + CommittedResult.create(StepTransformResult.withoutHold(transform).build(), outputs); + + assertThat(result.getOutputs(), Matchers.containsInAnyOrder(outputs.toArray())); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java index ee56954dbf08..d1ea51a5b8f9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -460,7 +460,7 @@ public void isDoneWithPartiallyDone() { UncommittedBundle rootBundle = context.createRootBundle(created); rootBundle.add(WindowedValue.valueInGlobalWindow(1)); - Iterable> handleResult = + CommittedResult handleResult = context.handleResult( null, ImmutableList.of(), @@ -469,7 +469,7 @@ public void isDoneWithPartiallyDone() { .build()); @SuppressWarnings("unchecked") CommittedBundle committedBundle = - (CommittedBundle) Iterables.getOnlyElement(handleResult); + (CommittedBundle) Iterables.getOnlyElement(handleResult.getOutputs()); context.handleResult( null, ImmutableList.of(), diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java index d3d70e0e3f5d..31cb29a5dbfc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java @@ -487,11 +487,11 @@ private RegisteringCompletionCallback(CountDownLatch onMethod) { } @Override - public Iterable> handleResult( + public CommittedResult handleResult( CommittedBundle inputBundle, InProcessTransformResult result) { handledResult = result; onMethod.countDown(); - return Collections.emptyList(); + return CommittedResult.create(result, Collections.>emptyList()); } @Override