From 8629b00a1670e1de4b96ba06fab7bb5347c81d9c Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 28 Apr 2016 16:12:21 -0700 Subject: [PATCH] Encapsulate cloning behavior of in-process ParDo evaluator This will make way for using the evaluator in contexts where cloning is not appropriate, such as evaluating GroupAlsoByWindow --- .../runners/inprocess/CloningSupplier.java | 42 +++++++++++++++++++ .../inprocess/ParDoInProcessEvaluator.java | 7 ++-- .../inprocess/ParDoMultiEvaluatorFactory.java | 2 +- .../ParDoSingleEvaluatorFactory.java | 12 ++++-- 4 files changed, 55 insertions(+), 8 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CloningSupplier.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CloningSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CloningSupplier.java new file mode 100644 index 000000000000..1ddb48a014ee --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CloningSupplier.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.runners.inprocess; + +import org.apache.beam.sdk.util.SerializableUtils; + +import com.google.common.base.Supplier; + +import java.io.Serializable; + +class CloningSupplier implements Supplier { + private final T value; + + public static CloningSupplier forValue(ValueT value) { + return new CloningSupplier<>(value); + } + + private CloningSupplier(T value) { + this.value = value; + } + + @Override + public T get() { + return SerializableUtils.clone(value); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java index 35639bdcac5b..8d1849953fd4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.util.DoFnRunner; import org.apache.beam.sdk.util.DoFnRunners; import org.apache.beam.sdk.util.DoFnRunners.OutputManager; -import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.common.CounterSet; @@ -34,6 +33,8 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import com.google.common.base.Supplier; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -45,7 +46,7 @@ public static ParDoInProcessEvaluator create( InProcessEvaluationContext evaluationContext, CommittedBundle inputBundle, AppliedPTransform, ?, ?> application, - DoFn fn, + Supplier> fnSupplier, List> sideInputs, TupleTag mainOutputTag, List> sideOutputTags, @@ -68,7 +69,7 @@ public static ParDoInProcessEvaluator create( DoFnRunner runner = DoFnRunners.createDefault( evaluationContext.getPipelineOptions(), - 
SerializableUtils.clone(fn), + fnSupplier.get(), evaluationContext.createSideInputReader(sideInputs), BundleOutputManager.create(outputBundles), mainOutputTag, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java index 299d3a89125b..1a4ab1af598a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java @@ -54,7 +54,7 @@ private static ParDoInProcessEvaluator createMultiEvaluator( evaluationContext, inputBundle, application, - fn, + CloningSupplier.forValue(fn), application.getTransform().getSideInputs(), application.getTransform().getMainOutputTag(), application.getTransform().getSideOutputTags().getAll(), diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java index 4d38448392ec..c594fb1b6b5f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java @@ -33,6 +33,8 @@ * {@link Bound ParDo.Bound} primitive {@link PTransform}. 
*/ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { + + @Override public TransformEvaluator forApplication( final AppliedPTransform application, @@ -45,16 +47,18 @@ public TransformEvaluator forApplication( } private static ParDoInProcessEvaluator createSingleEvaluator( - @SuppressWarnings("rawtypes") AppliedPTransform, PCollection, - Bound> application, - CommittedBundle inputBundle, InProcessEvaluationContext evaluationContext) { + @SuppressWarnings("rawtypes") + final AppliedPTransform, PCollection, Bound> + application, + CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) { TupleTag mainOutputTag = new TupleTag<>("out"); return ParDoInProcessEvaluator.create( evaluationContext, inputBundle, application, - application.getTransform().getFn(), + CloningSupplier.forValue(application.getTransform().getFn()), application.getTransform().getSideInputs(), mainOutputTag, Collections.>emptyList(),