diff --git a/CHANGES.md b/CHANGES.md
index d36fb2ffaa76..e50a58d3b944 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -113,6 +113,9 @@
 * The Go SDK now requires Go 1.19 to build. ([#25545](https://github.com/apache/beam/pull/25545))
 * The Go SDK now has an initial native Go implementation of a portable Beam Runner called Prism. ([#24789](https://github.com/apache/beam/pull/24789))
   * For more details and current state see https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism.
+* Add `UseDataStreamForBatch` pipeline option to the Flink runner. When set to true, the Flink runner
+  executes batch jobs using the DataStream API. By default the option is set to false, so batch jobs
+  are still executed using the DataSet API.
 
 ## Breaking Changes
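For reference, here is a minimal sketch of how a pipeline author opts into the new behavior; `FlinkPipelineOptions`, `FlinkRunner`, and `setUseDataStreamForBatch` come from this change and the existing Flink runner, while the pipeline body is a placeholder:

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UseDataStreamForBatchExample {
  public static void main(String[] args) {
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Translate and execute this bounded pipeline with Flink's DataStream API
    // (RuntimeExecutionMode.BATCH) instead of the legacy DataSet API.
    options.setUseDataStreamForBatch(true);

    Pipeline pipeline = Pipeline.create(options);
    // ... apply bounded transforms here ...
    pipeline.run().waitUntilFinish();
  }
}
```

The same behavior can be requested on the command line with `--useDataStreamForBatch=true`, which is how the `flink_runner.gradle` change below wires it into the ValidatesRunner suites.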
diff --git a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
index bb794e04398d..5072e6b2459f 100644
--- a/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
+++ b/runners/flink/1.12/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
@@ -20,6 +20,7 @@
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
+import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
 
 /** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */
 public abstract class AbstractStreamOperatorCompat<OutputT>
@@ -44,9 +45,18 @@ protected int numProcessingTimeTimers() {
     return getTimeServiceManager()
         .map(
             manager -> {
-              final InternalTimeServiceManagerImpl cast =
-                  (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat();
-              return cast.numProcessingTimeTimers();
+              InternalTimeServiceManager<?> tsm = getTimeServiceManagerCompat();
+              if (tsm instanceof InternalTimeServiceManagerImpl) {
+                final InternalTimeServiceManagerImpl cast =
+                    (InternalTimeServiceManagerImpl) tsm;
+                return cast.numProcessingTimeTimers();
+              } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) {
+                return 0;
+              } else {
+                throw new IllegalStateException(
+                    String.format(
+                        "Unknown implementation of InternalTimeServiceManager. %s", tsm));
+              }
             })
         .orElse(0);
   }
diff --git a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
index 3b64612d6d19..d8740964fda9 100644
--- a/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
+++ b/runners/flink/1.14/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java
@@ -20,6 +20,7 @@
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
+import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
 
 /** Compatibility layer for {@link AbstractStreamOperator} breaking changes. */
 public abstract class AbstractStreamOperatorCompat<OutputT>
@@ -44,9 +45,18 @@ protected int numProcessingTimeTimers() {
     return getTimeServiceManager()
         .map(
             manager -> {
-              final InternalTimeServiceManagerImpl cast =
-                  (InternalTimeServiceManagerImpl) getTimeServiceManagerCompat();
-              return cast.numProcessingTimeTimers();
+              InternalTimeServiceManager<?> tsm = getTimeServiceManagerCompat();
+              if (tsm instanceof InternalTimeServiceManagerImpl) {
+                final InternalTimeServiceManagerImpl cast =
+                    (InternalTimeServiceManagerImpl) tsm;
+                return cast.numProcessingTimeTimers();
+              } else if (tsm instanceof BatchExecutionInternalTimeServiceManager) {
+                return 0;
+              } else {
+                throw new IllegalStateException(
+                    String.format(
+                        "Unknown implementation of InternalTimeServiceManager. %s", tsm));
+              }
             })
         .orElse(0);
   }
diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index cdfe921c1b2e..0ae409b097a6 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -222,6 +222,7 @@ class ValidatesRunnerConfig {
   String name
   boolean streaming
   boolean checkpointing
+  boolean useDataStreamForBatch
   ArrayList<String> sickbayTests
 }
 
@@ -240,6 +241,7 @@ def createValidatesRunnerTask(Map m) {
       description = "Validates the ${runnerType} runner"
       def pipelineOptionsArray = ["--runner=TestFlinkRunner",
           "--streaming=${config.streaming}",
+          "--useDataStreamForBatch=${config.useDataStreamForBatch}",
           "--parallelism=2",
       ]
       if (config.checkpointing) {
@@ -298,12 +300,20 @@ def createValidatesRunnerTask(Map m) {
         excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate'
         // https://github.com/apache/beam/issues/20844
         excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating'
+        if (!config.streaming) {
+          // Flink's BatchExecutionInternalTimeService does not support timer registration on timer firing.
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testOnTimerTimestampSkew' + } else { + // https://github.com/apache/beam/issues/25485 + excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState' + } } } } } createValidatesRunnerTask(name: "validatesRunnerBatch", streaming: false, sickbayTests: sickbayTests) +createValidatesRunnerTask(name: "validatesRunnerBatchWithDataStream", streaming: false, useDataStreamForBatch: true, sickbayTests: sickbayTests) createValidatesRunnerTask(name: "validatesRunnerStreaming", streaming: true, sickbayTests: sickbayTests) // We specifically have a variant which runs with checkpointing enabled for the // tests that require it since running a checkpoint variant is significantly @@ -316,6 +326,7 @@ tasks.register('validatesRunner') { group = 'Verification' description "Validates Flink runner" dependsOn validatesRunnerBatch + dependsOn validatesRunnerBatchWithDataStream dependsOn validatesRunnerStreaming dependsOn validatesRunnerStreamingCheckpointing } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 07bb3e9d6619..e247dc65baee 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -96,13 +97,15 @@ public void translate(Pipeline pipeline) { prepareFilesToStageForRemoteClusterExecution(options); FlinkPipelineTranslator translator; - if (options.isStreaming()) { + if (options.isStreaming() || options.getUseDataStreamForBatch()) { this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) { - LOG.warn( - "UnboundedSources present which rely on checkpointing, but checkpointing is disabled."); + LOG.warn("UnboundedSources present which rely on checkpointing, but checkpointing is disabled."); + } + translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options, options.isStreaming()); + if (!options.isStreaming()) { + flinkStreamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); } - translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options); } else { this.flinkBatchEnv = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options); translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index c97e414ad79a..d77718b6d82d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -32,7 +32,7 @@ * requiring flink on the classpath (e.g. 
to use with the direct runner). */ public interface FlinkPipelineOptions - extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions { + extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions, VersionDependentFlinkPipelineOptions { String AUTO = "[auto]"; String PIPELINED = "PIPELINED"; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index 8a74735cd83e..1defe31b8dc5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -81,8 +81,11 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { private int depth = 0; - public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) { - this.streamingContext = new FlinkStreamingTranslationContext(env, options); + public FlinkStreamingPipelineTranslator( + StreamExecutionEnvironment env, + PipelineOptions options, + boolean isStreaming) { + this.streamingContext = new FlinkStreamingTranslationContext(env, options, isStreaming); } @Override diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index ec97bb733776..ac743468677a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -38,9 +38,7 @@ import org.apache.beam.runners.core.construction.SerializablePipelineOptions; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; -import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; -import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; @@ -54,6 +52,9 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestStreamSource; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.unbounded.FlinkUnboundedSource; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -96,6 +97,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -220,16 +222,14 @@ public void translateNode( context.getExecutionEnvironment().getMaxParallelism() > 0 ? context.getExecutionEnvironment().getMaxParallelism() : context.getExecutionEnvironment().getParallelism(); - UnboundedSourceWrapper sourceWrapper = - new UnboundedSourceWrapper<>( - fullName, context.getPipelineOptions(), rawSource, parallelism); + + FlinkUnboundedSource unboundedSource = FlinkSource.unbounded( + rawSource, new SerializablePipelineOptions(context.getPipelineOptions()), parallelism); nonDedupSource = context .getExecutionEnvironment() - .addSource(sourceWrapper) - .name(fullName) - .uid(fullName) - .returns(withIdTypeInfo); + .fromSource(unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo) + .uid(fullName); if (rawSource.requiresDeduping()) { source = @@ -303,15 +303,24 @@ void translateNode(Impulse transform, FlinkStreamingTranslationContext context) WindowedValue.getFullCoder(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE), context.getPipelineOptions()); - long shutdownAfterIdleSourcesMs = - context - .getPipelineOptions() - .as(FlinkPipelineOptions.class) - .getShutdownSourcesAfterIdleMs(); + FlinkBoundedSource impulseSource; + WatermarkStrategy> watermarkStrategy; + if (context.isStreaming()) { + long shutdownAfterIdleSourcesMs = + context + .getPipelineOptions() + .as(FlinkPipelineOptions.class) + .getShutdownSourcesAfterIdleMs(); + impulseSource = FlinkSource.unboundedImpulse(shutdownAfterIdleSourcesMs); + watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps(); + } else { + impulseSource = FlinkSource.boundedImpulse(); + watermarkStrategy = WatermarkStrategy.noWatermarks(); + } SingleOutputStreamOperator> source = context .getExecutionEnvironment() - .addSource(new ImpulseSourceFunction(shutdownAfterIdleSourcesMs), "Impulse") + .fromSource(impulseSource, watermarkStrategy, "Impulse") .returns(typeInfo); context.setOutputDataStream(context.getOutput(transform), source); @@ -330,7 +339,7 @@ private static class ReadSourceTranslator @Override void translateNode( PTransform> transform, FlinkStreamingTranslationContext context) { - if (context.getOutput(transform).isBounded().equals(PCollection.IsBounded.BOUNDED)) { + if (ReadTranslation.sourceIsBounded(context.getCurrentTransform()) == PCollection.IsBounded.BOUNDED) { boundedTranslator.translateNode(transform, context); } else { unboundedTranslator.translateNode(transform, context); @@ -361,24 +370,23 @@ public void translateNode( } String fullName = getCurrentTransformName(context); - UnboundedSource adaptedRawSource = new BoundedToUnboundedSourceAdapter<>(rawSource); + int parallelism = + context.getExecutionEnvironment().getMaxParallelism() > 0 + ? context.getExecutionEnvironment().getMaxParallelism() + : context.getExecutionEnvironment().getParallelism(); + + FlinkBoundedSource flinkBoundedSource = FlinkSource.bounded( + rawSource, + new SerializablePipelineOptions(context.getPipelineOptions()), + parallelism); + DataStream> source; try { - int parallelism = - context.getExecutionEnvironment().getMaxParallelism() > 0 - ? 
context.getExecutionEnvironment().getMaxParallelism() - : context.getExecutionEnvironment().getParallelism(); - UnboundedSourceWrapperNoValueWithRecordId sourceWrapper = - new UnboundedSourceWrapperNoValueWithRecordId<>( - new UnboundedSourceWrapper<>( - fullName, context.getPipelineOptions(), adaptedRawSource, parallelism)); source = context .getExecutionEnvironment() - .addSource(sourceWrapper) - .name(fullName) - .uid(fullName) - .returns(outputTypeInfo); + .fromSource(flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo) + .uid(fullName); } catch (Exception e) { throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e); } @@ -545,7 +553,9 @@ static void translateParDo( KeySelector, ?> keySelector = null; boolean stateful = false; DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) { + if (!signature.stateDeclarations().isEmpty() || + !signature.timerDeclarations().isEmpty() || + !signature.timerFamilyDeclarations().isEmpty()) { // Based on the fact that the signature is stateful, DoFnSignatures ensures // that it is also keyed keyCoder = ((KvCoder) input.getCoder()).getKeyCoder(); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java index cf235da1a025..4e01c0e1a1b8 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java @@ -51,6 +51,7 @@ class FlinkStreamingTranslationContext { private final StreamExecutionEnvironment env; private final PipelineOptions options; + private final boolean isStreaming; /** * Keeps a mapping between the output value of the PTransform and the Flink Operator that produced @@ -62,9 +63,13 @@ class FlinkStreamingTranslationContext { private AppliedPTransform currentTransform; - public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) { + public FlinkStreamingTranslationContext( + StreamExecutionEnvironment env, + PipelineOptions options, + boolean isStreaming) { this.env = checkNotNull(env); this.options = checkNotNull(options); + this.isStreaming = isStreaming; } public StreamExecutionEnvironment getExecutionEnvironment() { @@ -75,6 +80,10 @@ public PipelineOptions getPipelineOptions() { return options; } + public boolean isStreaming() { + return isStreaming; + } + @SuppressWarnings("unchecked") public DataStream getInputDataStream(PValue value) { return (DataStream) dataStreams.get(value); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java index 3766e1ede1aa..b5145b030e62 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java @@ -36,7 +36,7 @@ class FlinkTransformOverrides { static List getDefaultOverrides(FlinkPipelineOptions options) { ImmutableList.Builder builder = ImmutableList.builder(); - if (options.isStreaming()) { + if (options.isStreaming() || options.getUseDataStreamForBatch()) { builder .add( PTransformOverride.of( diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java new file mode 100644 index 000000000000..05b5ef41645c --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/VersionDependentFlinkPipelineOptions.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.beam.runners.flink; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + + +public interface VersionDependentFlinkPipelineOptions extends PipelineOptions { + + @Description("When set to true, the batch job execution will use DataStream API. " + + "Otherwise, the batch job execution will use the legacy DataSet API.") + @Default.Boolean(false) + Boolean getUseDataStreamForBatch(); + + void setUseDataStreamForBatch(Boolean useDataStreamForBatch); +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 13c4e4e0a99f..f1e334a5a99e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -110,10 +110,12 @@ import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -672,6 +674,7 @@ protected final void setBundleFinishedCallback(Runnable callback) { @Override public final void processElement(StreamRecord> streamRecord) { checkInvokeStartBundle(); + LOG.trace("Processing element {} in {}", streamRecord.getValue().getValue(), doFn.getClass()); long oldHold = keyCoder != null ? 
keyedStateInternals.minWatermarkHoldMs() : -1L;
     doFnRunner.processElement(streamRecord.getValue());
     checkInvokeFinishBundleByCount();
@@ -754,6 +757,7 @@ public final void processElement2(StreamRecord<RawUnionValue> streamRecord) throws Exception {
   @Override
   public final void processWatermark(Watermark mark) throws Exception {
+    LOG.trace("Processing watermark {} in {}", mark.getTimestamp(), doFn.getClass());
     processWatermark1(mark);
   }
 
@@ -1442,8 +1446,13 @@ private void populateOutputTimestampQueue(InternalTimerService<TimerData> timerService) {
     BiConsumerWithException<TimerData, Long, Exception> consumer =
         (timerData, stamp) -> keyedStateInternals.addWatermarkHoldUsage(timerData.getOutputTimestamp());
-    timerService.forEachEventTimeTimer(consumer);
-    timerService.forEachProcessingTimeTimer(consumer);
+    // Only InternalTimerServiceImpl supports iterating over registered timers. The batch
+    // execution time service fires all timers for the current key at once, so there are
+    // no pending timers to hold the watermark for.
+    if (timerService instanceof InternalTimerServiceImpl) {
+      timerService.forEachEventTimeTimer(consumer);
+      timerService.forEachProcessingTimeTimer(consumer);
+    }
   }
 
   private String constructTimerId(String timerFamilyId, String timerId) {
@@ -1494,6 +1503,7 @@ public void setTimer(TimerData timer) {
   }
 
   private void registerTimer(TimerData timer, String contextTimerId) throws Exception {
+    LOG.debug("Registering timer {}", timer);
     pendingTimersById.put(contextTimerId, timer);
     long time = timer.getTimestamp().getMillis();
     switch (timer.getDomain()) {
@@ -1604,7 +1614,29 @@ public Instant currentProcessingTime() {
   @Override
   public Instant currentInputWatermarkTime() {
-    return new Instant(getEffectiveInputWatermark());
+    if (timerService instanceof BatchExecutionInternalTimeService) {
+      // In batch mode, this method only returns either BoundedWindow.TIMESTAMP_MIN_VALUE
+      // or BoundedWindow.TIMESTAMP_MAX_VALUE.
+      //
+      // In batch execution mode, the currentInputWatermark variable is never updated
+      // until all the records have been processed. However, every time a record with a
+      // new key arrives, the Flink timer service watermark is set to
+      // MAX_WATERMARK (Long.MAX_VALUE) so that all the timers associated with the
+      // current key can fire. After that, the Flink timer service watermark is reset to
+      // Long.MIN_VALUE, so the next key starts from a fresh environment as if the records
+      // of the previous key never existed. The watermark is therefore always either
+      // Long.MIN_VALUE or Long.MAX_VALUE, so we can rely on the Flink watermark in batch mode.
+      //
+      // In Flink the watermark ranges over
+      // [Long.MIN_VALUE (-9223372036854775808), Long.MAX_VALUE (9223372036854775807)], while
+      // the Beam watermark range is [BoundedWindow.TIMESTAMP_MIN_VALUE (-9223372036854775),
+      // BoundedWindow.TIMESTAMP_MAX_VALUE (9223372036854775)]. To keep the timestamps visible
+      // to users within the Beam convention, we map the Flink watermark onto the Beam range.
+      return timerService.currentWatermark() == Long.MAX_VALUE
+          ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
+    } else {
+      return new Instant(getEffectiveInputWatermark());
+    }
   }
 
   @Override
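Since the batch-mode watermark contract above is subtle, here is a self-contained sketch of the two-point mapping the comment describes. `BatchWatermarkMapping` and `toBeamWatermark` are illustrative names invented for this example; only the two `BoundedWindow` constants are real Beam API:

```java
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

/** Illustrative only: the two-point watermark mapping used in batch execution mode. */
final class BatchWatermarkMapping {
  private BatchWatermarkMapping() {}

  /** Clamps a Flink batch-mode watermark onto Beam's bounded timestamp range. */
  static Instant toBeamWatermark(long flinkWatermark) {
    // Flink's batch time service only ever reports Long.MIN_VALUE (between keys) or
    // Long.MAX_VALUE (while firing all timers for the current key), so two return
    // values cover every case.
    return flinkWatermark == Long.MAX_VALUE
        ? BoundedWindow.TIMESTAMP_MAX_VALUE
        : BoundedWindow.TIMESTAMP_MIN_VALUE;
  }
}
```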
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
index c4d82cb5c8ad..6f2f473feddc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
@@ -51,4 +51,9 @@ public Iterable<TimerData> timersIterable() {
   public Iterable<WindowedValue<ElemT>> elementsIterable() {
     return Collections.singletonList(value);
   }
+
+  @Override
+  public String toString() {
+    return String.format("{%s, [%s]}", key, value);
+  }
 }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
index 7d527716108e..140f7262d39c 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
@@ -28,6 +28,8 @@
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
@@ -43,17 +45,36 @@
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.powermock.reflect.Whitebox;
 
 /** Tests for {@link FlinkExecutionEnvironments}.
*/ +@RunWith(Parameterized.class) public class FlinkExecutionEnvironmentsTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule public ExpectedException expectedException = ExpectedException.none(); + @Parameterized.Parameter + public boolean useDataStreamForBatch; + + @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}") + public static Collection useDataStreamForBatchJobValues() { + return Arrays.asList(new Object[][] { + {false}, {true} + }); + } + + private FlinkPipelineOptions getDefaultPipelineOptions() { + FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + options.setUseDataStreamForBatch(useDataStreamForBatch); + return options; + } + @Test public void shouldSetParallelismBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setParallelism(42); @@ -65,7 +86,7 @@ public void shouldSetParallelismBatch() { @Test public void shouldSetParallelismStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setParallelism(42); @@ -78,7 +99,7 @@ public void shouldSetParallelismStreaming() { @Test public void shouldSetMaxParallelismStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setMaxParallelism(42); @@ -93,7 +114,7 @@ public void shouldSetMaxParallelismStreaming() { public void shouldInferParallelismFromEnvironmentBatch() throws IOException { String flinkConfDir = extractFlinkConfig(); - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); @@ -109,7 +130,7 @@ public void shouldInferParallelismFromEnvironmentBatch() throws IOException { public void shouldInferParallelismFromEnvironmentStreaming() throws IOException { String confDir = extractFlinkConfig(); - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); @@ -123,7 +144,7 @@ public void shouldInferParallelismFromEnvironmentStreaming() throws IOException @Test public void shouldFallbackToDefaultParallelismBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); @@ -135,7 +156,7 @@ public void shouldFallbackToDefaultParallelismBatch() { @Test public void shouldFallbackToDefaultParallelismStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); @@ -148,7 +169,7 @@ public void shouldFallbackToDefaultParallelismStreaming() { @Test public void useDefaultParallelismFromContextBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); ExecutionEnvironment bev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options); @@ -160,7 +181,7 @@ public void 
useDefaultParallelismFromContextBatch() { @Test public void useDefaultParallelismFromContextStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); StreamExecutionEnvironment sev = @@ -173,7 +194,7 @@ public void useDefaultParallelismFromContextStreaming() { @Test public void shouldParsePortForRemoteEnvironmentBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host:1234"); @@ -185,7 +206,7 @@ public void shouldParsePortForRemoteEnvironmentBatch() { @Test public void shouldParsePortForRemoteEnvironmentStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host:1234"); @@ -198,7 +219,7 @@ public void shouldParsePortForRemoteEnvironmentStreaming() { @Test public void shouldAllowPortOmissionForRemoteEnvironmentBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host"); @@ -210,7 +231,7 @@ public void shouldAllowPortOmissionForRemoteEnvironmentBatch() { @Test public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host"); @@ -223,7 +244,7 @@ public void shouldAllowPortOmissionForRemoteEnvironmentStreaming() { @Test public void shouldTreatAutoAndEmptyHostTheSameBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); ExecutionEnvironment sev = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options); @@ -237,7 +258,7 @@ public void shouldTreatAutoAndEmptyHostTheSameBatch() { @Test public void shouldTreatAutoAndEmptyHostTheSameStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); StreamExecutionEnvironment sev = @@ -253,7 +274,7 @@ public void shouldTreatAutoAndEmptyHostTheSameStreaming() { @Test public void shouldDetectMalformedPortBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host:p0rt"); @@ -265,7 +286,7 @@ public void shouldDetectMalformedPortBatch() { @Test public void shouldDetectMalformedPortStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("host:p0rt"); @@ -277,7 +298,7 @@ public void shouldDetectMalformedPortStreaming() { @Test public void shouldSupportIPv4Batch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("192.168.1.1:1234"); @@ -291,7 +312,7 @@ public void shouldSupportIPv4Batch() { @Test public void 
shouldSupportIPv4Streaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("192.168.1.1:1234"); @@ -305,7 +326,7 @@ public void shouldSupportIPv4Streaming() { @Test public void shouldSupportIPv6Batch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234"); @@ -320,7 +341,7 @@ public void shouldSupportIPv6Batch() { @Test public void shouldSupportIPv6Streaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setFlinkMaster("[FE80:CD00:0000:0CDE:1257:0000:211E:729C]:1234"); @@ -336,7 +357,7 @@ public void shouldSupportIPv6Streaming() { @Test public void shouldRemoveHttpProtocolFromHostBatch() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); for (String flinkMaster : @@ -352,7 +373,7 @@ public void shouldRemoveHttpProtocolFromHostBatch() { @Test public void shouldRemoveHttpProtocolFromHostStreaming() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); for (String flinkMaster : @@ -376,7 +397,7 @@ private String extractFlinkConfig() throws IOException { @Test public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() { // Checkpointing disabled, shut down sources immediately - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); assertThat(options.getShutdownSourcesAfterIdleMs(), is(0L)); } @@ -384,7 +405,7 @@ public void shouldAutoSetIdleSourcesFlagWithoutCheckpointing() { @Test public void shouldAutoSetIdleSourcesFlagWithCheckpointing() { // Checkpointing is enabled, never shut down sources - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setCheckpointingInterval(1000L); FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); assertThat(options.getShutdownSourcesAfterIdleMs(), is(Long.MAX_VALUE)); @@ -393,7 +414,7 @@ public void shouldAutoSetIdleSourcesFlagWithCheckpointing() { @Test public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() { // Checkpointing disabled, accept flag - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setShutdownSourcesAfterIdleMs(42L); FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); assertThat(options.getShutdownSourcesAfterIdleMs(), is(42L)); @@ -402,7 +423,7 @@ public void shouldAcceptExplicitlySetIdleSourcesFlagWithoutCheckpointing() { @Test public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() { // Checkpointing enable, still accept flag - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setCheckpointingInterval(1000L); options.setShutdownSourcesAfterIdleMs(42L); 
FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); @@ -412,7 +433,7 @@ public void shouldAcceptExplicitlySetIdleSourcesFlagWithCheckpointing() { @Test public void shouldSetSavepointRestoreForRemoteStreaming() { String path = "fakePath"; - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("host:80"); options.setSavepointPath(path); @@ -426,7 +447,7 @@ public void shouldSetSavepointRestoreForRemoteStreaming() { @Test public void shouldFailOnUnknownStateBackend() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setStateBackend("unknown"); options.setStateBackendStoragePath("/path"); @@ -439,7 +460,7 @@ public void shouldFailOnUnknownStateBackend() { @Test public void shouldFailOnNoStoragePathProvided() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setStateBackend("unknown"); @@ -451,7 +472,7 @@ public void shouldFailOnNoStoragePathProvided() { @Test public void shouldCreateFileSystemStateBackend() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setStateBackend("fileSystem"); options.setStateBackendStoragePath(temporaryFolder.getRoot().toURI().toString()); @@ -464,7 +485,7 @@ public void shouldCreateFileSystemStateBackend() { @Test public void shouldCreateRocksDbStateBackend() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setStateBackend("rocksDB"); options.setStateBackendStoragePath(temporaryFolder.getRoot().toURI().toString()); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java index 8af13c4c7fe3..c312d3759270 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java @@ -28,6 +28,7 @@ import static org.hamcrest.core.Every.everyItem; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeFalse; import java.io.ByteArrayOutputStream; import java.io.File; @@ -38,6 +39,8 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import org.apache.beam.runners.core.construction.PTransformMatchers; @@ -68,13 +71,13 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.powermock.reflect.Whitebox; /** Tests for {@link FlinkPipelineExecutionEnvironment}. 
*/ -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) @SuppressWarnings({ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) }) @@ -82,9 +85,25 @@ public class FlinkPipelineExecutionEnvironmentTest implements Serializable { @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + @Parameterized.Parameter + public boolean useDataStreamForBatch; + + @Parameterized.Parameters(name = "UseDataStreamForBatch = {0}") + public static Collection useDataStreamForBatchJobValues() { + return Arrays.asList(new Object[][] { + {false}, {true} + }); + } + + private FlinkPipelineOptions getDefaultPipelineOptions() { + FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + options.setUseDataStreamForBatch(useDataStreamForBatch); + return options; + } + @Test public void shouldRecognizeAndTranslateStreamingPipeline() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("[auto]"); @@ -136,6 +155,8 @@ public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToAuto() throws IOEx @Test public void shouldNotPrepareFilesToStagewhenFlinkMasterIsSetToCollection() throws IOException { + // StreamingExecutionEnv does not support "collection" mode. + assumeFalse(useDataStreamForBatch); FlinkPipelineOptions options = testPreparingResourcesToStage("[collection]"); assertThat(options.getFilesToStage().size(), is(2)); @@ -152,7 +173,7 @@ public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToLocal() throws IOE @Test public void shouldUseDefaultTempLocationIfNoneSet() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("clusterAddress"); @@ -168,42 +189,31 @@ public void shouldUseDefaultTempLocationIfNoneSet() { @Test public void shouldUsePreparedFilesOnRemoteEnvironment() throws Exception { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); - options.setRunner(TestFlinkRunner.class); - options.setFlinkMaster("clusterAddress"); - - FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); - - Pipeline pipeline = Pipeline.create(options); - flinkEnv.translate(pipeline); - - ExecutionEnvironment executionEnvironment = flinkEnv.getBatchExecutionEnvironment(); - assertThat(executionEnvironment, instanceOf(RemoteEnvironment.class)); - - List jarFiles = getJars(executionEnvironment); - - List urlConvertedStagedFiles = convertFilesToURLs(options.getFilesToStage()); - - assertThat(jarFiles, is(urlConvertedStagedFiles)); + shouldUsePreparedFilesOnRemoteStreamEnvironment(true); + shouldUsePreparedFilesOnRemoteStreamEnvironment(false); } - @Test - public void shouldUsePreparedFilesOnRemoteStreamEnvironment() throws Exception { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + public void shouldUsePreparedFilesOnRemoteStreamEnvironment(boolean streamingMode) throws Exception { + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster("clusterAddress"); - options.setStreaming(true); + options.setStreaming(streamingMode); FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); Pipeline pipeline = Pipeline.create(options); flinkEnv.translate(pipeline); - StreamExecutionEnvironment streamExecutionEnvironment = - 
flinkEnv.getStreamExecutionEnvironment(); - assertThat(streamExecutionEnvironment, instanceOf(RemoteStreamEnvironment.class)); - - List jarFiles = getJars(streamExecutionEnvironment); + List jarFiles; + if (streamingMode || options.getUseDataStreamForBatch()) { + StreamExecutionEnvironment streamExecutionEnvironment = flinkEnv.getStreamExecutionEnvironment(); + assertThat(streamExecutionEnvironment, instanceOf(RemoteStreamEnvironment.class)); + jarFiles = getJars(streamExecutionEnvironment); + } else { + ExecutionEnvironment executionEnvironment = flinkEnv.getBatchExecutionEnvironment(); + assertThat(executionEnvironment, instanceOf(RemoteEnvironment.class)); + jarFiles = getJars(executionEnvironment); + } List urlConvertedStagedFiles = convertFilesToURLs(options.getFilesToStage()); @@ -214,7 +224,7 @@ public void shouldUsePreparedFilesOnRemoteStreamEnvironment() throws Exception { public void shouldUseTransformOverrides() { boolean[] testParameters = {true, false}; for (boolean streaming : testParameters) { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(streaming); options.setRunner(FlinkRunner.class); FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); @@ -234,7 +244,7 @@ public void shouldUseTransformOverrides() { @Test public void shouldProvideParallelismToTransformOverrides() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setStreaming(true); options.setRunner(FlinkRunner.class); FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); @@ -278,7 +288,7 @@ public boolean matches(Object actual) { @Test public void shouldUseStreamingTransformOverridesWithUnboundedSources() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); // no explicit streaming mode set options.setRunner(FlinkRunner.class); FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options); @@ -303,7 +313,7 @@ public void shouldUseStreamingTransformOverridesWithUnboundedSources() { @Test public void testTranslationModeOverrideWithUnboundedSources() { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setStreaming(false); @@ -319,7 +329,7 @@ public void testTranslationModeOverrideWithUnboundedSources() { public void testTranslationModeNoOverrideWithoutUnboundedSources() { boolean[] testArgs = new boolean[] {true, false}; for (boolean streaming : testArgs) { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(FlinkRunner.class); options.setStreaming(streaming); @@ -408,7 +418,7 @@ private FlinkPipelineOptions testPreparingResourcesToStage( private FlinkPipelineOptions setPipelineOptions( String flinkMaster, String tempLocation, List filesToStage) { - FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + FlinkPipelineOptions options = getDefaultPipelineOptions(); options.setRunner(TestFlinkRunner.class); options.setFlinkMaster(flinkMaster); options.setTempLocation(tempLocation); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java index c2d9163aacc9..da8c560690a6 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java @@ -94,6 +94,7 @@ public void testDefaults() { assertThat(options.getMaxBundleSize(), is(1000L)); assertThat(options.getMaxBundleTimeMills(), is(1000L)); assertThat(options.getExecutionModeForBatch(), is(ExecutionMode.PIPELINED.name())); + assertThat(options.getUseDataStreamForBatch(), is(false)); assertThat(options.getSavepointPath(), is(nullValue())); assertThat(options.getAllowNonRestoredState(), is(false)); assertThat(options.getDisableMetrics(), is(false)); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java index 849f8be952cb..f37773db82f5 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslatorTest.java @@ -156,7 +156,7 @@ public void testStatefulParDoAfterCombineChaining() { private JobGraph getStatefulParDoAfterCombineChainingJobGraph(boolean stablePartitioning) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final FlinkStreamingPipelineTranslator translator = - new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create()); + new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create(), true); final PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); pipelineOptions.setRunner(FlinkRunner.class); final Pipeline pipeline = Pipeline.create(pipelineOptions); @@ -188,7 +188,7 @@ public void testStatefulParDoAfterGroupByKeyChaining() { private JobGraph getStatefulParDoAfterGroupByKeyChainingJobGraph(boolean stablePartitioning) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final FlinkStreamingPipelineTranslator translator = - new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create()); + new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create(), true); final PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); pipelineOptions.setRunner(FlinkRunner.class); final Pipeline pipeline = Pipeline.create(pipelineOptions); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java index 45cfb30f9118..7fe2c954146c 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java @@ -29,8 +29,8 @@ import java.util.Map; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.SplittableParDo; -import org.apache.beam.runners.flink.FlinkStreamingTransformTranslators.UnboundedSourceWrapperNoValueWithRecordId; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.FlinkSource; +import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.source.bounded.FlinkBoundedSource; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -49,8 +49,8 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Test; @@ -76,11 +76,9 @@ public void readSourceTranslatorBoundedWithMaxParallelism() { Object sourceTransform = applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env); - UnboundedSourceWrapperNoValueWithRecordId source = - (UnboundedSourceWrapperNoValueWithRecordId) - ((LegacySourceTransformation) sourceTransform).getOperator().getUserFunction(); + FlinkBoundedSource source = (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource(); - assertEquals(maxParallelism, source.getUnderlyingSource().getSplitSources().size()); + assertEquals(maxParallelism, source.getNumSplits()); } @Test @@ -96,11 +94,9 @@ public void readSourceTranslatorBoundedWithoutMaxParallelism() { Object sourceTransform = applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env); - UnboundedSourceWrapperNoValueWithRecordId source = - (UnboundedSourceWrapperNoValueWithRecordId) - ((LegacySourceTransformation) sourceTransform).getOperator().getUserFunction(); + FlinkBoundedSource source = (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource(); - assertEquals(parallelism, source.getUnderlyingSource().getSplitSources().size()); + assertEquals(parallelism, source.getNumSplits()); } @Test @@ -119,13 +115,11 @@ public void readSourceTranslatorUnboundedWithMaxParallelism() { (OneInputTransformation) applyReadSourceTransform(transform, PCollection.IsBounded.UNBOUNDED, env); - UnboundedSourceWrapper source = - (UnboundedSourceWrapper) - ((LegacySourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())) - .getOperator() - .getUserFunction(); + FlinkSource source = + (FlinkSource) + ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())).getSource(); - assertEquals(maxParallelism, source.getSplitSources().size()); + assertEquals(maxParallelism, source.getNumSplits()); } @Test @@ -142,13 +136,11 @@ public void readSourceTranslatorUnboundedWithoutMaxParallelism() { (OneInputTransformation) applyReadSourceTransform(transform, PCollection.IsBounded.UNBOUNDED, env); - UnboundedSourceWrapper source = - (UnboundedSourceWrapper) - ((LegacySourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())) - .getOperator() - .getUserFunction(); + FlinkSource source = + (FlinkSource) + ((SourceTransformation) Iterables.getOnlyElement(sourceTransform.getInputs())).getSource(); - assertEquals(parallelism, source.getSplitSources().size()); + assertEquals(parallelism, source.getNumSplits()); } private Object applyReadSourceTransform( @@ -157,7 +149,7 @@ private Object applyReadSourceTransform( FlinkStreamingPipelineTranslator.StreamTransformTranslator> translator = getReadSourceTranslator(); FlinkStreamingTranslationContext ctx = - new FlinkStreamingTranslationContext(env, 
PipelineOptionsFactory.create()); + new FlinkStreamingTranslationContext(env, PipelineOptionsFactory.create(), true); Pipeline pipeline = Pipeline.create(); PCollection pc = diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java index a2c04d88f713..7317a343c3ee 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSubmissionTest.java @@ -72,6 +72,8 @@ public class FlinkSubmissionTest { /** Counter which keeps track of the number of jobs submitted. */ private static int expectedNumberOfJobs; + public static boolean useDataStreamForBatch; + @BeforeClass public static void beforeClass() throws Exception { Configuration config = new Configuration(); @@ -104,6 +106,12 @@ public void testSubmissionBatch() throws Exception { runSubmission(false, false); } + @Test + public void testSubmissionBatchUseDataStream() throws Exception { + FlinkSubmissionTest.useDataStreamForBatch = true; + runSubmission(false, false); + } + @Test public void testSubmissionStreaming() throws Exception { runSubmission(false, true); @@ -114,6 +122,12 @@ public void testDetachedSubmissionBatch() throws Exception { runSubmission(true, false); } + @Test + public void testDetachedSubmissionBatchUseDataStream() throws Exception { + FlinkSubmissionTest.useDataStreamForBatch = true; + runSubmission(true, false); + } + @Test public void testDetachedSubmissionStreaming() throws Exception { runSubmission(true, true); @@ -164,6 +178,7 @@ private void waitUntilJobIsCompleted() throws Exception { /** The Flink program which is executed by the CliFrontend. */ public static void main(String[] args) { FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + options.setUseDataStreamForBatch(useDataStreamForBatch); options.setRunner(FlinkRunner.class); options.setStreaming(streaming); options.setParallelism(1); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java index 2921065c1547..4db1dbaa32de 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java @@ -56,13 +56,18 @@ public void postSubmit() throws Exception { } @Test - public void testProgram() throws Exception { - runProgram(resultPath); + public void testStreaming() { + runProgram(resultPath, true); } - private static void runProgram(String resultPath) { + @Test + public void testBatch() { + runProgram(resultPath, false); + } + + private static void runProgram(String resultPath, boolean streaming) { - Pipeline p = FlinkTestPipeline.createForStreaming(); + Pipeline p = streaming ? FlinkTestPipeline.createForStreaming() : FlinkTestPipeline.createForBatch(); p.apply(GenerateSequence.from(0).to(10)) .apply(