From 6f3eeb4fada9fa72763980f26af8949141dbbe51 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 28 Apr 2016 15:50:32 -0700 Subject: [PATCH 1/4] Add WindowMatchers.isWindowedValue() --- .../src/test/java/org/apache/beam/sdk/WindowMatchers.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java index b47c32c4940d..104ef061c78d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java @@ -42,6 +42,11 @@ public static Matcher> isWindowedValue( return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, windowsMatcher); } + public static Matcher> isWindowedValue( + Matcher valueMatcher) { + return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything()); + } + public static Matcher> isWindowedValue( Matcher valueMatcher, Matcher timestampMatcher) { return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything()); From 34ec15d6a5923287f4db0db63083c37b87c030b7 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 28 Apr 2016 15:51:40 -0700 Subject: [PATCH 2/4] Add accessors for sub-coders of KeyedWorkItemCoder --- .../java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java index 763f68b302e3..ec5d82138269 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java @@ -79,6 +79,14 @@ private KeyedWorkItemCoder( this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, windowCoder)); } + public Coder getKeyCoder() { + return keyCoder; + } + + public Coder getElementCoder() { + return elemCoder; + } + @Override public void encode(KeyedWorkItem value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { From 91643088f4032898cf67973b032d86a528eca199 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 28 Apr 2016 16:12:21 -0700 Subject: [PATCH 3/4] Encapsulate cloning behavior of in-process ParDo evaluator This will make way for using the evluator in contexts where cloning is not appropriate, such as evaluator GroupAlsoByWindow --- .../runners/inprocess/CloningSupplier.java | 42 +++++++++++++++++++ .../inprocess/ParDoInProcessEvaluator.java | 7 ++-- .../inprocess/ParDoMultiEvaluatorFactory.java | 2 +- .../ParDoSingleEvaluatorFactory.java | 12 ++++-- 4 files changed, 55 insertions(+), 8 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CloningSupplier.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CloningSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CloningSupplier.java new file mode 100644 index 000000000000..1ddb48a014ee --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CloningSupplier.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.runners.inprocess; + +import org.apache.beam.sdk.util.SerializableUtils; + +import com.google.common.base.Supplier; + +import java.io.Serializable; + +class CloningSupplier implements Supplier { + private final T value; + + public static CloningSupplier forValue(ValueT value) { + return new CloningSupplier<>(value); + } + + private CloningSupplier(T value) { + this.value = value; + } + + @Override + public T get() { + return SerializableUtils.clone(value); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java index 35639bdcac5b..8d1849953fd4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.util.DoFnRunner; import org.apache.beam.sdk.util.DoFnRunners; import org.apache.beam.sdk.util.DoFnRunners.OutputManager; -import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.common.CounterSet; @@ -34,6 +33,8 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import com.google.common.base.Supplier; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -45,7 +46,7 @@ public static ParDoInProcessEvaluator create( InProcessEvaluationContext evaluationContext, CommittedBundle inputBundle, AppliedPTransform, ?, ?> application, - DoFn fn, + Supplier> fnSupplier, List> sideInputs, TupleTag mainOutputTag, List> sideOutputTags, @@ -68,7 +69,7 @@ public static ParDoInProcessEvaluator create( DoFnRunner runner = DoFnRunners.createDefault( evaluationContext.getPipelineOptions(), - SerializableUtils.clone(fn), + fnSupplier.get(), evaluationContext.createSideInputReader(sideInputs), BundleOutputManager.create(outputBundles), mainOutputTag, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java index 299d3a89125b..1a4ab1af598a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java @@ -54,7 +54,7 @@ private static ParDoInProcessEvaluator createMultiEvaluator( evaluationContext, inputBundle, application, - fn, + CloningSupplier.forValue(fn), application.getTransform().getSideInputs(), application.getTransform().getMainOutputTag(), application.getTransform().getSideOutputTags().getAll(), diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java index 4d38448392ec..c594fb1b6b5f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java @@ -33,6 +33,8 @@ * {@link Bound ParDo.Bound} primitive {@link PTransform}. */ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { + + @Override public TransformEvaluator forApplication( final AppliedPTransform application, @@ -45,16 +47,18 @@ public TransformEvaluator forApplication( } private static ParDoInProcessEvaluator createSingleEvaluator( - @SuppressWarnings("rawtypes") AppliedPTransform, PCollection, - Bound> application, - CommittedBundle inputBundle, InProcessEvaluationContext evaluationContext) { + @SuppressWarnings("rawtypes") + final AppliedPTransform, PCollection, Bound> + application, + CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) { TupleTag mainOutputTag = new TupleTag<>("out"); return ParDoInProcessEvaluator.create( evaluationContext, inputBundle, application, - application.getTransform().getFn(), + CloningSupplier.forValue(application.getTransform().getFn()), application.getTransform().getSideInputs(), mainOutputTag, Collections.>emptyList(), From 753787ff0eb10c524f336e9af837ed442f005121 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 28 Apr 2016 16:13:24 -0700 Subject: [PATCH 4/4] Make in-process GroupByKey respect future Beam model This introduces top-level classes: - InProcessGroupByKey, which expands like GroupByKeyViaGroupByKeyOnly but with different intermediate PCollection types. - InProcessGroupByKeyOnly, which outputs KeyedWorkItem. This existed already under a different name. - InProcessGroupAlsoByWindow, which is evaluated directly and accepts input elements of type KeyedWorkItem. --- .../inprocess/InProcessEvaluationContext.java | 1 - ...cessGroupAlsoByWindowEvaluatorFactory.java | 134 +++++++++++++++ .../inprocess/InProcessGroupByKey.java | 132 +++++++++++++++ ...rocessGroupByKeyOnlyEvaluatorFactory.java} | 153 ++++-------------- .../InProcessGroupByKeyOverrideFactory.java | 41 +++++ .../inprocess/InProcessPipelineRunner.java | 5 +- .../inprocess/TransformEvaluatorRegistry.java | 8 +- ...ssGroupByKeyOnlyEvaluatorFactoryTest.java} | 57 +++---- 8 files changed, 375 insertions(+), 156 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupAlsoByWindowEvaluatorFactory.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKey.java rename sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/{GroupByKeyEvaluatorFactory.java => InProcessGroupByKeyOnlyEvaluatorFactory.java} (52%) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKeyOverrideFactory.java rename sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/{GroupByKeyEvaluatorFactoryTest.java => InProcessGroupByKeyOnlyEvaluatorFactoryTest.java} (81%) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java index 3990f0d04fdb..97d3089eda4b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks; import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupAlsoByWindowEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupAlsoByWindowEvaluatorFactory.java new file mode 100644 index 000000000000..ac552cfe727a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupAlsoByWindowEvaluatorFactory.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.runners.inprocess; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.runners.inprocess.InProcessGroupByKey.InProcessGroupAlsoByWindow; +import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.GroupAlsoByWindowViaWindowSetDoFn; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.SystemReduceFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableMap; + +import java.util.Collections; + +/** + * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * {@link GroupByKeyOnly} {@link PTransform}. + */ +class InProcessGroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { + @Override + public TransformEvaluator forApplication( + AppliedPTransform application, + CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) { + @SuppressWarnings({"cast", "unchecked", "rawtypes"}) + TransformEvaluator evaluator = + createEvaluator( + (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext); + return evaluator; + } + + private TransformEvaluator> createEvaluator( + AppliedPTransform< + PCollection>, PCollection>>, + InProcessGroupAlsoByWindow> + application, + CommittedBundle> inputBundle, + InProcessEvaluationContext evaluationContext) { + return new InProcessGroupAlsoByWindowEvaluator( + evaluationContext, inputBundle, application); + } + + /** + * A transform evaluator for the pseudo-primitive {@link GroupAlsoByWindow}. Windowing is ignored; + * all input should be in the global window since all output will be as well. + * + * @see GroupByKeyViaGroupByKeyOnly + */ + private static class InProcessGroupAlsoByWindowEvaluator + implements TransformEvaluator> { + + private final TransformEvaluator> gabwParDoEvaluator; + + private final AppliedPTransform< + PCollection>, PCollection>>, + InProcessGroupAlsoByWindow> + application; + + public InProcessGroupAlsoByWindowEvaluator( + final InProcessEvaluationContext evaluationContext, + CommittedBundle> inputBundle, + final AppliedPTransform< + PCollection>, PCollection>>, + InProcessGroupAlsoByWindow> + application) { + this.application = application; + + Coder valueCoder = + application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder()); + + @SuppressWarnings("unchecked") + WindowingStrategy windowingStrategy = + (WindowingStrategy) application.getTransform().getWindowingStrategy(); + + DoFn, KV>> gabwDoFn = + GroupAlsoByWindowViaWindowSetDoFn.create( + windowingStrategy, + SystemReduceFn.buffering(valueCoder)); + + TupleTag>> mainOutputTag = new TupleTag>>() {}; + + // Not technically legit, as the application is not a ParDo + this.gabwParDoEvaluator = + ParDoInProcessEvaluator.create( + evaluationContext, + inputBundle, + application, + Suppliers.ofInstance(gabwDoFn), + Collections.>emptyList(), + mainOutputTag, + Collections.>emptyList(), + ImmutableMap., PCollection>of(mainOutputTag, application.getOutput())); + } + + @Override + public void processElement(WindowedValue> element) throws Exception { + gabwParDoEvaluator.processElement(element); + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return gabwParDoEvaluator.finishBundle(); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKey.java new file mode 100644 index 000000000000..9f04bacce0bf --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKey.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.runners.inprocess; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItemCoder; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +class InProcessGroupByKey + extends ForwardingPTransform>, PCollection>>> { + private final GroupByKey original; + + InProcessGroupByKey(GroupByKey from) { + this.original = from; + } + + @Override + public PTransform>, PCollection>>> delegate() { + return original; + } + + @Override + public PCollection>> apply(PCollection> input) { + @SuppressWarnings("unchecked") + KvCoder inputCoder = (KvCoder) input.getCoder(); + + // This operation groups by the combination of key and window, + // merging windows as needed, using the windows assigned to the + // key/value input elements and the window merge operation of the + // window function associated with the input PCollection. + WindowingStrategy windowingStrategy = input.getWindowingStrategy(); + + // By default, implement GroupByKey via a series of lower-level operations. + return input + // Make each input element's timestamp and assigned windows + // explicit, in the value part. + .apply(new ReifyTimestampsAndWindows()) + .apply(new InProcessGroupByKeyOnly()) + .setCoder( + KeyedWorkItemCoder.of( + inputCoder.getKeyCoder(), + inputCoder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder())) + + // Group each key's values by window, merging windows as needed. + .apply("GroupAlsoByWindow", new InProcessGroupAlsoByWindow(windowingStrategy)) + + // And update the windowing strategy as appropriate. + .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy)) + .setCoder( + KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()))); + } + + static final class InProcessGroupByKeyOnly + extends PTransform>>, PCollection>> { + @Override + public PCollection> apply(PCollection>> input) { + return PCollection.>createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + } + + InProcessGroupByKeyOnly() {} + } + + static final class InProcessGroupAlsoByWindow + extends PTransform>, PCollection>>> { + + private final WindowingStrategy windowingStrategy; + + public InProcessGroupAlsoByWindow(WindowingStrategy windowingStrategy) { + this.windowingStrategy = windowingStrategy; + } + + public WindowingStrategy getWindowingStrategy() { + return windowingStrategy; + } + + private KeyedWorkItemCoder getKeyedWorkItemCoder(Coder> inputCoder) { + // Coder> --> KvCoder<...> + checkArgument( + inputCoder instanceof KeyedWorkItemCoder, + "%s requires a %s<...> but got %s", + getClass().getSimpleName(), + KvCoder.class.getSimpleName(), + inputCoder); + @SuppressWarnings("unchecked") + KeyedWorkItemCoder kvCoder = (KeyedWorkItemCoder) inputCoder; + return kvCoder; + } + + public Coder getKeyCoder(Coder> inputCoder) { + return getKeyedWorkItemCoder(inputCoder).getKeyCoder(); + } + + public Coder getValueCoder(Coder> inputCoder) { + return getKeyedWorkItemCoder(inputCoder).getElementCoder(); + } + + @Override + public PCollection>> apply(PCollection> input) { + return PCollection.>>createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKeyOnlyEvaluatorFactory.java similarity index 52% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKeyOnlyEvaluatorFactory.java index 4cec8411bb11..6e55c4ff4693 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKeyOnlyEvaluatorFactory.java @@ -19,33 +19,24 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; +import static com.google.common.base.Preconditions.checkState; + import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.runners.inprocess.InProcessGroupByKey.InProcessGroupByKeyOnly; import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import org.apache.beam.sdk.runners.inprocess.StepTransformResult.Builder; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.GroupAlsoByWindowViaWindowSetDoFn; -import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItemCoder; import org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.SystemReduceFn; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Arrays; @@ -54,17 +45,18 @@ import java.util.Map; /** - * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link GroupByKey} - * {@link PTransform}. + * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * {@link GroupByKeyOnly} {@link PTransform}. */ -class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory { +class InProcessGroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory { @Override public TransformEvaluator forApplication( AppliedPTransform application, CommittedBundle inputBundle, InProcessEvaluationContext evaluationContext) { @SuppressWarnings({"cast", "unchecked", "rawtypes"}) - TransformEvaluator evaluator = createEvaluator( + TransformEvaluator evaluator = + createEvaluator( (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext); return evaluator; } @@ -74,16 +66,22 @@ private TransformEvaluator>> createEvaluator( PCollection>>, PCollection>, InProcessGroupByKeyOnly> application, - final CommittedBundle> inputBundle, + final CommittedBundle>> inputBundle, final InProcessEvaluationContext evaluationContext) { - return new GroupByKeyEvaluator(evaluationContext, inputBundle, application); + return new InProcessGroupByKeyOnlyEvaluator(evaluationContext, inputBundle, application); } - private static class GroupByKeyEvaluator + /** + * A transform evaluator for the pseudo-primitive {@link GroupByKeyOnly}. Windowing is ignored; + * all input should be in the global window since all output will be as well. + * + * @see GroupByKeyViaGroupByKeyOnly + */ + private static class InProcessGroupByKeyOnlyEvaluator implements TransformEvaluator>> { private final InProcessEvaluationContext evaluationContext; - private final CommittedBundle> inputBundle; + private final CommittedBundle>> inputBundle; private final AppliedPTransform< PCollection>>, PCollection>, InProcessGroupByKeyOnly> @@ -91,9 +89,9 @@ private static class GroupByKeyEvaluator private final Coder keyCoder; private Map, List>> groupingMap; - public GroupByKeyEvaluator( + public InProcessGroupByKeyOnlyEvaluator( InProcessEvaluationContext evaluationContext, - CommittedBundle> inputBundle, + CommittedBundle>> inputBundle, AppliedPTransform< PCollection>>, PCollection>, InProcessGroupByKeyOnly> @@ -101,16 +99,18 @@ public GroupByKeyEvaluator( this.evaluationContext = evaluationContext; this.inputBundle = inputBundle; this.application = application; - - PCollection>> input = application.getInput(); - keyCoder = getKeyCoder(input.getCoder()); - groupingMap = new HashMap<>(); + this.keyCoder = getKeyCoder(application.getInput().getCoder()); + this.groupingMap = new HashMap<>(); } private Coder getKeyCoder(Coder>> coder) { - if (!(coder instanceof KvCoder)) { - throw new IllegalStateException(); - } + checkState( + coder instanceof KvCoder, + "%s requires a coder of class %s." + + " This is an internal error; this is checked during pipeline construction" + + " but became corrupted.", + getClass().getSimpleName(), + KvCoder.class.getSimpleName()); @SuppressWarnings("unchecked") Coder keyCoder = ((KvCoder>) coder).getKeyCoder(); return keyCoder; @@ -180,95 +180,4 @@ public int hashCode() { } } } - - /** - * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. - */ - public static final class InProcessGroupByKeyOverrideFactory - implements PTransformOverrideFactory { - @Override - public PTransform override( - PTransform transform) { - if (transform instanceof GroupByKey) { - @SuppressWarnings({"rawtypes", "unchecked"}) - PTransform override = new InProcessGroupByKey((GroupByKey) transform); - return override; - } - return transform; - } - } - - /** - * An in-memory implementation of the {@link GroupByKey} primitive as a composite - * {@link PTransform}. - */ - private static final class InProcessGroupByKey - extends ForwardingPTransform>, PCollection>>> { - private final GroupByKey original; - - private InProcessGroupByKey(GroupByKey from) { - this.original = from; - } - - @Override - public PTransform>, PCollection>>> delegate() { - return original; - } - - @Override - public PCollection>> apply(PCollection> input) { - KvCoder inputCoder = (KvCoder) input.getCoder(); - - // This operation groups by the combination of key and window, - // merging windows as needed, using the windows assigned to the - // key/value input elements and the window merge operation of the - // window function associated with the input PCollection. - WindowingStrategy windowingStrategy = input.getWindowingStrategy(); - - // Use the default GroupAlsoByWindow implementation - DoFn, KV>> groupAlsoByWindow = - groupAlsoByWindow(windowingStrategy, inputCoder.getValueCoder()); - - // By default, implement GroupByKey via a series of lower-level operations. - return input - // Make each input element's timestamp and assigned windows - // explicit, in the value part. - .apply(new ReifyTimestampsAndWindows()) - - .apply(new InProcessGroupByKeyOnly()) - .setCoder(KeyedWorkItemCoder.of(inputCoder.getKeyCoder(), - inputCoder.getValueCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())) - - // Group each key's values by window, merging windows as needed. - .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow)) - - // And update the windowing strategy as appropriate. - .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy)) - .setCoder( - KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()))); - } - - private - DoFn, KV>> groupAlsoByWindow( - final WindowingStrategy windowingStrategy, final Coder inputCoder) { - return GroupAlsoByWindowViaWindowSetDoFn.create( - windowingStrategy, SystemReduceFn.buffering(inputCoder)); - } - } - - /** - * An implementation primitive to use in the evaluation of a {@link GroupByKey} - * {@link PTransform}. - */ - public static final class InProcessGroupByKeyOnly - extends PTransform>>, PCollection>> { - @Override - public PCollection> apply(PCollection>> input) { - return PCollection.>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); - } - - @VisibleForTesting - InProcessGroupByKeyOnly() {} - } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKeyOverrideFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKeyOverrideFactory.java new file mode 100644 index 000000000000..d975dfe3b581 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKeyOverrideFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.runners.inprocess; + +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +/** + * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. + */ +final class InProcessGroupByKeyOverrideFactory + implements PTransformOverrideFactory { + @Override + public PTransform override( + PTransform transform) { + if (transform instanceof GroupByKey) { + @SuppressWarnings({"rawtypes", "unchecked"}) + PTransform override = + (PTransform) new InProcessGroupByKey((GroupByKey) transform); + return override; + } + return transform; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java index 7897f2e31caf..c6e523f1a90b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -28,8 +28,6 @@ import org.apache.beam.sdk.runners.AggregatorRetrievalException; import org.apache.beam.sdk.runners.AggregatorValues; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; -import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory; import org.apache.beam.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessViewOverrideFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -37,6 +35,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import org.apache.beam.sdk.util.MapAggregatorValues; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.UserCodeException; @@ -221,7 +220,7 @@ public InProcessPipelineResult run(Pipeline pipeline) { KeyedPValueTrackingVisitor keyedPValueVisitor = KeyedPValueTrackingVisitor.create( ImmutableSet.>of( - GroupByKey.class, InProcessGroupByKeyOnly.class)); + GroupByKey.class, GroupByKeyOnly.class)); pipeline.traverseTopologically(keyedPValueVisitor); InProcessEvaluationContext context = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java index f6542b8e13d4..ec1ac72219a4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.runners.inprocess; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.inprocess.InProcessGroupByKey.InProcessGroupAlsoByWindow; +import org.apache.beam.sdk.runners.inprocess.InProcessGroupByKey.InProcessGroupByKeyOnly; import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList; @@ -44,12 +46,12 @@ public static TransformEvaluatorRegistry defaultRegistry() { .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory()) .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory()) .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory()) - .put( - GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class, - new GroupByKeyEvaluatorFactory()) .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory()) .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory()) .put(Window.Bound.class, new WindowEvaluatorFactory()) + // Runner-specific primitives used in expansion of GroupByKey + .put(InProcessGroupByKeyOnly.class, new InProcessGroupByKeyOnlyEvaluatorFactory()) + .put(InProcessGroupAlsoByWindow.class, new InProcessGroupAlsoByWindowEvaluatorFactory()) .build(); return new TransformEvaluatorRegistry(primitives); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java similarity index 81% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java rename to sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java index b7ce1691bcd6..7f4a53db127d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java @@ -17,7 +17,12 @@ */ package org.apache.beam.sdk.runners.inprocess; +import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv; +import static org.apache.beam.sdk.WindowMatchers.isWindowedValue; + import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -28,15 +33,14 @@ import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import com.google.common.collect.HashMultiset; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multiset; import org.hamcrest.BaseMatcher; @@ -47,10 +51,10 @@ import org.junit.runners.JUnit4; /** - * Tests for {@link GroupByKeyEvaluatorFactory}. + * Tests for {@link InProcessGroupByKeyOnlyEvaluatorFactory}. */ @RunWith(JUnit4.class) -public class GroupByKeyEvaluatorFactoryTest { +public class InProcessGroupByKeyOnlyEvaluatorFactoryTest { private BundleFactory bundleFactory = InProcessBundleFactory.create(); @Test @@ -66,18 +70,18 @@ public void testInMemoryEvaluator() throws Exception { p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo)); PCollection>> kvs = values.apply(new ReifyTimestampsAndWindows()); - PCollection> groupedKvs = - kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly()); + PCollection>>> groupedKvs = + kvs.apply(new GroupByKeyOnly>()); CommittedBundle>> inputBundle = bundleFactory.createRootBundle(kvs).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle> fooBundle = + UncommittedBundle>>> fooBundle = bundleFactory.createKeyedBundle(null, "foo", groupedKvs); - UncommittedBundle> barBundle = + UncommittedBundle>>> barBundle = bundleFactory.createKeyedBundle(null, "bar", groupedKvs); - UncommittedBundle> bazBundle = + UncommittedBundle>>> bazBundle = bundleFactory.createKeyedBundle(null, "baz", groupedKvs); when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle); @@ -89,7 +93,7 @@ public void testInMemoryEvaluator() throws Exception { Coder keyCoder = ((KvCoder>) kvs.getCoder()).getKeyCoder(); TransformEvaluator>> evaluator = - new GroupByKeyEvaluatorFactory() + new InProcessGroupByKeyOnlyEvaluatorFactory() .forApplication( groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext); @@ -105,32 +109,30 @@ public void testInMemoryEvaluator() throws Exception { assertThat( fooBundle.commit(Instant.now()).getElements(), contains( - new KeyedWorkItemMatcher( - KeyedWorkItems.elementsWorkItem( - "foo", - ImmutableSet.of( + isWindowedValue( + isKv( + equalTo("foo"), + containsInAnyOrder( WindowedValue.valueInGlobalWindow(-1), WindowedValue.valueInGlobalWindow(1), - WindowedValue.valueInGlobalWindow(3))), - keyCoder))); + WindowedValue.valueInGlobalWindow(3)))))); assertThat( barBundle.commit(Instant.now()).getElements(), contains( - new KeyedWorkItemMatcher( - KeyedWorkItems.elementsWorkItem( - "bar", - ImmutableSet.of( + isWindowedValue( + isKv( + equalTo("bar"), + containsInAnyOrder( WindowedValue.valueInGlobalWindow(12), - WindowedValue.valueInGlobalWindow(22))), - keyCoder))); + WindowedValue.valueInGlobalWindow(22)))))); + assertThat( bazBundle.commit(Instant.now()).getElements(), contains( - new KeyedWorkItemMatcher( - KeyedWorkItems.elementsWorkItem( - "baz", - ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))), - keyCoder))); + isWindowedValue( + isKv( + equalTo("baz"), + containsInAnyOrder(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE)))))); } private KV> gwValue(KV kv) { @@ -152,6 +154,7 @@ public boolean matches(Object item) { if (item == null || !(item instanceof WindowedValue)) { return false; } + @SuppressWarnings("unchecked") WindowedValue> that = (WindowedValue>) item; Multiset> myValues = HashMultiset.create(); Multiset> thatValues = HashMultiset.create();