From 26b5e8bb614d50c9fcf52d5ee4a976e814084f4d Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Tue, 24 Sep 2019 15:11:53 +0200 Subject: [PATCH] [BEAM-6733] Use Flink's prepareSnapshotPreBarrier to replace BufferedOutputManager For Flink version <= 1.5 the Flink Runner had to buffer any elements which are emitted during a snapshot because the barrier has already been emitted. Flink version >= 1.6 provides a hook to execute an action before the snapshot barrier is emitted by the operator. We can remove the buffering in favor of finishing the current bundle in the DoFnOperator's prepareSnapshotPreBarrier. This had previously been deferred (#7940) until removal of Flink 1.5 (#9632). --- .../wrappers/streaming/DoFnOperator.java | 122 ++++-------------- .../ExecutableStageDoFnOperator.java | 9 +- .../wrappers/streaming/DoFnOperatorTest.java | 58 +++++---- .../ExecutableStageDoFnOperatorTest.java | 1 + 4 files changed, 57 insertions(+), 133 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 6fd5e85a4e12..baf17532c97b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -33,7 +33,6 @@ import java.util.Optional; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -58,7 +57,6 @@ import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; import org.apache.beam.runners.flink.translation.utils.FlinkClassloading; -import 
org.apache.beam.runners.flink.translation.utils.NoopLock; import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingDoFnRunner; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; @@ -146,7 +144,7 @@ public class DoFnOperator extends AbstractStreamOperator outputManager; + protected transient FlinkOutputManager outputManager; private transient DoFnInvoker doFnInvoker; @@ -385,19 +383,7 @@ public void initializeState(StateInitializationContext context) throws Exception outputManager = outputManagerFactory.create( - output, - getLockToAcquireForStateAccessDuringBundles(), - getOperatorStateBackend(), - getKeyedStateBackend(), - keySelector); - } - - /** - * Subclasses may provide a lock to ensure that the state backend is not accessed concurrently - * during bundle execution. - */ - protected Lock getLockToAcquireForStateAccessDuringBundles() { - return NoopLock.get(); + output, getOperatorStateBackend(), getKeyedStateBackend(), keySelector); } @Override @@ -719,7 +705,6 @@ private void emitAllPushedBackData() throws Exception { */ private void checkInvokeStartBundle() { if (bundleStarted.compareAndSet(false, true)) { - outputManager.flushBuffer(); pushbackDoFnRunner.startBundle(); } } @@ -754,6 +739,16 @@ protected final void invokeFinishBundle() { } } + @Override + public void prepareSnapshotPreBarrier(long checkpointId) { + // Finish the current bundle before the snapshot barrier is sent downstream. This gives us a + // clean state before taking the actual snapshot. + // Ensure that no new bundle gets started as part of finishing a bundle.
+ while (bundleStarted.get()) { + invokeFinishBundle(); + } + } + @Override public final void snapshotState(StateSnapshotContext context) throws Exception { if (requiresStableInput) { @@ -761,16 +756,6 @@ public final void snapshotState(StateSnapshotContext context) throws Exception { // snapshot id and start a new buffer for elements arriving after this snapshot. bufferingDoFnRunner.checkpoint(context.getCheckpointId()); } - - // We can't output here anymore because the checkpoint barrier has already been - // sent downstream. This is going to change with 1.6/1.7's prepareSnapshotBarrier. - outputManager.openBuffer(); - // Ensure that no new bundle gets started as part of finishing a bundle - while (bundleStarted.get()) { - invokeFinishBundle(); - } - outputManager.closeBuffer(); - super.snapshotState(context); } @@ -821,33 +806,22 @@ private void setCurrentOutputWatermark(long currentOutputWatermark) { this.currentOutputWatermark = currentOutputWatermark; } - /** Factory for creating an {@link BufferedOutputManager} from a Flink {@link Output}. */ + /** Factory for creating an {@link FlinkOutputManager} from a Flink {@link Output}. */ interface OutputManagerFactory extends Serializable { - BufferedOutputManager create( + FlinkOutputManager create( Output>> output, - Lock bufferLock, @Nullable OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend keyedStateBackend, @Nullable KeySelector keySelector) throws Exception; } - /** - * A {@link DoFnRunners.OutputManager} that can buffer its outputs. Uses {@link - * PushedBackElementsHandler} to buffer the data. Buffering data is necessary because no elements - * can be emitted during {@code snapshotState}. This can be removed once we upgrade Flink to >= - * 1.6 which allows us to finish the bundle before the checkpoint barriers have been emitted. 
- */ - public static class BufferedOutputManager implements DoFnRunners.OutputManager { + /** A {@link DoFnRunners.OutputManager} that forwards data to the Flink runtime. */ + public static class FlinkOutputManager implements DoFnRunners.OutputManager { private final TupleTag mainTag; private final Map, OutputTag>> tagsToOutputTags; private final Map, Integer> tagsToIds; - /** - * A lock to be acquired before writing to the buffer. This lock will only be acquired during - * buffering. It will not be acquired during flushing the buffer. - */ - private final Lock bufferLock; private Map> idsToTags; /** Elements buffered during a snapshot, by output id. */ @@ -856,20 +830,16 @@ public static class BufferedOutputManager implements DoFnRunners.Output protected final Output>> output; - private boolean openBuffer = false; - - BufferedOutputManager( + FlinkOutputManager( Output>> output, TupleTag mainTag, Map, OutputTag>> tagsToOutputTags, Map, Integer> tagsToIds, - Lock bufferLock, PushedBackElementsHandler>> pushedBackElementsHandler) { this.output = output; this.mainTag = mainTag; this.tagsToOutputTags = tagsToOutputTags; this.tagsToIds = tagsToIds; - this.bufferLock = bufferLock; this.idsToTags = new HashMap<>(); for (Map.Entry, Integer> entry : tagsToIds.entrySet()) { idsToTags.put(entry.getValue(), entry.getKey()); @@ -877,54 +847,8 @@ public static class BufferedOutputManager implements DoFnRunners.Output this.pushedBackElementsHandler = pushedBackElementsHandler; } - void openBuffer() { - this.openBuffer = true; - } - - void closeBuffer() { - this.openBuffer = false; - } - @Override public void output(TupleTag tag, WindowedValue value) { - if (!openBuffer) { - emit(tag, value); - } else { - buffer(KV.of(tagsToIds.get(tag), value)); - } - } - - private void buffer(KV> taggedValue) { - try { - bufferLock.lock(); - pushedBackElementsHandler.pushBack(taggedValue); - } catch (Exception e) { - throw new RuntimeException("Couldn't pushback element.", e); - } finally { - 
bufferLock.unlock(); - } - } - - /** - * Flush elements of bufferState to Flink Output. This method can't be invoked in {@link - * #snapshotState(StateSnapshotContext)}. The buffer should be flushed before starting a new - * bundle when the buffer cannot be concurrently accessed and thus does not need to be guarded - * by a lock. - */ - void flushBuffer() { - try { - pushedBackElementsHandler - .getElements() - .forEach( - element -> - emit(idsToTags.get(element.getKey()), (WindowedValue) element.getValue())); - pushedBackElementsHandler.clear(); - } catch (Exception e) { - throw new RuntimeException("Couldn't flush pushed back elements.", e); - } - } - - private void emit(TupleTag tag, WindowedValue value) { if (tag.equals(mainTag)) { // with tagged outputs we can't get around this because we don't // know our own output type... @@ -977,8 +901,8 @@ public void verifyDeterministic() throws NonDeterministicException { } /** - * Implementation of {@link OutputManagerFactory} that creates an {@link BufferedOutputManager} - * that can write to multiple logical outputs by Flink side output. + * Implementation of {@link OutputManagerFactory} that creates an {@link FlinkOutputManager} that + * can write to multiple logical outputs by Flink side output. 
*/ public static class MultiOutputOutputManagerFactory implements OutputManagerFactory { @@ -1013,15 +937,13 @@ public MultiOutputOutputManagerFactory( } @Override - public BufferedOutputManager create( + public FlinkOutputManager create( Output>> output, - Lock bufferLock, OperatorStateBackend operatorStateBackend, @Nullable KeyedStateBackend keyedStateBackend, @Nullable KeySelector keySelector) throws Exception { Preconditions.checkNotNull(output); - Preconditions.checkNotNull(bufferLock); Preconditions.checkNotNull(operatorStateBackend); Preconditions.checkState( (keyedStateBackend == null) == (keySelector == null), @@ -1046,8 +968,8 @@ public BufferedOutputManager create( pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState); } - return new BufferedOutputManager<>( - output, mainTag, tagsToOutputTags, tagsToIds, bufferLock, pushedBackElementsHandler); + return new FlinkOutputManager<>( + output, mainTag, tagsToOutputTags, tagsToIds, pushedBackElementsHandler); } private TaggedKvCoder buildTaggedKvCoder() { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index 0e8977b11caa..a23ac1bad7b2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -173,11 +173,6 @@ public ExecutableStageDoFnOperator( this.stateBackendLock = new ReentrantLock(); } - @Override - protected Lock getLockToAcquireForStateAccessDuringBundles() { - return stateBackendLock; - } - @Override public void open() throws Exception { executableStage = ExecutableStage.fromPayload(payload); @@ -526,7 +521,7 @@ private static class SdkHarnessDoFnRunner private 
final StageBundleFactory stageBundleFactory; private final StateRequestHandler stateRequestHandler; private final BundleProgressHandler progressHandler; - private final BufferedOutputManager outputManager; + private final FlinkOutputManager outputManager; private final Map> outputMap; /** Timer Output Pcollection id => TimerSpec. */ private final Map timerOutputIdToSpecMap; @@ -543,7 +538,7 @@ public SdkHarnessDoFnRunner( StageBundleFactory stageBundleFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler, - BufferedOutputManager outputManager, + FlinkOutputManager outputManager, Map> outputMap, Coder windowCoder, BiConsumer, TimerInternals.TimerData> timerRegistration, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index 57f7694d95ec..5477ba2780df 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -21,7 +21,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertEquals; @@ -30,10 +29,8 @@ import com.fasterxml.jackson.databind.util.LRUMap; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Optional; import java.util.function.Supplier; -import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.beam.runners.core.StatefulDoFnRunner; import 
org.apache.beam.runners.flink.FlinkPipelineOptions; @@ -1220,17 +1217,30 @@ public void finishBundle(FinishBundleContext context) { WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("c"))); + doFnOperator.prepareSnapshotPreBarrier(0); + + // Bundle should have been finished via prepareSnapshotPreBarrier + assertThat( + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("a"), + WindowedValue.valueInGlobalWindow("b"), + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("c"), + WindowedValue.valueInGlobalWindow("finishBundle"))); + // draw a snapshot OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); - // Finish bundle element will be buffered as part of finishing a bundle in snapshot() - PushedBackElementsHandler>> pushedBackElementsHandler = - doFnOperator.outputManager.pushedBackElementsHandler; - assertThat(pushedBackElementsHandler, instanceOf(NonKeyedPushedBackElementsHandler.class)); - List>> bufferedElements = - pushedBackElementsHandler.getElements().collect(Collectors.toList()); + // No further output should have been generated assertThat( - bufferedElements, contains(KV.of(0, WindowedValue.valueInGlobalWindow("finishBundle")))); + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("a"), + WindowedValue.valueInGlobalWindow("b"), + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("c"), + WindowedValue.valueInGlobalWindow("finishBundle"))); testHarness.close(); @@ -1270,7 +1280,6 @@ public void finishBundle(FinishBundleContext context) { assertThat( stripStreamRecordFromWindowedValue(newHarness.getOutput()), contains( - WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("d"), WindowedValue.valueInGlobalWindow("finishBundle"))); @@ -1280,7 +1289,6 @@ public void finishBundle(FinishBundleContext 
context) { assertThat( stripStreamRecordFromWindowedValue(newHarness.getOutput()), contains( - WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("d"), WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("finishBundle"))); @@ -1292,7 +1300,6 @@ public void finishBundle(FinishBundleContext context) { assertThat( stripStreamRecordFromWindowedValue(newHarness.getOutput()), contains( - WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("d"), WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("finishBundle"))); @@ -1370,18 +1377,20 @@ public void finishBundle(FinishBundleContext context) { WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), WindowedValue.valueInGlobalWindow(KV.of("key", "c")))); - // Take a snapshot - OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); + doFnOperator.prepareSnapshotPreBarrier(0); - // Finish bundle element will be buffered as part of finishing a bundle in snapshot() - PushedBackElementsHandler>> pushedBackElementsHandler = - doFnOperator.outputManager.pushedBackElementsHandler; - assertThat(pushedBackElementsHandler, instanceOf(KeyedPushedBackElementsHandler.class)); - List>> bufferedElements = - pushedBackElementsHandler.getElements().collect(Collectors.toList()); + // Bundle should have been finished via prepareSnapshotPreBarrier assertThat( - bufferedElements, - contains(KV.of(0, WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))))); + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow(KV.of("key", "a")), + WindowedValue.valueInGlobalWindow(KV.of("key", "b")), + WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), + WindowedValue.valueInGlobalWindow(KV.of("key", "c")), + WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")))); + + // Take a snapshot + OperatorSubtaskState snapshot = 
testHarness.snapshot(0, 0); testHarness.close(); @@ -1413,7 +1422,6 @@ public void finishBundle(FinishBundleContext context) { testHarness.open(); - // startBundle will output the buffered elements. testHarness.processElement( new StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("key", "d")))); @@ -1423,8 +1431,6 @@ public void finishBundle(FinishBundleContext context) { assertThat( stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( - // The first finishBundle is restored from the checkpoint - WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), WindowedValue.valueInGlobalWindow(KV.of("key", "d")), WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")))); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java index f9718cea39e3..b5ae40bdd849 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java @@ -610,6 +610,7 @@ private void testEnsureDeferredStateCleanupTimerFiring(boolean withCheckpointing if (withCheckpointing) { // Upon checkpointing, the bundle is finished and the watermark advances; // timers can fire. Note: The bundle is ensured to be finished. + operator.prepareSnapshotPreBarrier(0); testHarness.snapshot(0, 0); // The user timer was scheduled to fire after cleanup, but executes first