diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 6fd5e85a4e12..baf17532c97b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -33,7 +33,6 @@ import java.util.Optional; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -58,7 +57,6 @@ import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; import org.apache.beam.runners.flink.translation.utils.FlinkClassloading; -import org.apache.beam.runners.flink.translation.utils.NoopLock; import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingDoFnRunner; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; @@ -146,7 +144,7 @@ public class DoFnOperator extends AbstractStreamOperator outputManager; + protected transient FlinkOutputManager outputManager; private transient DoFnInvoker doFnInvoker; @@ -385,19 +383,7 @@ public void initializeState(StateInitializationContext context) throws Exception outputManager = outputManagerFactory.create( - output, - getLockToAcquireForStateAccessDuringBundles(), - getOperatorStateBackend(), - getKeyedStateBackend(), - keySelector); - } - - /** - * Subclasses may provide a lock to ensure that the state backend is not accessed concurrently - * during bundle execution. 
- */ - protected Lock getLockToAcquireForStateAccessDuringBundles() { - return NoopLock.get(); + output, getOperatorStateBackend(), getKeyedStateBackend(), keySelector); } @Override @@ -719,7 +705,6 @@ private void emitAllPushedBackData() throws Exception { */ private void checkInvokeStartBundle() { if (bundleStarted.compareAndSet(false, true)) { - outputManager.flushBuffer(); pushbackDoFnRunner.startBundle(); } } @@ -754,6 +739,16 @@ protected final void invokeFinishBundle() { } } + @Override + public void prepareSnapshotPreBarrier(long checkpointId) { + // Finish the current bundle before the snapshot barrier is sent downstream. This gives us a + // clean state before taking the actual snapshot. + // Ensure that no new bundle gets started as part of finishing a bundle. + while (bundleStarted.get()) { + invokeFinishBundle(); + } + } + @Override public final void snapshotState(StateSnapshotContext context) throws Exception { if (requiresStableInput) { @@ -761,16 +756,6 @@ public final void snapshotState(StateSnapshotContext context) throws Exception { // snapshot id and start a new buffer for elements arriving after this snapshot. bufferingDoFnRunner.checkpoint(context.getCheckpointId()); } - - // We can't output here anymore because the checkpoint barrier has already been - // sent downstream. This is going to change with 1.6/1.7's prepareSnapshotBarrier. - outputManager.openBuffer(); - // Ensure that no new bundle gets started as part of finishing a bundle - while (bundleStarted.get()) { - invokeFinishBundle(); - } - outputManager.closeBuffer(); - super.snapshotState(context); } @@ -821,33 +806,22 @@ private void setCurrentOutputWatermark(long currentOutputWatermark) { this.currentOutputWatermark = currentOutputWatermark; } - /** Factory for creating an {@link BufferedOutputManager} from a Flink {@link Output}. */ + /** Factory for creating a {@link FlinkOutputManager} from a Flink {@link Output}.
*/ interface OutputManagerFactory extends Serializable { - BufferedOutputManager create( + FlinkOutputManager create( Output>> output, - Lock bufferLock, @Nullable OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend keyedStateBackend, @Nullable KeySelector keySelector) throws Exception; } - /** - * A {@link DoFnRunners.OutputManager} that can buffer its outputs. Uses {@link - * PushedBackElementsHandler} to buffer the data. Buffering data is necessary because no elements - * can be emitted during {@code snapshotState}. This can be removed once we upgrade Flink to >= - * 1.6 which allows us to finish the bundle before the checkpoint barriers have been emitted. - */ - public static class BufferedOutputManager implements DoFnRunners.OutputManager { + /** A {@link DoFnRunners.OutputManager} that forwards data to the Flink runtime. */ + public static class FlinkOutputManager implements DoFnRunners.OutputManager { private final TupleTag mainTag; private final Map, OutputTag>> tagsToOutputTags; private final Map, Integer> tagsToIds; - /** - * A lock to be acquired before writing to the buffer. This lock will only be acquired during - * buffering. It will not be acquired during flushing the buffer. - */ - private final Lock bufferLock; private Map> idsToTags; /** Elements buffered during a snapshot, by output id. 
*/ @@ -856,20 +830,16 @@ public static class BufferedOutputManager implements DoFnRunners.Output protected final Output>> output; - private boolean openBuffer = false; - - BufferedOutputManager( + FlinkOutputManager( Output>> output, TupleTag mainTag, Map, OutputTag>> tagsToOutputTags, Map, Integer> tagsToIds, - Lock bufferLock, PushedBackElementsHandler>> pushedBackElementsHandler) { this.output = output; this.mainTag = mainTag; this.tagsToOutputTags = tagsToOutputTags; this.tagsToIds = tagsToIds; - this.bufferLock = bufferLock; this.idsToTags = new HashMap<>(); for (Map.Entry, Integer> entry : tagsToIds.entrySet()) { idsToTags.put(entry.getValue(), entry.getKey()); @@ -877,54 +847,8 @@ public static class BufferedOutputManager implements DoFnRunners.Output this.pushedBackElementsHandler = pushedBackElementsHandler; } - void openBuffer() { - this.openBuffer = true; - } - - void closeBuffer() { - this.openBuffer = false; - } - @Override public void output(TupleTag tag, WindowedValue value) { - if (!openBuffer) { - emit(tag, value); - } else { - buffer(KV.of(tagsToIds.get(tag), value)); - } - } - - private void buffer(KV> taggedValue) { - try { - bufferLock.lock(); - pushedBackElementsHandler.pushBack(taggedValue); - } catch (Exception e) { - throw new RuntimeException("Couldn't pushback element.", e); - } finally { - bufferLock.unlock(); - } - } - - /** - * Flush elements of bufferState to Flink Output. This method can't be invoked in {@link - * #snapshotState(StateSnapshotContext)}. The buffer should be flushed before starting a new - * bundle when the buffer cannot be concurrently accessed and thus does not need to be guarded - * by a lock. 
- */ - void flushBuffer() { - try { - pushedBackElementsHandler - .getElements() - .forEach( - element -> - emit(idsToTags.get(element.getKey()), (WindowedValue) element.getValue())); - pushedBackElementsHandler.clear(); - } catch (Exception e) { - throw new RuntimeException("Couldn't flush pushed back elements.", e); - } - } - - private void emit(TupleTag tag, WindowedValue value) { if (tag.equals(mainTag)) { // with tagged outputs we can't get around this because we don't // know our own output type... @@ -977,8 +901,8 @@ public void verifyDeterministic() throws NonDeterministicException { } /** - * Implementation of {@link OutputManagerFactory} that creates an {@link BufferedOutputManager} - * that can write to multiple logical outputs by Flink side output. + * Implementation of {@link OutputManagerFactory} that creates a {@link FlinkOutputManager} that + * can write to multiple logical outputs by Flink side output. */ public static class MultiOutputOutputManagerFactory implements OutputManagerFactory { @@ -1013,15 +937,13 @@ public MultiOutputOutputManagerFactory( } @Override - public BufferedOutputManager create( + public FlinkOutputManager create( Output>> output, - Lock bufferLock, OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend keyedStateBackend, @Nullable KeySelector keySelector) throws Exception { Preconditions.checkNotNull(output); - Preconditions.checkNotNull(bufferLock); Preconditions.checkNotNull(operatorStateBackend); Preconditions.checkState( (keyedStateBackend == null) == (keySelector == null), @@ -1046,8 +968,8 @@ public BufferedOutputManager create( pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState); } - return new BufferedOutputManager<>( - output, mainTag, tagsToOutputTags, tagsToIds, bufferLock, pushedBackElementsHandler); + return new FlinkOutputManager<>( + output, mainTag, tagsToOutputTags, tagsToIds, pushedBackElementsHandler); } private TaggedKvCoder buildTaggedKvCoder() { diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index 0e8977b11caa..a23ac1bad7b2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -173,11 +173,6 @@ public ExecutableStageDoFnOperator( this.stateBackendLock = new ReentrantLock(); } - @Override - protected Lock getLockToAcquireForStateAccessDuringBundles() { - return stateBackendLock; - } - @Override public void open() throws Exception { executableStage = ExecutableStage.fromPayload(payload); @@ -526,7 +521,7 @@ private static class SdkHarnessDoFnRunner private final StageBundleFactory stageBundleFactory; private final StateRequestHandler stateRequestHandler; private final BundleProgressHandler progressHandler; - private final BufferedOutputManager outputManager; + private final FlinkOutputManager outputManager; private final Map> outputMap; /** Timer Output Pcollection id => TimerSpec. 
*/ private final Map timerOutputIdToSpecMap; @@ -543,7 +538,7 @@ public SdkHarnessDoFnRunner( StageBundleFactory stageBundleFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler, - BufferedOutputManager outputManager, + FlinkOutputManager outputManager, Map> outputMap, Coder windowCoder, BiConsumer, TimerInternals.TimerData> timerRegistration, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index 57f7694d95ec..5477ba2780df 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -21,7 +21,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertEquals; @@ -30,10 +29,8 @@ import com.fasterxml.jackson.databind.util.LRUMap; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Optional; import java.util.function.Supplier; -import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.flink.FlinkPipelineOptions; @@ -1220,17 +1217,30 @@ public void finishBundle(FinishBundleContext context) { WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("c"))); + doFnOperator.prepareSnapshotPreBarrier(0); + + // Bundle should have been finished via prepareSnapshotPreBarrier + assertThat( + 
stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("a"), + WindowedValue.valueInGlobalWindow("b"), + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("c"), + WindowedValue.valueInGlobalWindow("finishBundle"))); + // draw a snapshot OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); - // Finish bundle element will be buffered as part of finishing a bundle in snapshot() - PushedBackElementsHandler>> pushedBackElementsHandler = - doFnOperator.outputManager.pushedBackElementsHandler; - assertThat(pushedBackElementsHandler, instanceOf(NonKeyedPushedBackElementsHandler.class)); - List>> bufferedElements = - pushedBackElementsHandler.getElements().collect(Collectors.toList()); + // No further output should have been generated assertThat( - bufferedElements, contains(KV.of(0, WindowedValue.valueInGlobalWindow("finishBundle")))); + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("a"), + WindowedValue.valueInGlobalWindow("b"), + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("c"), + WindowedValue.valueInGlobalWindow("finishBundle"))); testHarness.close(); @@ -1270,7 +1280,6 @@ public void finishBundle(FinishBundleContext context) { assertThat( stripStreamRecordFromWindowedValue(newHarness.getOutput()), contains( - WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("d"), WindowedValue.valueInGlobalWindow("finishBundle"))); @@ -1280,7 +1289,6 @@ public void finishBundle(FinishBundleContext context) { assertThat( stripStreamRecordFromWindowedValue(newHarness.getOutput()), contains( - WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("d"), WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("finishBundle"))); @@ -1292,7 +1300,6 @@ public void finishBundle(FinishBundleContext 
context) { assertThat( stripStreamRecordFromWindowedValue(newHarness.getOutput()), contains( - WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("d"), WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("finishBundle"))); @@ -1370,18 +1377,20 @@ public void finishBundle(FinishBundleContext context) { WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), WindowedValue.valueInGlobalWindow(KV.of("key", "c")))); - // Take a snapshot - OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); + doFnOperator.prepareSnapshotPreBarrier(0); - // Finish bundle element will be buffered as part of finishing a bundle in snapshot() - PushedBackElementsHandler>> pushedBackElementsHandler = - doFnOperator.outputManager.pushedBackElementsHandler; - assertThat(pushedBackElementsHandler, instanceOf(KeyedPushedBackElementsHandler.class)); - List>> bufferedElements = - pushedBackElementsHandler.getElements().collect(Collectors.toList()); + // Bundle should have been finished via prepareSnapshotPreBarrier assertThat( - bufferedElements, - contains(KV.of(0, WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))))); + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow(KV.of("key", "a")), + WindowedValue.valueInGlobalWindow(KV.of("key", "b")), + WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), + WindowedValue.valueInGlobalWindow(KV.of("key", "c")), + WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")))); + + // Take a snapshot + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); testHarness.close(); @@ -1413,7 +1422,6 @@ public void finishBundle(FinishBundleContext context) { testHarness.open(); - // startBundle will output the buffered elements. 
testHarness.processElement( new StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("key", "d")))); @@ -1423,8 +1431,6 @@ public void finishBundle(FinishBundleContext context) { assertThat( stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( - // The first finishBundle is restored from the checkpoint - WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), WindowedValue.valueInGlobalWindow(KV.of("key", "d")), WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")))); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java index f9718cea39e3..b5ae40bdd849 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java @@ -610,6 +610,7 @@ private void testEnsureDeferredStateCleanupTimerFiring(boolean withCheckpointing if (withCheckpointing) { // Upon checkpointing, the bundle is finished and the watermark advances; // timers can fire. Note: The bundle is ensured to be finished. + operator.prepareSnapshotPreBarrier(0); testHarness.snapshot(0, 0); // The user timer was scheduled to fire after cleanup, but executes first