From 06080e34d410e31dbee26be3d5a8ec6882d27266 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 22 Feb 2019 16:16:16 +0100 Subject: [PATCH] [BEAM-6733] Use Flink 1.6/1.7 prepareSnapshotPreBarrier to replace BufferedOutputManager For Flink version <= 1.5 the Flink Runner has to buffer any elements which are emitted during a snapshot because the barrier has already been emitted. This leads to increased code complexity. Flink version >= 1.6 provides a hook to execute an action before the snapshot barrier is emitted by the operator. We can remove the buffering in favor of finishing the current bundle in DoFnOperator's prepareSnapshotPreBarrier. The 1.5/1.6/1.7 build setup allows us to make this change with as little code duplication as possible. --- runners/flink/1.6/build.gradle | 4 +- .../wrappers/streaming/DoFnOperator.java | 956 ++++++++++++++++++ runners/flink/1.7/build.gradle | 4 +- runners/flink/build.gradle | 4 +- .../wrappers/streaming/DoFnOperator.java | 0 .../state/FlinkSplitStateInternals.java | 0 .../FlinkSplitStateInternalsTest.java | 2 +- .../ExecutableStageDoFnOperator.java | 7 +- .../wrappers/streaming/DoFnOperatorTest.java | 164 ++- .../src/main/resources/beam/suppressions.xml | 7 + 10 files changed, 1095 insertions(+), 53 deletions(-) create mode 100644 runners/flink/1.6/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java rename runners/flink/src/{ => 1.5}/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java (100%) rename runners/flink/src/{ => 1.5}/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java (100%) rename runners/flink/src/{test/java/org/apache/beam/runners/flink => 1.5/test/java/org/apache/beam/runners/flink/translation/wrapper}/streaming/FlinkSplitStateInternalsTest.java (97%) diff --git a/runners/flink/1.6/build.gradle b/runners/flink/1.6/build.gradle index 1792389e7dbf..4a0fbc707b44 100644 --- 
a/runners/flink/1.6/build.gradle +++ b/runners/flink/1.6/build.gradle @@ -21,9 +21,9 @@ def basePath = '..' /* All properties required for loading the Flink build script */ project.ext { // Set the version of all Flink-related dependencies here. - flink_version = '1.6.2' + flink_version = '1.6.3' // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] + main_source_dirs = ["$basePath/src/main/java", './src/main/java'] test_source_dirs = ["$basePath/src/test/java"] main_resources_dirs = ["$basePath/src/main/resources"] test_resources_dirs = ["$basePath/src/test/resources"] diff --git a/runners/flink/1.6/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/1.6/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java new file mode 100644 index 000000000000..0c6400539592 --- /dev/null +++ b/runners/flink/1.6/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -0,0 +1,956 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import static org.apache.flink.util.Preconditions.checkArgument; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.core.ProcessFnRunner; +import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; +import org.apache.beam.runners.core.StatefulDoFnRunner; +import org.apache.beam.runners.core.StepContext; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; +import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; +import org.apache.beam.runners.flink.translation.utils.FlinkClassloading; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals; +import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnSchemaInformation; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import 
org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.OutputTag; +import org.joda.time.Instant; + +/** + * Flink operator for executing {@link DoFn DoFns}. + * + * @param the input type of the {@link DoFn} + * @param the output type of the {@link DoFn} + */ +public class DoFnOperator extends AbstractStreamOperator> + implements OneInputStreamOperator, WindowedValue>, + TwoInputStreamOperator, RawUnionValue, WindowedValue>, + Triggerable { + + protected DoFn doFn; + + protected final SerializablePipelineOptions serializedOptions; + + protected final TupleTag mainOutputTag; + protected final List> additionalOutputTags; + + protected final Collection> sideInputs; + protected final Map> sideInputTagMapping; + + protected final WindowingStrategy windowingStrategy; + + protected final OutputManagerFactory outputManagerFactory; + + protected transient DoFnRunner doFnRunner; + protected transient PushbackSideInputDoFnRunner pushbackDoFnRunner; + + protected transient SideInputHandler sideInputHandler; + + protected transient SideInputReader sideInputReader; + + protected transient FlinkOutputManager outputManager; + + private transient DoFnInvoker doFnInvoker; + + protected transient long currentInputWatermark; + + protected transient long currentSideInputWatermark; + + protected transient long currentOutputWatermark; + + protected transient FlinkStateInternals keyedStateInternals; + + protected final String stepName; + + private final Coder> windowedInputCoder; + + private final Coder inputCoder; + + private final Map, Coder> outputCoders; + + protected final Coder keyCoder; + + final KeySelector, ?> keySelector; + + private 
final TimerInternals.TimerDataCoder timerCoder; + + /** Max number of elements to include in a bundle. */ + private final long maxBundleSize; + /** Max duration of a bundle. */ + private final long maxBundleTimeMills; + + private final DoFnSchemaInformation doFnSchemaInformation; + + protected transient InternalTimerService timerService; + + protected transient FlinkTimerInternals timerInternals; + + private transient long pushedBackWatermark; + + private transient PushedBackElementsHandler> pushedBackElementsHandler; + + /** Use an AtomicBoolean because we start/stop bundles by a timer thread (see below). */ + private transient AtomicBoolean bundleStarted; + /** Number of processed elements in the current bundle. */ + private transient long elementCount; + /** A timer that finishes the current bundle after a fixed amount of time. */ + private transient ScheduledFuture checkFinishBundleTimer; + /** Time that the last bundle was finished (to set the timer). */ + private transient long lastFinishBundleTime; + + public DoFnOperator( + DoFn doFn, + String stepName, + Coder> inputWindowedCoder, + Coder inputCoder, + Map, Coder> outputCoders, + TupleTag mainOutputTag, + List> additionalOutputTags, + OutputManagerFactory outputManagerFactory, + WindowingStrategy windowingStrategy, + Map> sideInputTagMapping, + Collection> sideInputs, + PipelineOptions options, + Coder keyCoder, + KeySelector, ?> keySelector, + DoFnSchemaInformation doFnSchemaInformation) { + this.doFn = doFn; + this.stepName = stepName; + this.windowedInputCoder = inputWindowedCoder; + this.inputCoder = inputCoder; + this.outputCoders = outputCoders; + this.mainOutputTag = mainOutputTag; + this.additionalOutputTags = additionalOutputTags; + this.sideInputTagMapping = sideInputTagMapping; + this.sideInputs = sideInputs; + this.serializedOptions = new SerializablePipelineOptions(options); + this.windowingStrategy = windowingStrategy; + this.outputManagerFactory = outputManagerFactory; + + 
setChainingStrategy(ChainingStrategy.ALWAYS); + + this.keyCoder = keyCoder; + this.keySelector = keySelector; + + this.timerCoder = + TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder()); + + FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class); + + this.maxBundleSize = flinkOptions.getMaxBundleSize(); + this.maxBundleTimeMills = flinkOptions.getMaxBundleTimeMills(); + this.doFnSchemaInformation = doFnSchemaInformation; + } + + // allow overriding this in WindowDoFnOperator because this one dynamically creates + // the DoFn + protected DoFn getDoFn() { + return doFn; + } + + // allow overriding this, for example SplittableDoFnOperator will not create a + // stateful DoFn runner because ProcessFn, which is used for executing a Splittable DoFn + // doesn't play by the normal DoFn rules and WindowDoFnOperator uses LateDataDroppingDoFnRunner + protected DoFnRunner createWrappingDoFnRunner( + DoFnRunner wrappedRunner) { + + if (keyCoder != null) { + StatefulDoFnRunner.CleanupTimer cleanupTimer = + new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, windowingStrategy); + + // we don't know the window type + @SuppressWarnings({"unchecked", "rawtypes"}) + Coder windowCoder = windowingStrategy.getWindowFn().windowCoder(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + StatefulDoFnRunner.StateCleaner stateCleaner = + new StatefulDoFnRunner.StateInternalsStateCleaner<>( + doFn, keyedStateInternals, windowCoder); + + return DoFnRunners.defaultStatefulDoFnRunner( + doFn, wrappedRunner, windowingStrategy, cleanupTimer, stateCleaner); + + } else { + return doFnRunner; + } + } + + @Override + public void setup( + StreamTask containingTask, + StreamConfig config, + Output>> output) { + + // make sure that FileSystems is initialized correctly + FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class); + FileSystems.setDefaultPipelineOptions(options); + + super.setup(containingTask, config, 
output); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + ListStateDescriptor> pushedBackStateDescriptor = + new ListStateDescriptor<>( + "pushed-back-elements", new CoderTypeSerializer<>(windowedInputCoder)); + + if (keySelector != null) { + pushedBackElementsHandler = + KeyedPushedBackElementsHandler.create( + keySelector, getKeyedStateBackend(), pushedBackStateDescriptor); + } else { + ListState> listState = + getOperatorStateBackend().getListState(pushedBackStateDescriptor); + pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState); + } + + setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); + setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); + setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); + + sideInputReader = NullSideInputReader.of(sideInputs); + + if (!sideInputs.isEmpty()) { + + FlinkBroadcastStateInternals sideInputStateInternals = + new FlinkBroadcastStateInternals<>( + getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend()); + + sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals); + sideInputReader = sideInputHandler; + + Stream> pushedBack = pushedBackElementsHandler.getElements(); + long min = + pushedBack.map(v -> v.getTimestamp().getMillis()).reduce(Long.MAX_VALUE, Math::min); + setPushedBackWatermark(min); + } else { + setPushedBackWatermark(Long.MAX_VALUE); + } + + outputManager = outputManagerFactory.create(output); + + // StatefulPardo or WindowDoFn + if (keyCoder != null) { + keyedStateInternals = + new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(), keyCoder); + + if (timerService == null) { + timerService = + getInternalTimerService("beam-timer", new CoderTypeSerializer<>(timerCoder), this); + } + + timerInternals = new FlinkTimerInternals(); + } + } + + @Override + public void open() throws 
Exception { + // WindowDoFnOperator need use state and timer to get DoFn. + // So must wait StateInternals and TimerInternals ready. + // This will be called after initializeState() + this.doFn = getDoFn(); + doFnInvoker = DoFnInvokers.invokerFor(doFn); + doFnInvoker.invokeSetup(); + + FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class); + doFnRunner = + DoFnRunners.simpleRunner( + options, + doFn, + sideInputReader, + outputManager, + mainOutputTag, + additionalOutputTags, + new FlinkStepContext(), + inputCoder, + outputCoders, + windowingStrategy, + doFnSchemaInformation); + + doFnRunner = createWrappingDoFnRunner(doFnRunner); + + if (options.getEnableMetrics()) { + doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext()); + } + + bundleStarted = new AtomicBoolean(false); + elementCount = 0L; + lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime(); + + // Schedule timer to check timeout of finish bundle. + long bundleCheckPeriod = (maxBundleTimeMills + 1) / 2; + checkFinishBundleTimer = + getProcessingTimeService() + .scheduleAtFixedRate( + timestamp -> checkInvokeFinishBundleByTime(), bundleCheckPeriod, bundleCheckPeriod); + + if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) { + pushbackDoFnRunner = + new ProcessFnRunner<>((DoFnRunner) doFnRunner, sideInputs, sideInputHandler); + } else { + pushbackDoFnRunner = + SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); + } + } + + @Override + public void dispose() throws Exception { + try { + checkFinishBundleTimer.cancel(true); + FlinkClassloading.deleteStaticCaches(); + doFnInvoker.invokeTeardown(); + } finally { + // This releases all task's resources. We need to call this last + // to ensure that state, timers, or output buffers can still be + // accessed during finishing the bundle. 
+ super.dispose(); + } + } + + @Override + public void close() throws Exception { + try { + // This is our last chance to block shutdown of this operator while + // there are still remaining processing-time timers. Flink will ignore pending + // processing-time timers when upstream operators have shut down and will also + // shut down this operator with pending processing-time timers. + while (this.numProcessingTimeTimers() > 0) { + getContainingTask().getCheckpointLock().wait(100); + } + if (this.numProcessingTimeTimers() > 0) { + throw new RuntimeException( + "There are still processing-time timers left, this indicates a bug"); + } + + // make sure we send a +Inf watermark downstream. It can happen that we receive +Inf + // in processWatermark*() but have holds, so we have to re-evaluate here. + processWatermark(new Watermark(Long.MAX_VALUE)); + invokeFinishBundle(); + if (currentOutputWatermark < Long.MAX_VALUE) { + if (keyedStateInternals == null) { + throw new RuntimeException("Current watermark is still " + currentOutputWatermark + "."); + + } else { + throw new RuntimeException( + "There are still watermark holds. Watermark held at " + + keyedStateInternals.watermarkHold().getMillis() + + "."); + } + } + } finally { + super.close(); + } + + // sanity check: these should have been flushed out by +Inf watermarks + if (!sideInputs.isEmpty()) { + + List> pushedBackElements = + pushedBackElementsHandler.getElements().collect(Collectors.toList()); + + if (pushedBackElements.size() > 0) { + String pushedBackString = Joiner.on(",").join(pushedBackElements); + throw new RuntimeException( + "Leftover pushed-back data: " + pushedBackString + ". 
This indicates a bug."); + } + } + } + + protected long getPushbackWatermarkHold() { + return pushedBackWatermark; + } + + protected void setPushedBackWatermark(long watermark) { + pushedBackWatermark = watermark; + } + + @Override + public final void processElement(StreamRecord> streamRecord) { + checkInvokeStartBundle(); + doFnRunner.processElement(streamRecord.getValue()); + checkInvokeFinishBundleByCount(); + } + + @Override + public final void processElement1(StreamRecord> streamRecord) + throws Exception { + checkInvokeStartBundle(); + Iterable> justPushedBack = + pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue()); + + long min = pushedBackWatermark; + for (WindowedValue pushedBackValue : justPushedBack) { + min = Math.min(min, pushedBackValue.getTimestamp().getMillis()); + pushedBackElementsHandler.pushBack(pushedBackValue); + } + setPushedBackWatermark(min); + + checkInvokeFinishBundleByCount(); + } + + /** + * Add the side input value. Here we are assuming that views have already been materialized and + * are sent over the wire as {@link Iterable}. Subclasses may elect to perform materialization in + * state and receive side input incrementally instead. + * + * @param streamRecord + */ + protected void addSideInputValue(StreamRecord streamRecord) { + @SuppressWarnings("unchecked") + WindowedValue> value = + (WindowedValue>) streamRecord.getValue().getValue(); + + PCollectionView sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag()); + sideInputHandler.addSideInputValue(sideInput, value); + } + + @Override + public final void processElement2(StreamRecord streamRecord) throws Exception { + // we finish the bundle because the newly arrived side-input might + // make a view available that was previously not ready. + // The PushbackSideInputRunner will only reset its cache of non-ready windows when + // finishing a bundle. 
+ invokeFinishBundle(); + checkInvokeStartBundle(); + + // add the side input, which may cause pushed back elements become eligible for processing + addSideInputValue(streamRecord); + + List> newPushedBack = new ArrayList<>(); + + Iterator> it = pushedBackElementsHandler.getElements().iterator(); + + while (it.hasNext()) { + WindowedValue element = it.next(); + // we need to set the correct key in case the operator is + // a (keyed) window operator + setKeyContextElement1(new StreamRecord<>(element)); + + Iterable> justPushedBack = + pushbackDoFnRunner.processElementInReadyWindows(element); + Iterables.addAll(newPushedBack, justPushedBack); + } + + pushedBackElementsHandler.clear(); + long min = Long.MAX_VALUE; + for (WindowedValue pushedBackValue : newPushedBack) { + min = Math.min(min, pushedBackValue.getTimestamp().getMillis()); + pushedBackElementsHandler.pushBack(pushedBackValue); + } + setPushedBackWatermark(min); + + checkInvokeFinishBundleByCount(); + + // maybe output a new watermark + processWatermark1(new Watermark(currentInputWatermark)); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + processWatermark1(mark); + } + + @Override + public void processWatermark1(Watermark mark) throws Exception { + + checkInvokeStartBundle(); + + // We do the check here because we are guaranteed to at least get the +Inf watermark on the + // main input when the job finishes. 
+ if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + // this means we will never see any more side input + // we also do the check here because we might have received the side-input MAX watermark + // before receiving any main-input data + emitAllPushedBackData(); + } + + if (keyCoder == null) { + setCurrentInputWatermark(mark.getTimestamp()); + long potentialOutputWatermark = Math.min(getPushbackWatermarkHold(), currentInputWatermark); + if (potentialOutputWatermark > currentOutputWatermark) { + setCurrentOutputWatermark(potentialOutputWatermark); + emitWatermark(currentOutputWatermark); + } + } else { + setCurrentInputWatermark(mark.getTimestamp()); + + // hold back by the pushed back values waiting for side inputs + long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); + + timeServiceManager.advanceWatermark( + new Watermark(toFlinkRuntimeWatermark(pushedBackInputWatermark))); + + Instant watermarkHold = keyedStateInternals.watermarkHold(); + + long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold()); + + long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold); + + if (potentialOutputWatermark > currentOutputWatermark) { + setCurrentOutputWatermark(potentialOutputWatermark); + emitWatermark(currentOutputWatermark); + } + } + } + + private void emitWatermark(long watermark) { + // Must invoke finishBatch before emit the +Inf watermark otherwise there are some late events. 
+ if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + invokeFinishBundle(); + } + output.emitWatermark(new Watermark(watermark)); + } + + @Override + public void processWatermark2(Watermark mark) throws Exception { + checkInvokeStartBundle(); + + setCurrentSideInputWatermark(mark.getTimestamp()); + if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + // this means we will never see any more side input + emitAllPushedBackData(); + + // maybe output a new watermark + processWatermark1(new Watermark(currentInputWatermark)); + } + } + + /** + * Converts a Beam watermark to a Flink watermark. This is only relevant when considering what + * event-time timers to fire: in Beam, a watermark {@code T} says there will not be any elements + * with a timestamp {@code < T} in the future. A Flink watermark {@code T} says there will not be + * any elements with a timestamp {@code <= T} in the future. We correct this by subtracting {@code + * 1} from a Beam watermark before passing to any relevant Flink runtime components. + */ + private static long toFlinkRuntimeWatermark(long beamWatermark) { + return beamWatermark - 1; + } + + /** + * Emits all pushed-back data. This should be used once we know that there will not be any future + * side input, i.e. that there is no point in waiting. + */ + private void emitAllPushedBackData() throws Exception { + + Iterator> it = pushedBackElementsHandler.getElements().iterator(); + + while (it.hasNext()) { + WindowedValue element = it.next(); + // we need to set the correct key in case the operator is + // a (keyed) window operator + setKeyContextElement1(new StreamRecord<>(element)); + + doFnRunner.processElement(element); + } + + pushedBackElementsHandler.clear(); + + setPushedBackWatermark(Long.MAX_VALUE); + } + + /** + * Check whether invoke startBundle, if it is, need to output elements that were buffered as part + * of finishing a bundle in snapshot() first. + * + *

In order to avoid having {@link DoFnRunner#processElement(WindowedValue)} or {@link + * DoFnRunner#onTimer(String, BoundedWindow, Instant, TimeDomain)} not between StartBundle and + * FinishBundle, this method needs to be called in each processElement and each processWatermark + * and onProcessingTime. Do not need to call in onEventTime, because it has been guaranteed in the + * processWatermark. + */ + private void checkInvokeStartBundle() { + if (bundleStarted.compareAndSet(false, true)) { + pushbackDoFnRunner.startBundle(); + } + } + + /** Check whether invoke finishBundle by elements count. Called in processElement. */ + private void checkInvokeFinishBundleByCount() { + elementCount++; + if (elementCount >= maxBundleSize) { + invokeFinishBundle(); + } + } + + /** Check whether invoke finishBundle by timeout. */ + private void checkInvokeFinishBundleByTime() { + long now = getProcessingTimeService().getCurrentProcessingTime(); + if (now - lastFinishBundleTime >= maxBundleTimeMills) { + invokeFinishBundle(); + } + } + + protected final void invokeFinishBundle() { + if (bundleStarted.compareAndSet(true, false)) { + pushbackDoFnRunner.finishBundle(); + elementCount = 0L; + lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime(); + } + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) { + // Finish the current bundle before the snapshot barrier is sent downstream + // This gives us a clean state before taking the actual snapshot + invokeFinishBundle(); + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception { + // We don't have to call checkInvokeStartBundle() because it's already called in + // processWatermark*(). 
+ fireTimer(timer); + } + + @Override + public void onProcessingTime(InternalTimer timer) throws Exception { + checkInvokeStartBundle(); + fireTimer(timer); + } + + // allow overriding this in WindowDoFnOperator + public void fireTimer(InternalTimer timer) { + TimerInternals.TimerData timerData = timer.getNamespace(); + StateNamespace namespace = timerData.getNamespace(); + // This is a user timer, so namespace must be WindowNamespace + checkArgument(namespace instanceof WindowNamespace); + BoundedWindow window = ((WindowNamespace) namespace).getWindow(); + timerInternals.cleanupPendingTimer(timer.getNamespace()); + pushbackDoFnRunner.onTimer( + timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain()); + } + + private void setCurrentInputWatermark(long currentInputWatermark) { + this.currentInputWatermark = currentInputWatermark; + } + + private void setCurrentSideInputWatermark(long currentInputWatermark) { + this.currentSideInputWatermark = currentInputWatermark; + } + + private void setCurrentOutputWatermark(long currentOutputWatermark) { + this.currentOutputWatermark = currentOutputWatermark; + } + + /** Factory for creating an {@link FlinkOutputManager} from a Flink {@link Output}. */ + interface OutputManagerFactory extends Serializable { + FlinkOutputManager create(Output>> output); + } + + /** A {@link DoFnRunners.OutputManager} that forwards data to the Flink runtime. 
*/ + public static class FlinkOutputManager implements DoFnRunners.OutputManager { + + private TupleTag mainTag; + private Map, OutputTag>> tagsToOutputTags; + private Map> idsToTags; + protected Output>> output; + + FlinkOutputManager( + Output>> output, + TupleTag mainTag, + Map, OutputTag>> tagsToOutputTags, + final Map, Coder>> tagsToCoders, + Map, Integer> tagsToIds) { + this.output = output; + this.mainTag = mainTag; + this.tagsToOutputTags = tagsToOutputTags; + this.idsToTags = new HashMap<>(); + for (Map.Entry, Integer> entry : tagsToIds.entrySet()) { + idsToTags.put(entry.getValue(), entry.getKey()); + } + + ImmutableMap.Builder>> idsToCodersBuilder = + ImmutableMap.builder(); + for (Map.Entry, Integer> entry : tagsToIds.entrySet()) { + idsToCodersBuilder.put(entry.getValue(), tagsToCoders.get(entry.getKey())); + } + } + + @Override + public void output(TupleTag tag, WindowedValue value) { + emit(tag, value); + } + + private void emit(TupleTag tag, WindowedValue value) { + if (tag.equals(mainTag)) { + // with tagged outputs we can't get around this because we don't + // know our own output type... + @SuppressWarnings("unchecked") + WindowedValue castValue = (WindowedValue) value; + output.collect(new StreamRecord<>(castValue)); + } else { + @SuppressWarnings("unchecked") + OutputTag> outputTag = (OutputTag) tagsToOutputTags.get(tag); + output.collect(outputTag, new StreamRecord<>(value)); + } + } + } + + /** + * Implementation of {@link OutputManagerFactory} that creates an {@link FlinkOutputManager} that + * can write to multiple logical outputs by Flink side output. + */ + public static class MultiOutputOutputManagerFactory + implements OutputManagerFactory { + + private TupleTag mainTag; + private Map, Integer> tagsToIds; + private Map, OutputTag>> tagsToOutputTags; + private Map, Coder>> tagsToCoders; + + // There is no side output. 
+ @SuppressWarnings("unchecked") + public MultiOutputOutputManagerFactory( + TupleTag mainTag, Coder> mainCoder) { + this( + mainTag, + new HashMap<>(), + ImmutableMap., Coder>>builder() + .put(mainTag, (Coder) mainCoder) + .build(), + ImmutableMap., Integer>builder().put(mainTag, 0).build()); + } + + public MultiOutputOutputManagerFactory( + TupleTag mainTag, + Map, OutputTag>> tagsToOutputTags, + Map, Coder>> tagsToCoders, + Map, Integer> tagsToIds) { + this.mainTag = mainTag; + this.tagsToOutputTags = tagsToOutputTags; + this.tagsToCoders = tagsToCoders; + this.tagsToIds = tagsToIds; + } + + @Override + public FlinkOutputManager create(Output>> output) { + return new FlinkOutputManager<>(output, mainTag, tagsToOutputTags, tagsToCoders, tagsToIds); + } + } + + /** + * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow accessing + * state or timer internals. + */ + protected class FlinkStepContext implements StepContext { + + @Override + public StateInternals stateInternals() { + return keyedStateInternals; + } + + @Override + public TimerInternals timerInternals() { + return timerInternals; + } + } + + class FlinkTimerInternals implements TimerInternals { + + /** + * Pending Timers (=not been fired yet) by context id. The id is generated from the state + * namespace of the timer and the timer's id. Necessary for supporting removal of existing + * timers. In Flink removal of timers can only be done by providing id and time of the timer. 
+ */ + final MapState pendingTimersById; + + private FlinkTimerInternals() { + MapStateDescriptor pendingTimersByIdStateDescriptor = + new MapStateDescriptor<>( + "pending-timers", new StringSerializer(), new CoderTypeSerializer<>(timerCoder)); + this.pendingTimersById = getKeyedStateStore().getMapState(pendingTimersByIdStateDescriptor); + } + + @Override + public void setTimer( + StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { + setTimer(TimerData.of(timerId, namespace, target, timeDomain)); + } + + /** @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}. */ + @Deprecated + @Override + public void setTimer(TimerData timer) { + try { + String contextTimerId = getContextTimerId(timer); + // Only one timer can exist at a time for a given timer id and context. + // If a timer gets set twice in the same context, the second must + // override the first. Thus, we must cancel any pending timers + // before we set the new one. + cancelPendingTimerById(contextTimerId); + registerTimer(timer, contextTimerId); + } catch (Exception e) { + throw new RuntimeException("Failed to set timer", e); + } + } + + private void registerTimer(TimerData timer, String contextTimerId) throws Exception { + long time = timer.getTimestamp().getMillis(); + switch (timer.getDomain()) { + case EVENT_TIME: + timerService.registerEventTimeTimer(timer, time); + break; + case PROCESSING_TIME: + case SYNCHRONIZED_PROCESSING_TIME: + timerService.registerProcessingTimeTimer(timer, time); + break; + default: + throw new UnsupportedOperationException("Unsupported time domain: " + timer.getDomain()); + } + pendingTimersById.put(contextTimerId, timer); + } + + private void cancelPendingTimerById(String contextTimerId) throws Exception { + TimerData oldTimer = pendingTimersById.get(contextTimerId); + if (oldTimer != null) { + deleteTimer(oldTimer); + } + } + + void cleanupPendingTimer(TimerData timer) { + try { + 
pendingTimersById.remove(getContextTimerId(timer)); + } catch (Exception e) { + throw new RuntimeException("Failed to cleanup state with pending timers", e); + } + } + + /** Unique contextual id of a timer. Used to look up any existing timers in a context. */ + private String getContextTimerId(TimerData timer) { + return timer.getTimerId() + timer.getNamespace().stringKey(); + } + + /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */ + @Deprecated + @Override + public void deleteTimer(StateNamespace namespace, String timerId) { + throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported."); + } + + @Override + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported."); + } + + /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */ + @Deprecated + @Override + public void deleteTimer(TimerData timerKey) { + cleanupPendingTimer(timerKey); + long time = timerKey.getTimestamp().getMillis(); + switch (timerKey.getDomain()) { + case EVENT_TIME: + timerService.deleteEventTimeTimer(timerKey, time); + break; + case PROCESSING_TIME: + case SYNCHRONIZED_PROCESSING_TIME: + timerService.deleteProcessingTimeTimer(timerKey, time); + break; + default: + throw new UnsupportedOperationException( + "Unsupported time domain: " + timerKey.getDomain()); + } + } + + @Override + public Instant currentProcessingTime() { + return new Instant(timerService.currentProcessingTime()); + } + + @Nullable + @Override + public Instant currentSynchronizedProcessingTime() { + return new Instant(timerService.currentProcessingTime()); + } + + @Override + public Instant currentInputWatermarkTime() { + return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold())); + } + + @Nullable + @Override + public Instant currentOutputWatermarkTime() { + return new 
Instant(currentOutputWatermark); + } + } +} diff --git a/runners/flink/1.7/build.gradle b/runners/flink/1.7/build.gradle index 5982c1b2f841..68f5d56506e1 100644 --- a/runners/flink/1.7/build.gradle +++ b/runners/flink/1.7/build.gradle @@ -21,9 +21,9 @@ def basePath = '..' /* All properties required for loading the Flink build script */ project.ext { // Set the version of all Flink-related dependencies here. - flink_version = '1.7.1' + flink_version = '1.7.2' // Look for the source code in the parent module - main_source_dirs = ["$basePath/src/main/java"] + main_source_dirs = ["$basePath/src/main/java", "$basePath/1.6/src/main/java"] test_source_dirs = ["$basePath/src/test/java"] main_resources_dirs = ["$basePath/src/main/resources"] test_resources_dirs = ["$basePath/src/test/resources"] diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle index 39addc398407..aad5e6ece362 100644 --- a/runners/flink/build.gradle +++ b/runners/flink/build.gradle @@ -21,8 +21,8 @@ project.ext { // Set the version of all Flink-related dependencies here. 
flink_version = '1.5.6' // Look for the source code in the current module - main_source_dirs = ['./src/main/java'] - test_source_dirs = ['./src/test/java'] + main_source_dirs = ['./src/main/java', './src/1.5/main/java'] + test_source_dirs = ['./src/test/java', './src/1.5/test/java'] main_resources_dirs = ['./src/main/resources'] test_resources_dirs = ['./src/test/resources'] } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/1.5/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java similarity index 100% rename from runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java rename to runners/flink/src/1.5/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/1.5/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java similarity index 100% rename from runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java rename to runners/flink/src/1.5/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java b/runners/flink/src/1.5/test/java/org/apache/beam/runners/flink/translation/wrapper/streaming/FlinkSplitStateInternalsTest.java similarity index 97% rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java rename to runners/flink/src/1.5/test/java/org/apache/beam/runners/flink/translation/wrapper/streaming/FlinkSplitStateInternalsTest.java index e1460956a708..7376db303b46 100644 --- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java +++ b/runners/flink/src/1.5/test/java/org/apache/beam/runners/flink/translation/wrapper/streaming/FlinkSplitStateInternalsTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.flink.streaming; +package org.apache.beam.runners.flink.translation.wrapper.streaming; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsTest; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index 011cfc494702..0a688c7d08de 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -40,6 +40,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; @@ -117,7 +118,7 @@ public class ExecutableStageDoFnOperator extends DoFnOperator> windowedInputCoder, @@ -505,7 +506,7 @@ private static class SdkHarnessDoFnRunner private final StageBundleFactory stageBundleFactory; private final StateRequestHandler stateRequestHandler; private final BundleProgressHandler progressHandler; - private final BufferedOutputManager outputManager; + private final DoFnRunners.OutputManager outputManager; private final Map> 
outputMap; /** Timer Output Pcollection id => TimerSpec. */ private final Map timerOutputIdToSpecMap; @@ -528,7 +529,7 @@ public SdkHarnessDoFnRunner( StageBundleFactory stageBundleFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler, - BufferedOutputManager outputManager, + DoFnRunners.OutputManager outputManager, Map> outputMap, Coder windowCoder, KeySelector, ?> keySelector, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index 615669354a56..89c8cbab656e 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory; import com.fasterxml.jackson.databind.util.LRUMap; +import java.lang.reflect.Method; import java.util.Collections; import java.util.HashMap; import java.util.Optional; @@ -1163,17 +1164,29 @@ public void finishBundle(FinishBundleContext context) { testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("c"))); // draw a snapshot + boolean bundleFinishedBeforeSnapshot = callPrepareSnapshotPreBarrier(doFnOperator); OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); // There is a finishBundle in snapshot() - // Elements will be buffered as part of finishing a bundle in snapshot() - assertThat( - stripStreamRecordFromWindowedValue(testHarness.getOutput()), - contains( - WindowedValue.valueInGlobalWindow("a"), - WindowedValue.valueInGlobalWindow("b"), - WindowedValue.valueInGlobalWindow("finishBundle"), - WindowedValue.valueInGlobalWindow("c"))); + if (bundleFinishedBeforeSnapshot) { + assertThat( + 
stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("a"), + WindowedValue.valueInGlobalWindow("b"), + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("c"), + WindowedValue.valueInGlobalWindow("finishBundle"))); + } else { + // Elements will be buffered as part of finishing a bundle in snapshot() + assertThat( + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("a"), + WindowedValue.valueInGlobalWindow("b"), + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("c"))); + } testHarness.close(); @@ -1209,38 +1222,65 @@ public void finishBundle(FinishBundleContext context) { // check finishBundle by timeout newHarness.setProcessingTime(10); - assertThat( - stripStreamRecordFromWindowedValue(newHarness.getOutput()), - contains( - WindowedValue.valueInGlobalWindow("finishBundle"), - WindowedValue.valueInGlobalWindow("d"), - WindowedValue.valueInGlobalWindow("finishBundle"))); + if (bundleFinishedBeforeSnapshot) { + assertThat( + stripStreamRecordFromWindowedValue(newHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("d"), + WindowedValue.valueInGlobalWindow("finishBundle"))); + } else { + assertThat( + stripStreamRecordFromWindowedValue(newHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("d"), + WindowedValue.valueInGlobalWindow("finishBundle"))); + } // A final bundle will be created when sending the MAX watermark newHarness.close(); - assertThat( - stripStreamRecordFromWindowedValue(newHarness.getOutput()), - contains( - WindowedValue.valueInGlobalWindow("finishBundle"), - WindowedValue.valueInGlobalWindow("d"), - WindowedValue.valueInGlobalWindow("finishBundle"), - WindowedValue.valueInGlobalWindow("finishBundle"))); + if (bundleFinishedBeforeSnapshot) { + assertThat( + 
stripStreamRecordFromWindowedValue(newHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("d"), + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("finishBundle"))); + } else { + assertThat( + stripStreamRecordFromWindowedValue(newHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("d"), + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("finishBundle"))); + } // close() will also call dispose(), but call again to verify no new bundle // is created afterwards newDoFnOperator.dispose(); - assertThat( - stripStreamRecordFromWindowedValue(newHarness.getOutput()), - contains( - WindowedValue.valueInGlobalWindow("finishBundle"), - WindowedValue.valueInGlobalWindow("d"), - WindowedValue.valueInGlobalWindow("finishBundle"), - WindowedValue.valueInGlobalWindow("finishBundle"))); + if (bundleFinishedBeforeSnapshot) { + assertThat( + stripStreamRecordFromWindowedValue(newHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("d"), + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("finishBundle"))); + } else { + assertThat( + stripStreamRecordFromWindowedValue(newHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("d"), + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("finishBundle"))); + } } @Test + @SuppressWarnings("unchecked") public void testBundleKeyed() throws Exception { StringUtf8Coder keyCoder = StringUtf8Coder.of(); @@ -1303,17 +1343,29 @@ public void finishBundle(FinishBundleContext context) { new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("key", "c")))); // Take a snapshot + boolean bundleFinishedBeforeSnapshot = callPrepareSnapshotPreBarrier(doFnOperator); OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); // There is a 
finishBundle in snapshot() - // Elements will be buffered as part of finishing a bundle in snapshot() - assertThat( - stripStreamRecordFromWindowedValue(testHarness.getOutput()), - contains( - WindowedValue.valueInGlobalWindow(KV.of("key", "a")), - WindowedValue.valueInGlobalWindow(KV.of("key", "b")), - WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), - WindowedValue.valueInGlobalWindow(KV.of("key", "c")))); + if (bundleFinishedBeforeSnapshot) { + assertThat( + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow(KV.of("key", "a")), + WindowedValue.valueInGlobalWindow(KV.of("key", "b")), + WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), + WindowedValue.valueInGlobalWindow(KV.of("key", "c")), + WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")))); + } else { + // Elements will be buffered as part of finishing a bundle in snapshot() + assertThat( + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow(KV.of("key", "a")), + WindowedValue.valueInGlobalWindow(KV.of("key", "b")), + WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), + WindowedValue.valueInGlobalWindow(KV.of("key", "c")))); + } testHarness.close(); @@ -1351,13 +1403,21 @@ public void finishBundle(FinishBundleContext context) { // check finishBundle by timeout testHarness.setProcessingTime(10); - assertThat( - stripStreamRecordFromWindowedValue(testHarness.getOutput()), - contains( - // The first finishBundle is restored from the checkpoint - WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), - WindowedValue.valueInGlobalWindow(KV.of("key", "d")), - WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")))); + if (bundleFinishedBeforeSnapshot) { + assertThat( + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow(KV.of("key", "d")), + 
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")))); + } else { + assertThat( + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + // The first finishBundle is restored from the checkpoint + WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), + WindowedValue.valueInGlobalWindow(KV.of("key", "d")), + WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")))); + } testHarness.close(); } @@ -1479,4 +1539,22 @@ private WindowedValue valueInWindow(T value, Instant timestamp, BoundedWi private interface TestHarnessFactory { T create() throws Exception; } + + /** + * Flink version >=1.6 provides a hook for performing an action before the snapshot barrier is + * emitted to downstream operators. This avoids buffering elements emitted during finalizing the + * bundle in the snapshot method. + */ + private static boolean callPrepareSnapshotPreBarrier(DoFnOperator doFnOperator) throws Exception { + Method prepareSnapshotPreBarrier; + try { + prepareSnapshotPreBarrier = + doFnOperator.getClass().getMethod("prepareSnapshotPreBarrier", long.class); + prepareSnapshotPreBarrier.invoke(doFnOperator, 0L); + return true; + } catch (NoSuchMethodException e) { + // that's ok. not supported in this Flink version. + return false; + } + } } diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml index 247a980242e2..008bfb5f208b 100644 --- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml @@ -102,4 +102,11 @@ + + + + + + +