From ca1b3a87fbe7218c0af912ba0f0deae8b903b1ac Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 28 Apr 2016 15:51:40 -0700 Subject: [PATCH 1/2] Add accessors for sub-coders of KeyedWorkItemCoder --- .../java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java index 763f68b302e3..ec5d82138269 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java @@ -79,6 +79,14 @@ private KeyedWorkItemCoder( this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, windowCoder)); } + public Coder getKeyCoder() { + return keyCoder; + } + + public Coder getElementCoder() { + return elemCoder; + } + @Override public void encode(KeyedWorkItem value, OutputStream outStream, Coder.Context context) throws CoderException, IOException { From aad284a513e829552e9ae7fa10ea89cfd89bdb5f Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 28 Apr 2016 16:12:21 -0700 Subject: [PATCH 2/2] Make in-process GroupByKey respect future Beam model This introduces or clarifies the following transforms: - InProcessGroupByKey, which expands like GroupByKeyViaGroupByKeyOnly but with different intermediate PCollection types. - InProcessGroupByKeyOnly, which outputs KeyedWorkItem. This existed already under a different name. - InProcessGroupAlsoByWindow, which is evaluated directly and accepts input elements of type KeyedWorkItem. 
--- .../beam/runners/direct/BundleFactory.java | 2 +- .../direct/InProcessEvaluationContext.java | 2 +- ...cessGroupAlsoByWindowEvaluatorFactory.java | 127 ++++++++++++ .../runners/direct/InProcessGroupByKey.java | 132 +++++++++++++ ...rocessGroupByKeyOnlyEvaluatorFactory.java} | 153 +++------------ .../InProcessGroupByKeyOverrideFactory.java | 41 ++++ .../direct/InProcessPipelineRunner.java | 3 +- .../direct/TransformEvaluatorRegistry.java | 8 +- .../GroupByKeyEvaluatorFactoryTest.java | 4 +- ...essGroupByKeyOnlyEvaluatorFactoryTest.java | 183 ++++++++++++++++++ 10 files changed, 524 insertions(+), 131 deletions(-) create mode 100644 runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java create mode 100644 runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java rename runners/direct-java/src/main/java/org/apache/beam/runners/direct/{GroupByKeyEvaluatorFactory.java => InProcessGroupByKeyOnlyEvaluatorFactory.java} (52%) create mode 100644 runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java create mode 100644 runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java index 34529e7803c1..fea48416252d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; +import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly; import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; 
import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.PTransform; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java index 9eeafbb5afe7..f348d9346a60 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java @@ -19,9 +19,9 @@ import static com.google.common.base.Preconditions.checkNotNull; -import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers; import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks; +import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly; import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter; import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java new file mode 100644 index 000000000000..5ded8b68f8d3 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; +import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow; +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.SystemReduceFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + +import com.google.common.collect.ImmutableMap; + +import java.util.Collections; + +/** + * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * {@link GroupByKeyOnly} {@link PTransform}. 
+ */ +class InProcessGroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { + @Override + public TransformEvaluator forApplication( + AppliedPTransform application, + CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) { + @SuppressWarnings({"cast", "unchecked", "rawtypes"}) + TransformEvaluator evaluator = + createEvaluator( + (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext); + return evaluator; + } + + private TransformEvaluator> createEvaluator( + AppliedPTransform< + PCollection>, PCollection>>, + InProcessGroupAlsoByWindow> + application, + CommittedBundle> inputBundle, + InProcessEvaluationContext evaluationContext) { + return new InProcessGroupAlsoByWindowEvaluator( + evaluationContext, inputBundle, application); + } + + /** + * A transform evaluator for the pseudo-primitive {@link InProcessGroupAlsoByWindow}. Each input + * element is grouped by window using the windowing strategy carried by that transform. + * + * @see GroupByKeyViaGroupByKeyOnly + */ + private static class InProcessGroupAlsoByWindowEvaluator + implements TransformEvaluator> { + + private final TransformEvaluator> gabwParDoEvaluator; + + public InProcessGroupAlsoByWindowEvaluator( + final InProcessEvaluationContext evaluationContext, + CommittedBundle> inputBundle, + final AppliedPTransform< + PCollection>, PCollection>>, + InProcessGroupAlsoByWindow> + application) { + + Coder valueCoder = + application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder()); + + @SuppressWarnings("unchecked") + WindowingStrategy windowingStrategy = + (WindowingStrategy) application.getTransform().getWindowingStrategy(); + + DoFn, KV>> gabwDoFn = + GroupAlsoByWindowViaWindowSetDoFn.create( + windowingStrategy, + SystemReduceFn.buffering(valueCoder)); + + TupleTag>> mainOutputTag = new TupleTag>>() {}; + + // Not technically legit, as the application is not a ParDo + this.gabwParDoEvaluator = +
ParDoInProcessEvaluator.create( + evaluationContext, + inputBundle, + application, + gabwDoFn, + Collections.>emptyList(), + mainOutputTag, + Collections.>emptyList(), + ImmutableMap., PCollection>of(mainOutputTag, application.getOutput())); + } + + @Override + public void processElement(WindowedValue> element) throws Exception { + gabwParDoEvaluator.processElement(element); + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return gabwParDoEvaluator.finishBundle(); + } + } +} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java new file mode 100644 index 000000000000..026b4d5636f3 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItemCoder; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +class InProcessGroupByKey + extends ForwardingPTransform>, PCollection>>> { + private final GroupByKey original; + + InProcessGroupByKey(GroupByKey from) { + this.original = from; + } + + @Override + public PTransform>, PCollection>>> delegate() { + return original; + } + + @Override + public PCollection>> apply(PCollection> input) { + @SuppressWarnings("unchecked") + KvCoder inputCoder = (KvCoder) input.getCoder(); + + // This operation groups by the combination of key and window, + // merging windows as needed, using the windows assigned to the + // key/value input elements and the window merge operation of the + // window function associated with the input PCollection. + WindowingStrategy windowingStrategy = input.getWindowingStrategy(); + + // By default, implement GroupByKey via a series of lower-level operations. + return input + // Make each input element's timestamp and assigned windows + // explicit, in the value part. + .apply(new ReifyTimestampsAndWindows()) + .apply(new InProcessGroupByKeyOnly()) + .setCoder( + KeyedWorkItemCoder.of( + inputCoder.getKeyCoder(), + inputCoder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder())) + + // Group each key's values by window, merging windows as needed. 
+ .apply("GroupAlsoByWindow", new InProcessGroupAlsoByWindow(windowingStrategy)) + + // And update the windowing strategy as appropriate. + .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy)) + .setCoder( + KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()))); + } + + static final class InProcessGroupByKeyOnly + extends PTransform>>, PCollection>> { + @Override + public PCollection> apply(PCollection>> input) { + return PCollection.>createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + } + + InProcessGroupByKeyOnly() {} + } + + static final class InProcessGroupAlsoByWindow + extends PTransform>, PCollection>>> { + + private final WindowingStrategy windowingStrategy; + + public InProcessGroupAlsoByWindow(WindowingStrategy windowingStrategy) { + this.windowingStrategy = windowingStrategy; + } + + public WindowingStrategy getWindowingStrategy() { + return windowingStrategy; + } + + private KeyedWorkItemCoder getKeyedWorkItemCoder(Coder> inputCoder) { + // Coder> --> KvCoder<...> + checkArgument( + inputCoder instanceof KeyedWorkItemCoder, + "%s requires a %s<...> but got %s", + getClass().getSimpleName(), + KvCoder.class.getSimpleName(), + inputCoder); + @SuppressWarnings("unchecked") + KeyedWorkItemCoder kvCoder = (KeyedWorkItemCoder) inputCoder; + return kvCoder; + } + + public Coder getKeyCoder(Coder> inputCoder) { + return getKeyedWorkItemCoder(inputCoder).getKeyCoder(); + } + + public Coder getValueCoder(Coder> inputCoder) { + return getKeyedWorkItemCoder(inputCoder).getElementCoder(); + } + + @Override + public PCollection>> apply(PCollection> input) { + return PCollection.>>createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + } + } +} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java similarity index 52% rename from runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java rename to runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java index 9a08996be215..79db5b696659 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java @@ -19,33 +19,24 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; -import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly; import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; import org.apache.beam.runners.direct.StepTransformResult.Builder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItemCoder; import 
org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.SystemReduceFn; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Arrays; @@ -54,17 +45,18 @@ import java.util.Map; /** - * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link GroupByKey} - * {@link PTransform}. + * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * {@link GroupByKeyOnly} {@link PTransform}. */ -class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory { +class InProcessGroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory { @Override public TransformEvaluator forApplication( AppliedPTransform application, CommittedBundle inputBundle, InProcessEvaluationContext evaluationContext) { @SuppressWarnings({"cast", "unchecked", "rawtypes"}) - TransformEvaluator evaluator = createEvaluator( + TransformEvaluator evaluator = + createEvaluator( (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext); return evaluator; } @@ -74,16 +66,22 @@ private TransformEvaluator>> createEvaluator( PCollection>>, PCollection>, InProcessGroupByKeyOnly> application, - final CommittedBundle> inputBundle, + final CommittedBundle>> inputBundle, final InProcessEvaluationContext evaluationContext) { - return new GroupByKeyEvaluator(evaluationContext, inputBundle, application); + return new InProcessGroupByKeyOnlyEvaluator(evaluationContext, inputBundle, application); } - private static class GroupByKeyEvaluator + /** + * A transform evaluator for the pseudo-primitive {@link GroupByKeyOnly}. 
Windowing is ignored; + * all input should be in the global window since all output will be as well. + * + * @see GroupByKeyViaGroupByKeyOnly + */ + private static class InProcessGroupByKeyOnlyEvaluator implements TransformEvaluator>> { private final InProcessEvaluationContext evaluationContext; - private final CommittedBundle> inputBundle; + private final CommittedBundle>> inputBundle; private final AppliedPTransform< PCollection>>, PCollection>, InProcessGroupByKeyOnly> @@ -91,9 +89,9 @@ private static class GroupByKeyEvaluator private final Coder keyCoder; private Map, List>> groupingMap; - public GroupByKeyEvaluator( + public InProcessGroupByKeyOnlyEvaluator( InProcessEvaluationContext evaluationContext, - CommittedBundle> inputBundle, + CommittedBundle>> inputBundle, AppliedPTransform< PCollection>>, PCollection>, InProcessGroupByKeyOnly> @@ -101,16 +99,18 @@ public GroupByKeyEvaluator( this.evaluationContext = evaluationContext; this.inputBundle = inputBundle; this.application = application; - - PCollection>> input = application.getInput(); - keyCoder = getKeyCoder(input.getCoder()); - groupingMap = new HashMap<>(); + this.keyCoder = getKeyCoder(application.getInput().getCoder()); + this.groupingMap = new HashMap<>(); } private Coder getKeyCoder(Coder>> coder) { - if (!(coder instanceof KvCoder)) { - throw new IllegalStateException(); - } + checkState( + coder instanceof KvCoder, + "%s requires a coder of class %s." + + " This is an internal error; this is checked during pipeline construction" + + " but became corrupted.", + getClass().getSimpleName(), + KvCoder.class.getSimpleName()); @SuppressWarnings("unchecked") Coder keyCoder = ((KvCoder>) coder).getKeyCoder(); return keyCoder; @@ -180,95 +180,4 @@ public int hashCode() { } } } - - /** - * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. 
- */ - public static final class InProcessGroupByKeyOverrideFactory - implements PTransformOverrideFactory { - @Override - public PTransform override( - PTransform transform) { - if (transform instanceof GroupByKey) { - @SuppressWarnings({"rawtypes", "unchecked"}) - PTransform override = new InProcessGroupByKey((GroupByKey) transform); - return override; - } - return transform; - } - } - - /** - * An in-memory implementation of the {@link GroupByKey} primitive as a composite - * {@link PTransform}. - */ - private static final class InProcessGroupByKey - extends ForwardingPTransform>, PCollection>>> { - private final GroupByKey original; - - private InProcessGroupByKey(GroupByKey from) { - this.original = from; - } - - @Override - public PTransform>, PCollection>>> delegate() { - return original; - } - - @Override - public PCollection>> apply(PCollection> input) { - KvCoder inputCoder = (KvCoder) input.getCoder(); - - // This operation groups by the combination of key and window, - // merging windows as needed, using the windows assigned to the - // key/value input elements and the window merge operation of the - // window function associated with the input PCollection. - WindowingStrategy windowingStrategy = input.getWindowingStrategy(); - - // Use the default GroupAlsoByWindow implementation - DoFn, KV>> groupAlsoByWindow = - groupAlsoByWindow(windowingStrategy, inputCoder.getValueCoder()); - - // By default, implement GroupByKey via a series of lower-level operations. - return input - // Make each input element's timestamp and assigned windows - // explicit, in the value part. - .apply(new ReifyTimestampsAndWindows()) - - .apply(new InProcessGroupByKeyOnly()) - .setCoder(KeyedWorkItemCoder.of(inputCoder.getKeyCoder(), - inputCoder.getValueCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())) - - // Group each key's values by window, merging windows as needed. 
- .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow)) - - // And update the windowing strategy as appropriate. - .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy)) - .setCoder( - KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()))); - } - - private - DoFn, KV>> groupAlsoByWindow( - final WindowingStrategy windowingStrategy, final Coder inputCoder) { - return GroupAlsoByWindowViaWindowSetDoFn.create( - windowingStrategy, SystemReduceFn.buffering(inputCoder)); - } - } - - /** - * An implementation primitive to use in the evaluation of a {@link GroupByKey} - * {@link PTransform}. - */ - public static final class InProcessGroupByKeyOnly - extends PTransform>>, PCollection>> { - @Override - public PCollection> apply(PCollection>> input) { - return PCollection.>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); - } - - @VisibleForTesting - InProcessGroupByKeyOnly() {} - } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java new file mode 100644 index 000000000000..1d84bc905fee --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +/** + * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. + */ +final class InProcessGroupByKeyOverrideFactory + implements PTransformOverrideFactory { + @Override + public PTransform override( + PTransform transform) { + if (transform instanceof GroupByKey) { + @SuppressWarnings({"rawtypes", "unchecked"}) + PTransform override = + (PTransform) new InProcessGroupByKey((GroupByKey) transform); + return override; + } + return transform; + } +} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java index 19e9f47de568..a7f6941a738c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java @@ -17,8 +17,7 @@ */ package org.apache.beam.runners.direct; -import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; -import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory; +import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly; import org.apache.beam.runners.direct.ViewEvaluatorFactory.InProcessViewOverrideFactory; 
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index f449731a5918..81d252087c8c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.direct; +import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow; +import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly; import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -44,12 +46,12 @@ public static TransformEvaluatorRegistry defaultRegistry() { .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory()) .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory()) .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory()) - .put( - GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class, - new GroupByKeyEvaluatorFactory()) .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory()) .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory()) .put(Window.Bound.class, new WindowEvaluatorFactory()) + // Runner-specific primitives used in expansion of GroupByKey + .put(InProcessGroupByKeyOnly.class, new InProcessGroupByKeyOnlyEvaluatorFactory()) + .put(InProcessGroupAlsoByWindow.class, new InProcessGroupAlsoByWindowEvaluatorFactory()) .build(); return new TransformEvaluatorRegistry(primitives); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java index 267266d3b891..92f845c8bbef 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java @@ -67,7 +67,7 @@ public void testInMemoryEvaluator() throws Exception { PCollection>> kvs = values.apply(new ReifyTimestampsAndWindows()); PCollection> groupedKvs = - kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly()); + kvs.apply(new InProcessGroupByKey.InProcessGroupByKeyOnly()); CommittedBundle>> inputBundle = bundleFactory.createRootBundle(kvs).commit(Instant.now()); @@ -89,7 +89,7 @@ public void testInMemoryEvaluator() throws Exception { Coder keyCoder = ((KvCoder>) kvs.getCoder()).getKeyCoder(); TransformEvaluator>> evaluator = - new GroupByKeyEvaluatorFactory() + new InProcessGroupByKeyOnlyEvaluatorFactory() .forApplication( groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java new file mode 100644 index 000000000000..1172a4d08dfd --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItems; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multiset; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link InProcessGroupByKeyOnlyEvaluatorFactory}. 
+ */ +@RunWith(JUnit4.class) +public class InProcessGroupByKeyOnlyEvaluatorFactoryTest { + private BundleFactory bundleFactory = InProcessBundleFactory.create(); + + /** + * Pushes six key/value elements for three keys through the evaluator and checks that each key's + * output bundle contains a single {@link KeyedWorkItem} holding that key's values. + */ + @Test + public void testInMemoryEvaluator() throws Exception { + TestPipeline p = TestPipeline.create(); + KV firstFoo = KV.of("foo", -1); + KV secondFoo = KV.of("foo", 1); + KV thirdFoo = KV.of("foo", 3); + KV firstBar = KV.of("bar", 22); + KV secondBar = KV.of("bar", 12); + KV firstBaz = KV.of("baz", Integer.MAX_VALUE); + PCollection> values = + p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo)); + PCollection>> kvs = + values.apply(new ReifyTimestampsAndWindows()); + PCollection> groupedKvs = + kvs.apply(new InProcessGroupByKey.InProcessGroupByKeyOnly()); + + CommittedBundle>> inputBundle = + bundleFactory.createRootBundle(kvs).commit(Instant.now()); + InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + + /* + * The output bundles are built up front (with a null input bundle) and handed back by the + * mocked context whenever a keyed bundle is requested for inputBundle, the key and groupedKvs. + */ + UncommittedBundle> fooBundle = + bundleFactory.createKeyedBundle(null, "foo", groupedKvs); + UncommittedBundle> barBundle = + bundleFactory.createKeyedBundle(null, "bar", groupedKvs); + UncommittedBundle> bazBundle = + bundleFactory.createKeyedBundle(null, "baz", groupedKvs); + + when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle); + when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle); + when(evaluationContext.createKeyedBundle(inputBundle, "baz", groupedKvs)).thenReturn(bazBundle); + + // The input to a GroupByKey is assumed to be a KvCoder + @SuppressWarnings("unchecked") + Coder keyCoder = + ((KvCoder>) kvs.getCoder()).getKeyCoder(); + TransformEvaluator>> evaluator = + new InProcessGroupByKeyOnlyEvaluatorFactory() + .forApplication( + groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext); + + /* + * Each element is an outer WindowedValue in no windows whose KV value (built by gwValue) is + * itself a WindowedValue in the global window. + */ + evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo))); + 
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo))); + evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo))); + evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar))); + evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar))); + evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz))); + + evaluator.finishBundle(); + + /* + * KeyedWorkItemMatcher compares elements as multisets, so the order in which the expected + * values are listed below does not have to match the order they were processed in. + */ + assertThat( + fooBundle.commit(Instant.now()).getElements(), + contains( + new KeyedWorkItemMatcher( + KeyedWorkItems.elementsWorkItem( + "foo", + ImmutableSet.of( + WindowedValue.valueInGlobalWindow(-1), + WindowedValue.valueInGlobalWindow(1), + WindowedValue.valueInGlobalWindow(3))), + keyCoder))); + assertThat( + barBundle.commit(Instant.now()).getElements(), + contains( + new KeyedWorkItemMatcher( + KeyedWorkItems.elementsWorkItem( + "bar", + ImmutableSet.of( + WindowedValue.valueInGlobalWindow(12), + WindowedValue.valueInGlobalWindow(22))), + keyCoder))); + assertThat( + bazBundle.commit(Instant.now()).getElements(), + contains( + new KeyedWorkItemMatcher( + KeyedWorkItems.elementsWorkItem( + "baz", + ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))), + keyCoder))); + } + + /** + * Returns a {@link KV} with the key of {@code kv} and its value wrapped as a + * {@link WindowedValue} in the global window. + */ + private KV> gwValue(KV kv) { + return KV.of(kv.getKey(), WindowedValue.valueInGlobalWindow(kv.getValue())); + } + + /** + * Matches a {@link WindowedValue} whose {@link KeyedWorkItem} payload has the same elements as + * the expected work item (compared as multisets, so order is ignored) and a key whose + * structural value under the given key coder equals that of the expected key. + */ + private static class KeyedWorkItemMatcher + extends BaseMatcher>> { + private final KeyedWorkItem myWorkItem; + private final Coder keyCoder; + + /** + * @param myWorkItem the expected work item + * @param keyCoder the coder used to obtain structural values of the keys being compared + */ + public KeyedWorkItemMatcher(KeyedWorkItem myWorkItem, Coder keyCoder) { + this.myWorkItem = myWorkItem; + this.keyCoder = keyCoder; + } + + /** + * Returns true when {@code item} is a {@link WindowedValue} whose work item has the same + * multiset of elements and a structurally equal key; false for null or any other type. + */ + @Override + public boolean matches(Object item) { + /* NOTE(review): instanceof is already false for null, so the explicit null check is redundant. */ + if (item == null || !(item instanceof WindowedValue)) { + return false; + } + WindowedValue> that = (WindowedValue>) item; + /* Both element sequences are copied into multisets: order is ignored, duplicates count. */ + Multiset> myValues = HashMultiset.create(); + Multiset> thatValues = HashMultiset.create(); + for (WindowedValue value : myWorkItem.elementsIterable()) { + 
myValues.add(value); + } + for (WindowedValue value : that.getValue().elementsIterable()) { + thatValues.add(value); + } + /* Only the comparison below is guarded; the loops above run outside the try block. */ + try { + return myValues.equals(thatValues) + && keyCoder + .structuralValue(myWorkItem.key()) + .equals(keyCoder.structuralValue(that.getValue().key())); + } catch (Exception e) { + /* NOTE(review): the exception is discarded, so a failure here looks like a plain mismatch. */ + return false; + } + } + + /** Appends the expected key and the expected elements to {@code description}. */ + @Override + public void describeTo(Description description) { + description + .appendText("KeyedWorkItem containing key ") + .appendValue(myWorkItem.key()) + .appendText(" and values ") + .appendValueList("[", ", ", "]", myWorkItem.elementsIterable()); + } + } +}