From c9bfc5bffa62dfcd456febdf25f0d7a08f9a8dea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Thu, 1 Feb 2018 09:21:24 +0100 Subject: [PATCH 1/2] [BEAM-2806] Fix pipeline translation mode recognition --- .../core/construction/ReadTranslation.java | 14 ++++ .../FlinkPipelineExecutionEnvironment.java | 7 +- .../FlinkStreamingTransformTranslators.java | 3 +- .../flink/PipelineTranslationOptimizer.java | 14 +++- ...FlinkPipelineExecutionEnvironmentTest.java | 68 +++++++++++++++++++ 5 files changed, 98 insertions(+), 8 deletions(-) create mode 100644 runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java index d2be649dd3b5..b8f0f5b56ed6 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java @@ -30,6 +30,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded; +import org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded.Enum; import org.apache.beam.model.pipeline.v1.RunnerApi.ReadPayload; import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; @@ -152,6 +153,19 @@ public static PCollection.IsBounded sourceIsBounded(AppliedPTransform t } } + public static IsBounded.Enum isBounded(AppliedPTransform transform) { + try { + return ReadPayload.parseFrom( + PTransformTranslation + .toProto(transform, Collections.emptyList(), SdkComponents.create()) + .getSpec() + .getPayload()) + .getIsBounded(); + } catch (IOException e) { + return Enum.UNRECOGNIZED; + } + } + /** A {@link TransformPayloadTranslator} for {@link Read.Unbounded}. */ public static class UnboundedReadPayloadTranslator extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration< diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 7a6c61f8b36d..ddc4e4b85409 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -93,14 +93,15 @@ public void translate(FlinkRunner flinkRunner, Pipeline pipeline) { throw new RuntimeException(e); } - pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming())); - PipelineTranslationOptimizer optimizer = - new PipelineTranslationOptimizer(TranslationMode.BATCH, options); + new PipelineTranslationOptimizer(TranslationMode.BATCH, options, pipeline); optimizer.translate(pipeline); TranslationMode translationMode = optimizer.getTranslationMode(); + pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides( + translationMode == TranslationMode.STREAMING)); + FlinkPipelineTranslator translator; if (translationMode == TranslationMode.STREAMING) { this.flinkStreamEnv = createStreamExecutionEnvironment(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 811c15940c1f..a2923a97cc4f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -58,7 +58,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; @@ -253,7 +252,7 @@ void translateNode( if (context.getOutput(transform).isBounded().equals(PCollection.IsBounded.BOUNDED)) { boundedTranslator.translateNode(transform, context); } else { - unboundedTranslator.translateNode((Read.Unbounded) transform, context); + unboundedTranslator.translateNode(transform, context); } } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java index 3acc3eafca13..982b03ccf385 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java @@ -17,7 +17,10 @@ */ package org.apache.beam.runners.flink; -import org.apache.beam.sdk.io.Read; +import org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded.Enum; +import org.apache.beam.runners.core.construction.ReadTranslation; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PValue; @@ -34,10 +37,13 @@ class PipelineTranslationOptimizer extends FlinkPipelineTranslator { private TranslationMode translationMode; private final FlinkPipelineOptions options; + private final Pipeline pipeline; - public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) { + public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options, + Pipeline pipeline) { this.translationMode = defaultMode; this.options = options; + this.pipeline = pipeline; } public TranslationMode getTranslationMode() { @@ -60,8 +66,10 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) {} @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { + AppliedPTransform appliedPTransform = node.toAppliedPTransform(pipeline); Class transformClass = node.getTransform().getClass(); - if (transformClass == Read.Unbounded.class) { + + if (ReadTranslation.isBounded(appliedPTransform) == Enum.UNBOUNDED) { LOG.info("Found {}. Switching to streaming execution.", transformClass); translationMode = TranslationMode.STREAMING; } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java new file mode 100644 index 000000000000..0e5ce144135e --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import java.io.Serializable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link FlinkPipelineExecutionEnvironment}. + */ +@RunWith(JUnit4.class) +public class FlinkPipelineExecutionEnvironmentTest implements Serializable { + + @Test + public void shouldRecognizeAndTranslateStreamingPipeline() { + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + options.setRunner(TestFlinkRunner.class); + options.setFlinkMaster("[auto]"); + + FlinkRunner flinkRunner = FlinkRunner.fromOptions(options); + FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); + Pipeline pipeline = Pipeline.create(); + + pipeline + .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1))) + .apply(ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + c.output(Long.toString(c.element())); + } + })) + .apply(Window.into(FixedWindows.of(Duration.standardHours(1)))) + .apply(TextIO.write().withNumShards(1).withWindowedWrites().to("/dummy/path")); + + flinkEnv.translate(flinkRunner, pipeline); + + // no exception should be thrown + } + +} + + From 3cc9117705cd79f856de8472745a160519009b4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Mon, 5 Feb 2018 20:42:02 +0100 Subject: [PATCH 2/2] [BEAM-2806] Refactor PTransform unboundedness check --- .../core/construction/ReadTranslation.java | 14 ----------- .../FlinkPipelineExecutionEnvironment.java | 2 +- .../flink/PipelineTranslationOptimizer.java | 24 ++++++++++--------- 3 files changed, 14 insertions(+), 26 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java index b8f0f5b56ed6..d2be649dd3b5 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java @@ -30,7 +30,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded; -import org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded.Enum; import org.apache.beam.model.pipeline.v1.RunnerApi.ReadPayload; import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; @@ -153,19 +152,6 @@ public static PCollection.IsBounded sourceIsBounded(AppliedPTransform t } } - public static IsBounded.Enum isBounded(AppliedPTransform transform) { - try { - return ReadPayload.parseFrom( - PTransformTranslation - .toProto(transform, Collections.emptyList(), SdkComponents.create()) - .getSpec() - .getPayload()) - .getIsBounded(); - } catch (IOException e) { - return Enum.UNRECOGNIZED; - } - } - /** A {@link TransformPayloadTranslator} for {@link Read.Unbounded}. */ public static class UnboundedReadPayloadTranslator extends PTransformTranslation.TransformPayloadTranslator.WithDefaultRehydration< diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index ddc4e4b85409..7f7281e14bd9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -94,7 +94,7 @@ public void translate(FlinkRunner flinkRunner, Pipeline pipeline) { } PipelineTranslationOptimizer optimizer = - new PipelineTranslationOptimizer(TranslationMode.BATCH, options, pipeline); + new PipelineTranslationOptimizer(TranslationMode.BATCH, options); optimizer.translate(pipeline); TranslationMode translationMode = optimizer.getTranslationMode(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java index 982b03ccf385..8877f1a044ac 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java @@ -17,12 +17,11 @@ */ package org.apache.beam.runners.flink; -import org.apache.beam.model.pipeline.v1.RunnerApi.IsBounded.Enum; -import org.apache.beam.runners.core.construction.ReadTranslation; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,13 +36,10 @@ class PipelineTranslationOptimizer extends FlinkPipelineTranslator { private TranslationMode translationMode; private final FlinkPipelineOptions options; - private final Pipeline pipeline; - public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options, - Pipeline pipeline) { + public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) { this.translationMode = defaultMode; this.options = options; - this.pipeline = pipeline; } public TranslationMode getTranslationMode() { @@ -66,15 +62,21 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) {} @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { - AppliedPTransform appliedPTransform = node.toAppliedPTransform(pipeline); - Class transformClass = node.getTransform().getClass(); - - if (ReadTranslation.isBounded(appliedPTransform) == Enum.UNBOUNDED) { + AppliedPTransform appliedPTransform = node.toAppliedPTransform(getPipeline()); + if (hasUnboundedOutput(appliedPTransform)) { + Class transformClass = node.getTransform().getClass(); LOG.info("Found {}. Switching to streaming execution.", transformClass); translationMode = TranslationMode.STREAMING; } } + private boolean hasUnboundedOutput(AppliedPTransform transform) { + return transform.getOutputs().values().stream() + .filter(value -> value instanceof PCollection) + .map(value -> (PCollection) value) + .anyMatch(collection -> collection.isBounded() == IsBounded.UNBOUNDED); + } + @Override public void visitValue(PValue value, TransformHierarchy.Node producer) {} }