diff --git a/runners/flink/1.6/build.gradle b/runners/flink/1.6/build.gradle
index 1792389e7dbf..4a0fbc707b44 100644
--- a/runners/flink/1.6/build.gradle
+++ b/runners/flink/1.6/build.gradle
@@ -21,9 +21,9 @@ def basePath = '..'
 /* All properties required for loading the Flink build script */
 project.ext {
   // Set the version of all Flink-related dependencies here.
-  flink_version = '1.6.2'
+  flink_version = '1.6.3'
   // Look for the source code in the parent module
-  main_source_dirs = ["$basePath/src/main/java"]
+  main_source_dirs = ["$basePath/src/main/java", './src/main/java']
   test_source_dirs = ["$basePath/src/test/java"]
   main_resources_dirs = ["$basePath/src/main/resources"]
   test_resources_dirs = ["$basePath/src/test/resources"]
diff --git a/runners/flink/1.6/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/1.6/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
new file mode 100644
index 000000000000..0c6400539592
--- /dev/null
+++ b/runners/flink/1.6/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -0,0 +1,956 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.ProcessFnRunner;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
+import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
+import org.apache.beam.runners.flink.translation.utils.FlinkClassloading;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.OutputTag;
+import org.joda.time.Instant;
+
+/**
+ * Flink operator for executing {@link DoFn DoFns}.
+ *
+ * @param <InputT> the input type of the {@link DoFn}
+ * @param <OutputT> the output type of the {@link DoFn}
+ */
+public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<WindowedValue<OutputT>>
+    implements OneInputStreamOperator<WindowedValue<InputT>, WindowedValue<OutputT>>,
+        TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, WindowedValue<OutputT>>,
+        Triggerable<Object, TimerData> {
+
+  protected DoFn<InputT, OutputT> doFn;
+
+  protected final SerializablePipelineOptions serializedOptions;
+
+  protected final TupleTag<OutputT> mainOutputTag;
+  protected final List<TupleTag<?>> additionalOutputTags;
+
+  protected final Collection<PCollectionView<?>> sideInputs;
+  protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
+
+  protected final WindowingStrategy<?, ?> windowingStrategy;
+
+  protected final OutputManagerFactory<OutputT> outputManagerFactory;
+
+  protected transient DoFnRunner<InputT, OutputT> doFnRunner;
+  protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
+
+  protected transient SideInputHandler sideInputHandler;
+
+  protected transient SideInputReader sideInputReader;
+
+  protected transient FlinkOutputManager<OutputT> outputManager;
+
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+
+  protected transient long currentInputWatermark;
+
+  protected transient long currentSideInputWatermark;
+
+  protected transient long currentOutputWatermark;
+
+  protected transient FlinkStateInternals<?> keyedStateInternals;
+
+  protected final String stepName;
+
+  private final Coder<WindowedValue<InputT>> windowedInputCoder;
+
+  private final Coder<InputT> inputCoder;
+
+  private final Map<TupleTag<?>, Coder<?>> outputCoders;
+
+  protected final Coder<?> keyCoder;
+
+  final KeySelector<WindowedValue<InputT>, ?> keySelector;
+
+  private final TimerInternals.TimerDataCoder timerCoder;
+
+  /** Max number of elements to include in a bundle. */
+  private final long maxBundleSize;
+  /** Max duration of a bundle. */
+  private final long maxBundleTimeMills;
+
+  private final DoFnSchemaInformation doFnSchemaInformation;
+
+  protected transient InternalTimerService<TimerData> timerService;
+
+  protected transient FlinkTimerInternals timerInternals;
+
+  private transient long pushedBackWatermark;
+
+  private transient PushedBackElementsHandler<WindowedValue<InputT>> pushedBackElementsHandler;
+
+  /** Use an AtomicBoolean because we start/stop bundles from a timer thread as well (see below). */
+  private transient AtomicBoolean bundleStarted;
+  /** Number of processed elements in the current bundle. */
+  private transient long elementCount;
+  /** A timer that finishes the current bundle after a fixed amount of time. */
+  private transient ScheduledFuture<?> checkFinishBundleTimer;
+  /** Time that the last bundle was finished (used to set the timer). */
+  private transient long lastFinishBundleTime;
+
+  public DoFnOperator(
+      DoFn<InputT, OutputT> doFn,
+      String stepName,
+      Coder<WindowedValue<InputT>> inputWindowedCoder,
+      Coder<InputT> inputCoder,
+      Map<TupleTag<?>, Coder<?>> outputCoders,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> additionalOutputTags,
+      OutputManagerFactory<OutputT> outputManagerFactory,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<Integer, PCollectionView<?>> sideInputTagMapping,
+      Collection<PCollectionView<?>> sideInputs,
+      PipelineOptions options,
+      Coder<?> keyCoder,
+      KeySelector<WindowedValue<InputT>, ?> keySelector,
+      DoFnSchemaInformation doFnSchemaInformation) {
+    this.doFn = doFn;
+    this.stepName = stepName;
+    this.windowedInputCoder = inputWindowedCoder;
+    this.inputCoder = inputCoder;
+    this.outputCoders = outputCoders;
+    this.mainOutputTag = mainOutputTag;
+    this.additionalOutputTags = additionalOutputTags;
+    this.sideInputTagMapping = sideInputTagMapping;
+    this.sideInputs = sideInputs;
+    this.serializedOptions = new SerializablePipelineOptions(options);
+    this.windowingStrategy = windowingStrategy;
+    this.outputManagerFactory = outputManagerFactory;
+
+    setChainingStrategy(ChainingStrategy.ALWAYS);
+
+    this.keyCoder = keyCoder;
+    this.keySelector = keySelector;
+
+    this.timerCoder =
+        TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+
+    FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
+
+    this.maxBundleSize = flinkOptions.getMaxBundleSize();
+    this.maxBundleTimeMills = flinkOptions.getMaxBundleTimeMills();
+    this.doFnSchemaInformation = doFnSchemaInformation;
+  }
+
+  // allow overriding this in WindowDoFnOperator because this one dynamically creates
+  // the DoFn
+  protected DoFn<InputT, OutputT> getDoFn() {
+    return doFn;
+  }
+
+  // Allow overriding this. For example, SplittableDoFnOperator will not create a
+  // stateful DoFn runner because ProcessFn, which is used for executing a splittable DoFn,
+  // doesn't play by the normal DoFn rules, and WindowDoFnOperator uses LateDataDroppingDoFnRunner.
+  protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(
+      DoFnRunner<InputT, OutputT> wrappedRunner) {
+
+    if (keyCoder != null) {
+      StatefulDoFnRunner.CleanupTimer cleanupTimer =
+          new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, windowingStrategy);
+
+      // we don't know the window type
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      StatefulDoFnRunner.StateCleaner stateCleaner =
+          new StatefulDoFnRunner.StateInternalsStateCleaner<>(
+              doFn, keyedStateInternals, windowCoder);
+
+      return DoFnRunners.defaultStatefulDoFnRunner(
+          doFn, wrappedRunner, windowingStrategy, cleanupTimer, stateCleaner);
+
+    } else {
+      return doFnRunner;
+    }
+  }
+
+  @Override
+  public void setup(
+      StreamTask<?, ?> containingTask,
+      StreamConfig config,
+      Output<StreamRecord<WindowedValue<OutputT>>> output) {
+
+    // make sure that FileSystems is initialized correctly
+    FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
+    FileSystems.setDefaultPipelineOptions(options);
+
+    super.setup(containingTask, config, output);
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    super.initializeState(context);
+
+    ListStateDescriptor<WindowedValue<InputT>> pushedBackStateDescriptor =
+        new ListStateDescriptor<>(
+            "pushed-back-elements", new CoderTypeSerializer<>(windowedInputCoder));
+
+    if (keySelector != null) {
+      pushedBackElementsHandler =
+          KeyedPushedBackElementsHandler.create(
+              keySelector, getKeyedStateBackend(), pushedBackStateDescriptor);
+    } else {
+      ListState<WindowedValue<InputT>> listState =
+          getOperatorStateBackend().getListState(pushedBackStateDescriptor);
+      pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState);
+    }
+
+    setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+    setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+    setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+
+    sideInputReader = NullSideInputReader.of(sideInputs);
+
+    if (!sideInputs.isEmpty()) {
+
+      FlinkBroadcastStateInternals sideInputStateInternals =
+          new FlinkBroadcastStateInternals<>(
+              getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend());
+
+      sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
+      sideInputReader = sideInputHandler;
+
+      Stream<WindowedValue<InputT>> pushedBack = pushedBackElementsHandler.getElements();
+      long min =
+          pushedBack.map(v -> v.getTimestamp().getMillis()).reduce(Long.MAX_VALUE, Math::min);
+      setPushedBackWatermark(min);
+    } else {
+      setPushedBackWatermark(Long.MAX_VALUE);
+    }
+
+    outputManager = outputManagerFactory.create(output);
+
+    // StatefulParDo or WindowDoFn
+    if (keyCoder != null) {
+      keyedStateInternals =
+          new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(), keyCoder);
+
+      if (timerService == null) {
+        timerService =
+            getInternalTimerService("beam-timer", new CoderTypeSerializer<>(timerCoder), this);
+      }
+
+      timerInternals = new FlinkTimerInternals();
+    }
+  }
+
+  @Override
+  public void open() throws Exception {
+    // WindowDoFnOperator needs state and timers to create its DoFn, so the
+    // StateInternals and TimerInternals must be ready first. That is guaranteed
+    // here because open() is called after initializeState().
+    this.doFn = getDoFn();
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+
+    FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
+    doFnRunner =
+        DoFnRunners.simpleRunner(
+            options,
+            doFn,
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            additionalOutputTags,
+            new FlinkStepContext(),
+            inputCoder,
+            outputCoders,
+            windowingStrategy,
+            doFnSchemaInformation);
+
+    doFnRunner = createWrappingDoFnRunner(doFnRunner);
+
+    if (options.getEnableMetrics()) {
+      doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
+    }
+
+    bundleStarted = new AtomicBoolean(false);
+    elementCount = 0L;
+    lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
+
+    // Schedule a timer that periodically checks whether the current bundle has timed out.
+    long bundleCheckPeriod = (maxBundleTimeMills + 1) / 2;
+    checkFinishBundleTimer =
+        getProcessingTimeService()
+            .scheduleAtFixedRate(
+                timestamp -> checkInvokeFinishBundleByTime(), bundleCheckPeriod, bundleCheckPeriod);
+
+    if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {
+      pushbackDoFnRunner =
+          new ProcessFnRunner<>((DoFnRunner) doFnRunner, sideInputs, sideInputHandler);
+    } else {
+      pushbackDoFnRunner =
+          SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+    }
+  }
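An editorial aside, not part of the patch: the bundle gate that open() wires up above can be illustrated in isolation. Both the main task thread (via processElement) and the processing-time timer scheduled above may race to open or close a bundle, which is why bundleStarted is an AtomicBoolean driven by compareAndSet. A minimal sketch with hypothetical names (BundleGate, beforeElement, maybeFinish):

    import java.util.concurrent.atomic.AtomicBoolean;

    // Minimal sketch of the bundle gate; only the winner of each
    // compareAndSet transition runs the corresponding callback.
    class BundleGate {
      private final AtomicBoolean bundleStarted = new AtomicBoolean(false);

      void beforeElement(Runnable startBundle) {
        // the first element after a finished bundle opens a new one
        if (bundleStarted.compareAndSet(false, true)) {
          startBundle.run();
        }
      }

      void maybeFinish(Runnable finishBundle) {
        // safe to call from both the task thread and the timer thread
        if (bundleStarted.compareAndSet(true, false)) {
          finishBundle.run();
        }
      }
    }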
+
+  @Override
+  public void dispose() throws Exception {
+    try {
+      checkFinishBundleTimer.cancel(true);
+      FlinkClassloading.deleteStaticCaches();
+      doFnInvoker.invokeTeardown();
+    } finally {
+      // This releases all of the task's resources. We need to call it last
+      // so that state, timers, and output buffers can still be accessed
+      // while finishing the bundle.
+      super.dispose();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      // This is our last chance to block shutdown of this operator while
+      // there are still remaining processing-time timers. Flink will ignore pending
+      // processing-time timers when upstream operators have shut down and will also
+      // shut down this operator with pending processing-time timers.
+      while (this.numProcessingTimeTimers() > 0) {
+        getContainingTask().getCheckpointLock().wait(100);
+      }
+      if (this.numProcessingTimeTimers() > 0) {
+        throw new RuntimeException(
+            "There are still processing-time timers left, this indicates a bug");
+      }
+
+      // Make sure we send a +Inf watermark downstream. It can happen that we receive +Inf
+      // in processWatermark*() but have holds, so we have to re-evaluate here.
+      processWatermark(new Watermark(Long.MAX_VALUE));
+      invokeFinishBundle();
+      if (currentOutputWatermark < Long.MAX_VALUE) {
+        if (keyedStateInternals == null) {
+          throw new RuntimeException("Current watermark is still " + currentOutputWatermark + ".");
+
+        } else {
+          throw new RuntimeException(
+              "There are still watermark holds. Watermark held at "
+                  + keyedStateInternals.watermarkHold().getMillis()
+                  + ".");
+        }
+      }
+    } finally {
+      super.close();
+    }
+
+    // sanity check: these should have been flushed out by the +Inf watermarks
+    if (!sideInputs.isEmpty()) {
+
+      List<WindowedValue<InputT>> pushedBackElements =
+          pushedBackElementsHandler.getElements().collect(Collectors.toList());
+
+      if (pushedBackElements.size() > 0) {
+        String pushedBackString = Joiner.on(",").join(pushedBackElements);
+        throw new RuntimeException(
+            "Leftover pushed-back data: " + pushedBackString + ". This indicates a bug.");
+      }
+    }
+  }
+
+  protected long getPushbackWatermarkHold() {
+    return pushedBackWatermark;
+  }
+
+  protected void setPushedBackWatermark(long watermark) {
+    pushedBackWatermark = watermark;
+  }
+
+  @Override
+  public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) {
+    checkInvokeStartBundle();
+    doFnRunner.processElement(streamRecord.getValue());
+    checkInvokeFinishBundleByCount();
+  }
+
+  @Override
+  public final void processElement1(StreamRecord<WindowedValue<InputT>> streamRecord)
+      throws Exception {
+    checkInvokeStartBundle();
+    Iterable<WindowedValue<InputT>> justPushedBack =
+        pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
+
+    long min = pushedBackWatermark;
+    for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+      min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
+      pushedBackElementsHandler.pushBack(pushedBackValue);
+    }
+    setPushedBackWatermark(min);
+
+    checkInvokeFinishBundleByCount();
+  }
+
+  /**
+   * Add the side input value. Here we are assuming that views have already been materialized and
+   * are sent over the wire as {@link Iterable Iterables}. Subclasses may elect to perform
+   * materialization in state and receive side input incrementally instead.
+   *
+   * @param streamRecord the raw stream record carrying the side-input value
+   */
+  protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
+    @SuppressWarnings("unchecked")
+    WindowedValue<Iterable<?>> value =
+        (WindowedValue<Iterable<?>>) streamRecord.getValue().getValue();
+
+    PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
+    sideInputHandler.addSideInputValue(sideInput, value);
+  }
+
+  @Override
+  public final void processElement2(StreamRecord<RawUnionValue> streamRecord) throws Exception {
+    // We finish the bundle because the newly arrived side input might
+    // make a view available that was previously not ready.
+    // The PushbackSideInputDoFnRunner will only reset its cache of non-ready windows when
+    // finishing a bundle.
+    invokeFinishBundle();
+    checkInvokeStartBundle();
+
+    // Add the side input, which may cause pushed-back elements to become eligible for processing.
+    addSideInputValue(streamRecord);
+
+    List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+
+    Iterator<WindowedValue<InputT>> it = pushedBackElementsHandler.getElements().iterator();
+
+    while (it.hasNext()) {
+      WindowedValue<InputT> element = it.next();
+      // we need to set the correct key in case the operator is
+      // a (keyed) window operator
+      setKeyContextElement1(new StreamRecord<>(element));
+
+      Iterable<WindowedValue<InputT>> justPushedBack =
+          pushbackDoFnRunner.processElementInReadyWindows(element);
+      Iterables.addAll(newPushedBack, justPushedBack);
+    }
+
+    pushedBackElementsHandler.clear();
+    long min = Long.MAX_VALUE;
+    for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+      min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
+      pushedBackElementsHandler.pushBack(pushedBackValue);
+    }
+    setPushedBackWatermark(min);
+
+    checkInvokeFinishBundleByCount();
+
+    // maybe output a new watermark
+    processWatermark1(new Watermark(currentInputWatermark));
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    processWatermark1(mark);
+  }
+
+  @Override
+  public void processWatermark1(Watermark mark) throws Exception {
+
+    checkInvokeStartBundle();
+
+    // We do the check here because we are guaranteed to at least get the +Inf watermark on the
+    // main input when the job finishes.
+    if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+      // This means we will never see any more side input. We also do the check here
+      // because we might have received the side-input MAX watermark before receiving
+      // any main-input data.
+      emitAllPushedBackData();
+    }
+
+    if (keyCoder == null) {
+      setCurrentInputWatermark(mark.getTimestamp());
+      long potentialOutputWatermark = Math.min(getPushbackWatermarkHold(), currentInputWatermark);
+      if (potentialOutputWatermark > currentOutputWatermark) {
+        setCurrentOutputWatermark(potentialOutputWatermark);
+        emitWatermark(currentOutputWatermark);
+      }
+    } else {
+      setCurrentInputWatermark(mark.getTimestamp());
+
+      // hold back by the pushed-back values waiting for side inputs
+      long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
+
+      timeServiceManager.advanceWatermark(
+          new Watermark(toFlinkRuntimeWatermark(pushedBackInputWatermark)));
+
+      Instant watermarkHold = keyedStateInternals.watermarkHold();
+
+      long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
+
+      long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold);
+
+      if (potentialOutputWatermark > currentOutputWatermark) {
+        setCurrentOutputWatermark(potentialOutputWatermark);
+        emitWatermark(currentOutputWatermark);
+      }
+    }
+  }
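Another aside: the pushback bookkeeping in processElement1/processElement2 boils down to keeping the minimum timestamp of all pushed-back elements as a watermark hold, with Long.MAX_VALUE meaning "no hold". A self-contained sketch (hypothetical PushbackHold class, not from the patch):

    import java.util.Arrays;
    import java.util.List;

    class PushbackHold {
      // Long.MAX_VALUE means "nothing pushed back, no hold".
      static long holdFor(List<Long> pushedBackTimestampsMillis) {
        long min = Long.MAX_VALUE;
        for (long ts : pushedBackTimestampsMillis) {
          min = Math.min(min, ts);
        }
        return min;
      }

      public static void main(String[] args) {
        // The output watermark may not pass the oldest waiting element (7).
        System.out.println(holdFor(Arrays.asList(42L, 7L)));
      }
    }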
+
+  private void emitWatermark(long watermark) {
+    // We must invoke finishBundle before emitting the +Inf watermark; otherwise
+    // the elements emitted while finishing the bundle would arrive as late events.
+    if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+      invokeFinishBundle();
+    }
+    output.emitWatermark(new Watermark(watermark));
+  }
+
+  @Override
+  public void processWatermark2(Watermark mark) throws Exception {
+    checkInvokeStartBundle();
+
+    setCurrentSideInputWatermark(mark.getTimestamp());
+    if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+      // this means we will never see any more side input
+      emitAllPushedBackData();
+
+      // maybe output a new watermark
+      processWatermark1(new Watermark(currentInputWatermark));
+    }
+  }
+
+  /**
+   * Converts a Beam watermark to a Flink watermark. This is only relevant when considering what
+   * event-time timers to fire: in Beam, a watermark {@code T} says there will not be any elements
+   * with a timestamp {@code < T} in the future. A Flink watermark {@code T} says there will not be
+   * any elements with a timestamp {@code <= T} in the future. We correct this by subtracting
+   * {@code 1} from a Beam watermark before passing it to any relevant Flink runtime components.
+   */
+  private static long toFlinkRuntimeWatermark(long beamWatermark) {
+    return beamWatermark - 1;
+  }
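The javadoc above is central to Beam-on-Flink event-time correctness, so a worked example may help (again an editorial sketch, not part of the patch): a Beam watermark of 1000 promises that no element with timestamp < 1000 will arrive, which is exactly what a Flink watermark of 999 (<= 999) expresses.

    class WatermarkConversion {
      static long toFlink(long beamWatermarkMillis) {
        return beamWatermarkMillis - 1;
      }

      public static void main(String[] args) {
        long beam = 1000L; // Beam: no future elements with timestamp < 1000
        long flink = toFlink(beam); // Flink: no future elements with timestamp <= 999
        System.out.println(flink); // prints 999; both promises are equivalent
      }
    }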
+
+  /**
+   * Emits all pushed-back data. This should be used once we know that there will not be any future
+   * side input, i.e. that there is no point in waiting.
+   */
+  private void emitAllPushedBackData() throws Exception {
+
+    Iterator<WindowedValue<InputT>> it = pushedBackElementsHandler.getElements().iterator();
+
+    while (it.hasNext()) {
+      WindowedValue<InputT> element = it.next();
+      // we need to set the correct key in case the operator is
+      // a (keyed) window operator
+      setKeyContextElement1(new StreamRecord<>(element));
+
+      doFnRunner.processElement(element);
+    }
+
+    pushedBackElementsHandler.clear();
+
+    setPushedBackWatermark(Long.MAX_VALUE);
+  }
+
+  /**
+   * Checks whether we need to invoke startBundle. If so, any elements that were buffered as part
+   * of finishing a bundle in snapshot() need to be output first.
+   *
+   * <p>In order to avoid having {@link DoFnRunner#processElement(WindowedValue)} or {@link
+   * DoFnRunner#onTimer(String, BoundedWindow, Instant, TimeDomain)} happen outside of a
+   * StartBundle/FinishBundle pair, this method needs to be called in each processElement, in each
+   * processWatermark, and in onProcessingTime. It does not need to be called in onEventTime,
+   * because that is already guaranteed by processWatermark.
+   */
+  private void checkInvokeStartBundle() {
+    if (bundleStarted.compareAndSet(false, true)) {
+      pushbackDoFnRunner.startBundle();
+    }
+  }
+
+  /** Checks whether to invoke finishBundle based on the element count. Called in processElement. */
+  private void checkInvokeFinishBundleByCount() {
+    elementCount++;
+    if (elementCount >= maxBundleSize) {
+      invokeFinishBundle();
+    }
+  }
+
+  /** Checks whether to invoke finishBundle based on a timeout. */
+  private void checkInvokeFinishBundleByTime() {
+    long now = getProcessingTimeService().getCurrentProcessingTime();
+    if (now - lastFinishBundleTime >= maxBundleTimeMills) {
+      invokeFinishBundle();
+    }
+  }
+
+  protected final void invokeFinishBundle() {
+    if (bundleStarted.compareAndSet(true, false)) {
+      pushbackDoFnRunner.finishBundle();
+      elementCount = 0L;
+      lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
+    }
+  }
+
+  @Override
+  public void prepareSnapshotPreBarrier(long checkpointId) {
+    // Finish the current bundle before the snapshot barrier is sent downstream.
+    // This gives us a clean state before taking the actual snapshot.
+    invokeFinishBundle();
+  }
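This prepareSnapshotPreBarrier override is the heart of the change: the hook (available since Flink 1.6) runs before the checkpoint barrier is forwarded, so the open bundle can simply be flushed and its output still precedes the barrier; the 1.5 code path instead has to buffer output produced while snapshotting and replay it after restore. A plain-Java ordering sketch (editorial, not Flink API):

    import java.util.ArrayList;
    import java.util.List;

    class CheckpointOrdering {
      public static void main(String[] args) {
        List<String> downstream = new ArrayList<>();
        downstream.add("a");
        downstream.add("b");
        // prepareSnapshotPreBarrier: flush the open bundle first...
        downstream.add("finishBundle");
        // ...then the barrier travels downstream and the snapshot is taken,
        // so nothing emitted by finishBundle needs to be stored in state.
        downstream.add("<checkpoint barrier>");
        System.out.println(downstream);
      }
    }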
+
+  @Override
+  public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
+    // We don't have to call checkInvokeStartBundle() because it's already called in
+    // processWatermark*().
+    fireTimer(timer);
+  }
+
+  @Override
+  public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
+    checkInvokeStartBundle();
+    fireTimer(timer);
+  }
+
+  // allow overriding this in WindowDoFnOperator
+  public void fireTimer(InternalTimer<?, TimerData> timer) {
+    TimerInternals.TimerData timerData = timer.getNamespace();
+    StateNamespace namespace = timerData.getNamespace();
+    // This is a user timer, so the namespace must be a WindowNamespace.
+    checkArgument(namespace instanceof WindowNamespace);
+    BoundedWindow window = ((WindowNamespace) namespace).getWindow();
+    timerInternals.cleanupPendingTimer(timer.getNamespace());
+    pushbackDoFnRunner.onTimer(
+        timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+  }
+
+  private void setCurrentInputWatermark(long currentInputWatermark) {
+    this.currentInputWatermark = currentInputWatermark;
+  }
+
+  private void setCurrentSideInputWatermark(long currentSideInputWatermark) {
+    this.currentSideInputWatermark = currentSideInputWatermark;
+  }
+
+  private void setCurrentOutputWatermark(long currentOutputWatermark) {
+    this.currentOutputWatermark = currentOutputWatermark;
+  }
+
+  /** Factory for creating a {@link FlinkOutputManager} from a Flink {@link Output}. */
+  interface OutputManagerFactory<OutputT> extends Serializable {
+    FlinkOutputManager<OutputT> create(Output<StreamRecord<WindowedValue<OutputT>>> output);
+  }
+
+  /** A {@link DoFnRunners.OutputManager} that forwards data to the Flink runtime. */
+  public static class FlinkOutputManager<OutputT> implements DoFnRunners.OutputManager {
+
+    private TupleTag<OutputT> mainTag;
+    private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
+    private Map<Integer, TupleTag<?>> idsToTags;
+    protected Output<StreamRecord<WindowedValue<OutputT>>> output;
+
+    FlinkOutputManager(
+        Output<StreamRecord<WindowedValue<OutputT>>> output,
+        TupleTag<OutputT> mainTag,
+        Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
+        final Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
+        Map<TupleTag<?>, Integer> tagsToIds) {
+      this.output = output;
+      this.mainTag = mainTag;
+      this.tagsToOutputTags = tagsToOutputTags;
+      this.idsToTags = new HashMap<>();
+      for (Map.Entry<TupleTag<?>, Integer> entry : tagsToIds.entrySet()) {
+        idsToTags.put(entry.getValue(), entry.getKey());
+      }
+
+      ImmutableMap.Builder<Integer, Coder<WindowedValue<?>>> idsToCodersBuilder =
+          ImmutableMap.builder();
+      for (Map.Entry<TupleTag<?>, Integer> entry : tagsToIds.entrySet()) {
+        idsToCodersBuilder.put(entry.getValue(), tagsToCoders.get(entry.getKey()));
+      }
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
+      emit(tag, value);
+    }
+
+    private <T> void emit(TupleTag<T> tag, WindowedValue<T> value) {
+      if (tag.equals(mainTag)) {
+        // With tagged outputs we can't get around this cast because we don't
+        // know our own output type...
+        @SuppressWarnings("unchecked")
+        WindowedValue<OutputT> castValue = (WindowedValue<OutputT>) value;
+        output.collect(new StreamRecord<>(castValue));
+      } else {
+        @SuppressWarnings("unchecked")
+        OutputTag<WindowedValue<T>> outputTag = (OutputTag) tagsToOutputTags.get(tag);
+        output.collect(outputTag, new StreamRecord<>(value));
+      }
+    }
+  }
+
+  /**
+   * Implementation of {@link OutputManagerFactory} that creates a {@link FlinkOutputManager} that
+   * can write to multiple logical outputs via Flink side outputs.
+   */
+  public static class MultiOutputOutputManagerFactory<OutputT>
+      implements OutputManagerFactory<OutputT> {
+
+    private TupleTag<OutputT> mainTag;
+    private Map<TupleTag<?>, Integer> tagsToIds;
+    private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
+    private Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders;
+
+    // Constructor for the case where there is no side output.
+    @SuppressWarnings("unchecked")
+    public MultiOutputOutputManagerFactory(
+        TupleTag<OutputT> mainTag, Coder<WindowedValue<OutputT>> mainCoder) {
+      this(
+          mainTag,
+          new HashMap<>(),
+          ImmutableMap.<TupleTag<?>, Coder<WindowedValue<?>>>builder()
+              .put(mainTag, (Coder) mainCoder)
+              .build(),
+          ImmutableMap.<TupleTag<?>, Integer>builder().put(mainTag, 0).build());
+    }
+
+    public MultiOutputOutputManagerFactory(
+        TupleTag<OutputT> mainTag,
+        Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
+        Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
+        Map<TupleTag<?>, Integer> tagsToIds) {
+      this.mainTag = mainTag;
+      this.tagsToOutputTags = tagsToOutputTags;
+      this.tagsToCoders = tagsToCoders;
+      this.tagsToIds = tagsToIds;
+    }
+
+    @Override
+    public FlinkOutputManager<OutputT> create(Output<StreamRecord<WindowedValue<OutputT>>> output) {
+      return new FlinkOutputManager<>(output, mainTag, tagsToOutputTags, tagsToCoders, tagsToIds);
+    }
+  }
+
+  /**
+   * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow accessing
+   * state or timer internals.
+   */
+  protected class FlinkStepContext implements StepContext {
+
+    @Override
+    public StateInternals stateInternals() {
+      return keyedStateInternals;
+    }
+
+    @Override
+    public TimerInternals timerInternals() {
+      return timerInternals;
+    }
+  }
+
+  class FlinkTimerInternals implements TimerInternals {
+
+    /**
+     * Pending timers (i.e. timers that have not fired yet), keyed by context id. The id is
+     * generated from the state namespace of the timer and the timer's own id. This is necessary
+     * for supporting removal of existing timers: in Flink, a timer can only be removed by
+     * providing its id and time.
+     */
+    final MapState<String, TimerData> pendingTimersById;
+
+    private FlinkTimerInternals() {
+      MapStateDescriptor<String, TimerData> pendingTimersByIdStateDescriptor =
+          new MapStateDescriptor<>(
+              "pending-timers", new StringSerializer(), new CoderTypeSerializer<>(timerCoder));
+      this.pendingTimersById = getKeyedStateStore().getMapState(pendingTimersByIdStateDescriptor);
+    }
+
+    @Override
+    public void setTimer(
+        StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
+      setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+    }
+
+    /** @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}. */
+    @Deprecated
+    @Override
+    public void setTimer(TimerData timer) {
+      try {
+        String contextTimerId = getContextTimerId(timer);
+        // Only one timer can exist at a time for a given timer id and context.
+        // If a timer gets set twice in the same context, the second must
+        // override the first. Thus, we must cancel any pending timers
+        // before we set the new one.
+        cancelPendingTimerById(contextTimerId);
+        registerTimer(timer, contextTimerId);
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to set timer", e);
+      }
+    }
+
+    private void registerTimer(TimerData timer, String contextTimerId) throws Exception {
+      long time = timer.getTimestamp().getMillis();
+      switch (timer.getDomain()) {
+        case EVENT_TIME:
+          timerService.registerEventTimeTimer(timer, time);
+          break;
+        case PROCESSING_TIME:
+        case SYNCHRONIZED_PROCESSING_TIME:
+          timerService.registerProcessingTimeTimer(timer, time);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unsupported time domain: " + timer.getDomain());
+      }
+      pendingTimersById.put(contextTimerId, timer);
+    }
+
+    private void cancelPendingTimerById(String contextTimerId) throws Exception {
+      TimerData oldTimer = pendingTimersById.get(contextTimerId);
+      if (oldTimer != null) {
+        deleteTimer(oldTimer);
+      }
+    }
+
+    void cleanupPendingTimer(TimerData timer) {
+      try {
+        pendingTimersById.remove(getContextTimerId(timer));
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to cleanup state with pending timers", e);
+      }
+    }
+
+    /** Unique contextual id of a timer. Used to look up any existing timers in a context. */
+    private String getContextTimerId(TimerData timer) {
+      return timer.getTimerId() + timer.getNamespace().stringKey();
+    }
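The cancel-then-register dance above exists because Flink identifies a timer by (namespace, time), so overwriting a timer by id requires remembering the previously registered time. A plain-Java sketch of the scheme (editorial; hypothetical names, with java.util.HashMap standing in for Flink's MapState):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    class TimerDedup {
      static final class Timer {
        final String id;
        final String namespaceKey;
        final long timestampMillis;

        Timer(String id, String namespaceKey, long timestampMillis) {
          this.id = id;
          this.namespaceKey = namespaceKey;
          this.timestampMillis = timestampMillis;
        }
      }

      private final Map<String, Timer> pendingById = new HashMap<>();

      void set(Timer timer, Consumer<Timer> deleteInFlink, Consumer<Timer> registerInFlink) {
        String contextId = timer.id + timer.namespaceKey; // same scheme as getContextTimerId
        Timer old = pendingById.get(contextId);
        if (old != null) {
          deleteInFlink.accept(old); // we still know its time, so deletion is possible
        }
        registerInFlink.accept(timer);
        pendingById.put(contextId, timer);
      }
    }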
+
+    /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
+    @Deprecated
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId) {
+      throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported.");
+    }
+
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+      throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported.");
+    }
+
+    /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
+    @Deprecated
+    @Override
+    public void deleteTimer(TimerData timerKey) {
+      cleanupPendingTimer(timerKey);
+      long time = timerKey.getTimestamp().getMillis();
+      switch (timerKey.getDomain()) {
+        case EVENT_TIME:
+          timerService.deleteEventTimeTimer(timerKey, time);
+          break;
+        case PROCESSING_TIME:
+        case SYNCHRONIZED_PROCESSING_TIME:
+          timerService.deleteProcessingTimeTimer(timerKey, time);
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported time domain: " + timerKey.getDomain());
+      }
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return new Instant(timerService.currentProcessingTime());
+    }
+
+    @Nullable
+    @Override
+    public Instant currentSynchronizedProcessingTime() {
+      return new Instant(timerService.currentProcessingTime());
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
+    }
+
+    @Nullable
+    @Override
+    public Instant currentOutputWatermarkTime() {
+      return new Instant(currentOutputWatermark);
+    }
+  }
+}
diff --git a/runners/flink/1.7/build.gradle b/runners/flink/1.7/build.gradle
index 5982c1b2f841..68f5d56506e1 100644
--- a/runners/flink/1.7/build.gradle
+++ b/runners/flink/1.7/build.gradle
@@ -21,9 +21,9 @@ def basePath = '..'
 /* All properties required for loading the Flink build script */
 project.ext {
   // Set the version of all Flink-related dependencies here.
-  flink_version = '1.7.1'
+  flink_version = '1.7.2'
   // Look for the source code in the parent module
-  main_source_dirs = ["$basePath/src/main/java"]
+  main_source_dirs = ["$basePath/src/main/java", "$basePath/1.6/src/main/java"]
   test_source_dirs = ["$basePath/src/test/java"]
   main_resources_dirs = ["$basePath/src/main/resources"]
   test_resources_dirs = ["$basePath/src/test/resources"]
diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index 39addc398407..aad5e6ece362 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -21,8 +21,8 @@
 project.ext {
   // Set the version of all Flink-related dependencies here.
   flink_version = '1.5.6'
   // Look for the source code in the current module
-  main_source_dirs = ['./src/main/java']
-  test_source_dirs = ['./src/test/java']
+  main_source_dirs = ['./src/main/java', './src/1.5/main/java']
+  test_source_dirs = ['./src/test/java', './src/1.5/test/java']
   main_resources_dirs = ['./src/main/resources']
   test_resources_dirs = ['./src/test/resources']
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/1.5/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
similarity index 100%
rename from runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
rename to runners/flink/src/1.5/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/1.5/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
similarity index 100%
rename from runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
rename to runners/flink/src/1.5/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java b/runners/flink/src/1.5/test/java/org/apache/beam/runners/flink/translation/wrapper/streaming/FlinkSplitStateInternalsTest.java
similarity index 97%
rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
rename to runners/flink/src/1.5/test/java/org/apache/beam/runners/flink/translation/wrapper/streaming/FlinkSplitStateInternalsTest.java
index e1460956a708..7376db303b46 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkSplitStateInternalsTest.java
+++ b/runners/flink/src/1.5/test/java/org/apache/beam/runners/flink/translation/wrapper/streaming/FlinkSplitStateInternalsTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrapper.streaming;
 
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsTest;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 011cfc494702..0a688c7d08de 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -40,6 +40,7 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
@@ -117,7 +118,7 @@ public class ExecutableStageDoFnOperator extends DoFnOperator
       Coder<WindowedValue<InputT>> windowedInputCoder,
@@ -505,7 +506,7 @@ private static class SdkHarnessDoFnRunner
     private final StageBundleFactory stageBundleFactory;
     private final StateRequestHandler stateRequestHandler;
     private final BundleProgressHandler progressHandler;
-    private final BufferedOutputManager<OutputT> outputManager;
+    private final DoFnRunners.OutputManager outputManager;
     private final Map<String, TupleTag<?>> outputMap;
     /** Timer Output Pcollection id => TimerSpec. */
     private final Map<String, TimerSpec> timerOutputIdToSpecMap;
@@ -528,7 +529,7 @@ public SdkHarnessDoFnRunner(
         StageBundleFactory stageBundleFactory,
         StateRequestHandler stateRequestHandler,
         BundleProgressHandler progressHandler,
-        BufferedOutputManager<OutputT> outputManager,
+        DoFnRunners.OutputManager outputManager,
         Map<String, TupleTag<?>> outputMap,
         Coder<BoundedWindow> windowCoder,
         KeySelector<WindowedValue<InputT>, ?> keySelector,
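The ExecutableStageDoFnOperator change above is a pure decoupling: SdkHarnessDoFnRunner only ever calls output(tag, value), so it can depend on the runners-core OutputManager contract instead of one concrete Flink implementation. For reference, the interface it now programs against has roughly this shape (an illustration; see org.apache.beam.runners.core.DoFnRunners for the real definition):

    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.beam.sdk.values.TupleTag;

    // Approximate shape of DoFnRunners.OutputManager: any sink that can route
    // a windowed value to a tagged output satisfies the dependency.
    interface OutputManagerShape {
      <T> void output(TupleTag<T> tag, WindowedValue<T> output);
    }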
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 615669354a56..89c8cbab656e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -27,6 +27,7 @@
 import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.fasterxml.jackson.databind.util.LRUMap;
+import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Optional;
@@ -1163,17 +1164,29 @@ public void finishBundle(FinishBundleContext context) {
     testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("c")));
 
     // draw a snapshot
+    boolean bundleFinishedBeforeSnapshot = callPrepareSnapshotPreBarrier(doFnOperator);
     OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
     // There is a finishBundle in snapshot()
-    // Elements will be buffered as part of finishing a bundle in snapshot()
-    assertThat(
-        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
-        contains(
-            WindowedValue.valueInGlobalWindow("a"),
-            WindowedValue.valueInGlobalWindow("b"),
-            WindowedValue.valueInGlobalWindow("finishBundle"),
-            WindowedValue.valueInGlobalWindow("c")));
+    if (bundleFinishedBeforeSnapshot) {
+      assertThat(
+          stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+          contains(
+              WindowedValue.valueInGlobalWindow("a"),
+              WindowedValue.valueInGlobalWindow("b"),
+              WindowedValue.valueInGlobalWindow("finishBundle"),
+              WindowedValue.valueInGlobalWindow("c"),
+              WindowedValue.valueInGlobalWindow("finishBundle")));
+    } else {
+      // Elements will be buffered as part of finishing a bundle in snapshot()
+      assertThat(
+          stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+          contains(
+              WindowedValue.valueInGlobalWindow("a"),
+              WindowedValue.valueInGlobalWindow("b"),
+              WindowedValue.valueInGlobalWindow("finishBundle"),
+              WindowedValue.valueInGlobalWindow("c")));
+    }
 
     testHarness.close();
 
@@ -1209,38 +1222,65 @@ public void finishBundle(FinishBundleContext context) {
 
     // check finishBundle by timeout
     newHarness.setProcessingTime(10);
-    assertThat(
-        stripStreamRecordFromWindowedValue(newHarness.getOutput()),
-        contains(
-            WindowedValue.valueInGlobalWindow("finishBundle"),
-            WindowedValue.valueInGlobalWindow("d"),
-            WindowedValue.valueInGlobalWindow("finishBundle")));
+    if (bundleFinishedBeforeSnapshot) {
+      assertThat(
+          stripStreamRecordFromWindowedValue(newHarness.getOutput()),
+          contains(
+              WindowedValue.valueInGlobalWindow("d"),
+              WindowedValue.valueInGlobalWindow("finishBundle")));
+    } else {
+      assertThat(
+          stripStreamRecordFromWindowedValue(newHarness.getOutput()),
+          contains(
+              WindowedValue.valueInGlobalWindow("finishBundle"),
+              WindowedValue.valueInGlobalWindow("d"),
+              WindowedValue.valueInGlobalWindow("finishBundle")));
+    }
 
     // A final bundle will be created when sending the MAX watermark
     newHarness.close();
-    assertThat(
-        stripStreamRecordFromWindowedValue(newHarness.getOutput()),
-        contains(
-            WindowedValue.valueInGlobalWindow("finishBundle"),
-            WindowedValue.valueInGlobalWindow("d"),
-            WindowedValue.valueInGlobalWindow("finishBundle"),
-            WindowedValue.valueInGlobalWindow("finishBundle")));
+    if (bundleFinishedBeforeSnapshot) {
+      assertThat(
+          stripStreamRecordFromWindowedValue(newHarness.getOutput()),
+          contains(
+              WindowedValue.valueInGlobalWindow("d"),
+              WindowedValue.valueInGlobalWindow("finishBundle"),
+              WindowedValue.valueInGlobalWindow("finishBundle")));
+    } else {
+      assertThat(
+          stripStreamRecordFromWindowedValue(newHarness.getOutput()),
+          contains(
+              WindowedValue.valueInGlobalWindow("finishBundle"),
+              WindowedValue.valueInGlobalWindow("d"),
+              WindowedValue.valueInGlobalWindow("finishBundle"),
+              WindowedValue.valueInGlobalWindow("finishBundle")));
+    }
 
     // close() will also call dispose(), but call again to verify no new bundle
     // is created afterwards
     newDoFnOperator.dispose();
-    assertThat(
-        stripStreamRecordFromWindowedValue(newHarness.getOutput()),
-        contains(
-            WindowedValue.valueInGlobalWindow("finishBundle"),
-            WindowedValue.valueInGlobalWindow("d"),
-            WindowedValue.valueInGlobalWindow("finishBundle"),
-            WindowedValue.valueInGlobalWindow("finishBundle")));
+    if (bundleFinishedBeforeSnapshot) {
+      assertThat(
+          stripStreamRecordFromWindowedValue(newHarness.getOutput()),
+          contains(
+              WindowedValue.valueInGlobalWindow("d"),
+              WindowedValue.valueInGlobalWindow("finishBundle"),
+              WindowedValue.valueInGlobalWindow("finishBundle")));
+    } else {
+      assertThat(
+          stripStreamRecordFromWindowedValue(newHarness.getOutput()),
+          contains(
+              WindowedValue.valueInGlobalWindow("finishBundle"),
+              WindowedValue.valueInGlobalWindow("d"),
+              WindowedValue.valueInGlobalWindow("finishBundle"),
+              WindowedValue.valueInGlobalWindow("finishBundle")));
+    }
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testBundleKeyed() throws Exception {
 
     StringUtf8Coder keyCoder = StringUtf8Coder.of();
@@ -1303,17 +1343,29 @@ public void finishBundle(FinishBundleContext context) {
         new StreamRecord(WindowedValue.valueInGlobalWindow(KV.of("key", "c"))));
 
     // Take a snapshot
+    boolean bundleFinishedBeforeSnapshot = callPrepareSnapshotPreBarrier(doFnOperator);
     OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
     // There is a finishBundle in snapshot()
-    // Elements will be buffered as part of finishing a bundle in snapshot()
-    assertThat(
-        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
-        contains(
-            WindowedValue.valueInGlobalWindow(KV.of("key", "a")),
-            WindowedValue.valueInGlobalWindow(KV.of("key", "b")),
-            WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
-            WindowedValue.valueInGlobalWindow(KV.of("key", "c"))));
+    if (bundleFinishedBeforeSnapshot) {
+      assertThat(
+          stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+          contains(
+              WindowedValue.valueInGlobalWindow(KV.of("key", "a")),
+              WindowedValue.valueInGlobalWindow(KV.of("key", "b")),
+              WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
+              WindowedValue.valueInGlobalWindow(KV.of("key", "c")),
+              WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))));
+    } else {
+      // Elements will be buffered as part of finishing a bundle in snapshot()
+      assertThat(
+          stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+          contains(
+              WindowedValue.valueInGlobalWindow(KV.of("key", "a")),
+              WindowedValue.valueInGlobalWindow(KV.of("key", "b")),
+              WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
+              WindowedValue.valueInGlobalWindow(KV.of("key", "c"))));
+    }
 
     testHarness.close();
 
@@ -1351,13 +1403,21 @@ public void finishBundle(FinishBundleContext context) {
 
     // check finishBundle by timeout
     testHarness.setProcessingTime(10);
-    assertThat(
-        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
-        contains(
-            // The first finishBundle is restored from the checkpoint
-            WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
-            WindowedValue.valueInGlobalWindow(KV.of("key", "d")),
-            WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))));
+    if (bundleFinishedBeforeSnapshot) {
+      assertThat(
+          stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+          contains(
+              WindowedValue.valueInGlobalWindow(KV.of("key", "d")),
+              WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))));
+    } else {
+      assertThat(
+          stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+          contains(
+              // The first finishBundle is restored from the checkpoint
+              WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
+              WindowedValue.valueInGlobalWindow(KV.of("key", "d")),
+              WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))));
+    }
 
     testHarness.close();
   }
@@ -1479,4 +1539,22 @@ private WindowedValue valueInWindow(T value, Instant timestamp, BoundedWi
   private interface TestHarnessFactory<T> {
     T create() throws Exception;
   }
+
+  /**
+   * Flink versions 1.6 and newer provide a hook for performing an action before the snapshot
+   * barrier is emitted to downstream operators. This avoids having to buffer elements that are
+   * emitted while finishing the bundle in the snapshot method.
+   */
+  private static boolean callPrepareSnapshotPreBarrier(DoFnOperator doFnOperator)
+      throws Exception {
+    Method prepareSnapshotPreBarrier;
+    try {
+      prepareSnapshotPreBarrier =
+          doFnOperator.getClass().getMethod("prepareSnapshotPreBarrier", long.class);
+      prepareSnapshotPreBarrier.invoke(doFnOperator, 0L);
+      return true;
+    } catch (NoSuchMethodException e) {
+      // That's ok; the hook is not supported in this Flink version.
+      return false;
+    }
+  }
 }
diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
index 247a980242e2..008bfb5f208b 100644
--- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
@@ -102,4 +102,11 @@
+
+
+
+
+
+
+
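A closing note on the test helper above: probing for a method by reflection keeps one test source tree compatible with several Flink versions. The pattern generalizes to any optional hook (editorial sketch, hypothetical hasMethod helper):

    // Returns true when `target` exposes the given public method, e.g.
    // hasMethod(op, "prepareSnapshotPreBarrier", long.class) on Flink >= 1.6.
    static boolean hasMethod(Object target, String name, Class<?>... parameterTypes) {
      try {
        target.getClass().getMethod(name, parameterTypes);
        return true;
      } catch (NoSuchMethodException e) {
        return false;
      }
    }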