From 90f91603731afe4d66aca4b3b7da6863f1446f7e Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Sun, 3 Nov 2019 21:09:20 +0100 Subject: [PATCH 1/4] [BEAM-8549] Do not use keyed operator state for checkpoint buffering The current buffer logic for items emitted during checkpointing is faulty in the sense that the buffer is partitioned on the output keys of the operator. The key may be changed or even be dropped. Thus, the original key partitioning will not be maintained which will cause checkpointing to fail. An alternative solution would be BEAM-6733 / #9652, but this change keeps the current buffering logic in place. The output buffer may now always be redistributed round-robin upon restoring from a checkpoint. Note that this is fine because no assumption can be made about the distribution of output elements of a DoFn operation. --- .../wrappers/streaming/DoFnOperator.java | 36 ++++------------ .../wrappers/streaming/DoFnOperatorTest.java | 41 ++++++++++--------- 2 files changed, 29 insertions(+), 48 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index ea9c7c5da98d..c60d6cc7b456 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -389,11 +389,7 @@ public void initializeState(StateInitializationContext context) throws Exception outputManager = outputManagerFactory.create( - output, - getLockToAcquireForStateAccessDuringBundles(), - getOperatorStateBackend(), - getKeyedStateBackend(), - keySelector); + output, getLockToAcquireForStateAccessDuringBundles(), getOperatorStateBackend()); } /** @@ -831,9 +827,7 @@ interface OutputManagerFactory extends Serializable { BufferedOutputManager 
create( Output>> output, Lock bufferLock, - @Nullable OperatorStateBackend operatorStateBackend, - @Nullable KeyedStateBackend keyedStateBackend, - @Nullable KeySelector keySelector) + OperatorStateBackend operatorStateBackend) throws Exception; } @@ -1021,35 +1015,19 @@ public MultiOutputOutputManagerFactory( public BufferedOutputManager create( Output>> output, Lock bufferLock, - OperatorStateBackend operatorStateBackend, - @Nullable KeyedStateBackend keyedStateBackend, - @Nullable KeySelector keySelector) + OperatorStateBackend operatorStateBackend) throws Exception { Preconditions.checkNotNull(output); Preconditions.checkNotNull(bufferLock); Preconditions.checkNotNull(operatorStateBackend); - Preconditions.checkState( - (keyedStateBackend == null) == (keySelector == null), - "Either both KeyedStatebackend and Keyselector are provided or none."); TaggedKvCoder taggedKvCoder = buildTaggedKvCoder(); ListStateDescriptor>> taggedOutputPushbackStateDescriptor = new ListStateDescriptor<>("bundle-buffer-tag", new CoderTypeSerializer<>(taggedKvCoder)); - - final PushedBackElementsHandler>> pushedBackElementsHandler; - if (keyedStateBackend != null) { - // build a key selector for the tagged output - KeySelector>, ?> taggedValueKeySelector = - (KeySelector>, Object>) - value -> keySelector.getKey(value.getValue()); - pushedBackElementsHandler = - KeyedPushedBackElementsHandler.create( - taggedValueKeySelector, keyedStateBackend, taggedOutputPushbackStateDescriptor); - } else { - ListState>> listState = - operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor); - pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState); - } + ListState>> listStateBuffer = + operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor); + PushedBackElementsHandler>> pushedBackElementsHandler = + NonKeyedPushedBackElementsHandler.create(listStateBuffer); return new BufferedOutputManager<>( output, mainTag, tagsToOutputTags, tagsToIds, 
bufferLock, pushedBackElementsHandler); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index ec4a2b90d34d..ff0eee46ecc9 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -1313,20 +1313,25 @@ public void testBundleKeyed() throws Exception { options.setMaxBundleSize(2L); options.setMaxBundleTimeMills(10L); - IdentityDoFn> doFn = - new IdentityDoFn>() { + DoFn, String> doFn = + new DoFn, String>() { + @ProcessElement + public void processElement(ProcessContext ctx) { + // Change output type of element to test that we do not depend on the input keying + ctx.output(ctx.element().getValue()); + } + @FinishBundle public void finishBundle(FinishBundleContext context) { context.output( - KV.of("key2", "finishBundle"), - BoundedWindow.TIMESTAMP_MIN_VALUE, - GlobalWindow.INSTANCE); + "finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE); } }; - DoFnOperator.MultiOutputOutputManagerFactory> outputManagerFactory = + DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory( - outputTag, WindowedValue.getFullCoder(kvCoder, GlobalWindow.Coder.INSTANCE)); + outputTag, + WindowedValue.getFullCoder(kvCoder.getValueCoder(), GlobalWindow.Coder.INSTANCE)); DoFnOperator, KV> doFnOperator = new DoFnOperator( @@ -1347,8 +1352,7 @@ public void finishBundle(FinishBundleContext context) { DoFnSchemaInformation.create(), Collections.emptyMap()); - OneInputStreamOperatorTestHarness< - WindowedValue>, WindowedValue>> + OneInputStreamOperatorTestHarness>, WindowedValue> testHarness = new KeyedOneInputStreamOperatorTestHarness( doFnOperator, 
keySelector, keySelector.getProducedType()); @@ -1365,10 +1369,10 @@ public void finishBundle(FinishBundleContext context) { assertThat( stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( - WindowedValue.valueInGlobalWindow(KV.of("key", "a")), - WindowedValue.valueInGlobalWindow(KV.of("key", "b")), - WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), - WindowedValue.valueInGlobalWindow(KV.of("key", "c")))); + WindowedValue.valueInGlobalWindow("a"), + WindowedValue.valueInGlobalWindow("b"), + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("c"))); // Take a snapshot OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); @@ -1376,12 +1380,11 @@ public void finishBundle(FinishBundleContext context) { // Finish bundle element will be buffered as part of finishing a bundle in snapshot() PushedBackElementsHandler>> pushedBackElementsHandler = doFnOperator.outputManager.pushedBackElementsHandler; - assertThat(pushedBackElementsHandler, instanceOf(KeyedPushedBackElementsHandler.class)); + assertThat(pushedBackElementsHandler, instanceOf(NonKeyedPushedBackElementsHandler.class)); List>> bufferedElements = pushedBackElementsHandler.getElements().collect(Collectors.toList()); assertThat( - bufferedElements, - contains(KV.of(0, WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))))); + bufferedElements, contains(KV.of(0, WindowedValue.valueInGlobalWindow("finishBundle")))); testHarness.close(); @@ -1424,9 +1427,9 @@ public void finishBundle(FinishBundleContext context) { stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( // The first finishBundle is restored from the checkpoint - WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), - WindowedValue.valueInGlobalWindow(KV.of("key", "d")), - WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")))); + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("d"), + 
WindowedValue.valueInGlobalWindow("finishBundle"))); testHarness.close(); } From 4426f592e3686436b8c1995274a293fb4246ec75 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Wed, 6 Nov 2019 13:19:02 +0100 Subject: [PATCH 2/4] [BEAM-8566] Fix checkpoint buffering when another bundle is started during checkpointing As part of a checkpoint, the current bundle is finalized. When the bundle is finalized, the watermark, currently held back, may also be progressed, which can cause the start of another bundle. When a new bundle is started, any to-be-buffered items from the previous bundle for the pending checkpoint may be emitted. This should not happen. This only affects portable pipelines where we have to hold back the watermark due to the asynchronous processing of elements. --- .../wrappers/streaming/DoFnOperator.java | 4 + .../wrappers/streaming/DoFnOperatorTest.java | 92 +++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index c60d6cc7b456..4a6d855461d9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -911,6 +911,10 @@ private void buffer(KV> taggedValue) { * by a lock.
*/ void flushBuffer() { + if (openBuffer) { + // Buffering currently in progress, do not proceed + return; + } try { pushedBackElementsHandler .getElements() diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index ff0eee46ecc9..d27eb6e34cd0 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -1434,6 +1434,98 @@ public void finishBundle(FinishBundleContext context) { testHarness.close(); } + @Test + public void testCheckpointBufferingWithMultipleBundles() throws Exception { + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + options.setMaxBundleSize(10L); + options.setCheckpointingInterval(1L); + + TupleTag outputTag = new TupleTag<>("main-output"); + + StringUtf8Coder coder = StringUtf8Coder.of(); + WindowedValue.ValueOnlyWindowedValueCoder windowedValueCoder = + WindowedValue.getValueOnlyCoder(coder); + + DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = + new DoFnOperator.MultiOutputOutputManagerFactory( + outputTag, + WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)); + + Supplier> doFnOperatorSupplier = + () -> + new DoFnOperator<>( + new IdentityDoFn(), + "stepName", + windowedValueCoder, + null, + Collections.emptyMap(), + outputTag, + Collections.emptyList(), + outputManagerFactory, + WindowingStrategy.globalDefault(), + new HashMap<>(), /* side-input mapping */ + Collections.emptyList(), /* side inputs */ + options, + null, + null, + DoFnSchemaInformation.create(), + Collections.emptyMap()); + + DoFnOperator doFnOperator = doFnOperatorSupplier.get(); + @SuppressWarnings("unchecked") + 
OneInputStreamOperatorTestHarness, WindowedValue> testHarness = + new OneInputStreamOperatorTestHarness<>(doFnOperator); + + testHarness.open(); + + // start a bundle + testHarness.processElement( + new StreamRecord<>(WindowedValue.valueInGlobalWindow("regular element"))); + + // Set the callback which will trigger two bundles upon checkpointing + doFnOperator.setBundleFinishedCallback( + () -> { + try { + // Clear this early for the test here because we want to finish the bundle from within + // the callback which would otherwise cause an infinitive recursion + doFnOperator.setBundleFinishedCallback(null); + testHarness.processElement( + new StreamRecord<>(WindowedValue.valueInGlobalWindow("trigger another bundle"))); + doFnOperator.invokeFinishBundle(); + testHarness.processElement( + new StreamRecord<>( + WindowedValue.valueInGlobalWindow( + "check that the previous element is not flushed"))); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); + + assertThat( + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains(WindowedValue.valueInGlobalWindow("regular element"))); + testHarness.close(); + + // Restore + OneInputStreamOperatorTestHarness, WindowedValue> testHarness2 = + new OneInputStreamOperatorTestHarness<>(doFnOperatorSupplier.get()); + + testHarness2.initializeState(snapshot); + testHarness2.open(); + + testHarness2.processElement( + new StreamRecord<>(WindowedValue.valueInGlobalWindow("after restore"))); + + assertThat( + stripStreamRecordFromWindowedValue(testHarness2.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("trigger another bundle"), + WindowedValue.valueInGlobalWindow("check that the previous element is not flushed"), + WindowedValue.valueInGlobalWindow("after restore"))); + } + @Test public void testExactlyOnceBuffering() throws Exception { FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); From 
9ba8b297d0f5a0724fcb7bb1cac4ceaea0186b8e Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Wed, 6 Nov 2019 13:19:28 +0100 Subject: [PATCH 3/4] [BEAM-8566] Do not swallow execution errors during checkpointing If a bundle fails to finalize before creating a checkpoint, the failure may be swallowed and merely treated as a checkpointing error. This breaks the execution flow and exactly-once guarantees. --- .../wrappers/streaming/DoFnOperator.java | 17 ++++-- .../wrappers/streaming/DoFnOperatorTest.java | 58 +++++++++++++++++++ 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 4a6d855461d9..78618a2256bb 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -765,12 +765,19 @@ public final void snapshotState(StateSnapshotContext context) throws Exception { // We can't output here anymore because the checkpoint barrier has already been // sent downstream. This is going to change with 1.6/1.7's prepareSnapshotBarrier. - outputManager.openBuffer(); - // Ensure that no new bundle gets started as part of finishing a bundle - while (bundleStarted.get()) { - invokeFinishBundle(); + try { + outputManager.openBuffer(); + // Ensure that no new bundle gets started as part of finishing a bundle + while (bundleStarted.get()) { + invokeFinishBundle(); + } + outputManager.closeBuffer(); + } catch (Exception e) { + // Any regular exception during checkpointing will be tolerated by Flink because those + // typically do not affect the execution flow. We need to fail hard here because errors + // in bundle execution are application errors which are not related to checkpointing.
+ throw new Error("Checkpointing failed because bundle failed to finalize.", e); } - outputManager.closeBuffer(); super.snapshotState(context); } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index d27eb6e34cd0..43578693d622 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import com.fasterxml.jackson.databind.type.TypeFactory; import com.fasterxml.jackson.databind.util.LRUMap; @@ -1814,6 +1815,63 @@ public void processElement(ProcessContext context) { Collections.emptyMap()); } + @Test + public void testBundleProcessingExceptionIsFatalDuringCheckpointing() throws Exception { + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + options.setMaxBundleSize(10L); + options.setCheckpointingInterval(1L); + + TupleTag outputTag = new TupleTag<>("main-output"); + + StringUtf8Coder coder = StringUtf8Coder.of(); + WindowedValue.ValueOnlyWindowedValueCoder windowedValueCoder = + WindowedValue.getValueOnlyCoder(coder); + + DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = + new DoFnOperator.MultiOutputOutputManagerFactory( + outputTag, + WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)); + + @SuppressWarnings("unchecked") + DoFnOperator doFnOperator = + new DoFnOperator<>( + new IdentityDoFn() { + @FinishBundle + public void finishBundle() { + throw new RuntimeException("something went wrong here"); 
+ } + }, + "stepName", + windowedValueCoder, + null, + Collections.emptyMap(), + outputTag, + Collections.emptyList(), + outputManagerFactory, + WindowingStrategy.globalDefault(), + new HashMap<>(), /* side-input mapping */ + Collections.emptyList(), /* side inputs */ + options, + null, + null, + DoFnSchemaInformation.create(), + Collections.emptyMap()); + + @SuppressWarnings("unchecked") + OneInputStreamOperatorTestHarness, WindowedValue> testHarness = + new OneInputStreamOperatorTestHarness<>(doFnOperator); + + testHarness.open(); + + // start a bundle + testHarness.processElement( + new StreamRecord<>(WindowedValue.valueInGlobalWindow("regular element"))); + + // Make sure we throw Error, not a regular Exception. + // A regular exception would just cause the checkpoint to fail. + assertThrows(Error.class, () -> testHarness.snapshot(0, 0)); + } + /** * Ensures Jackson cache is cleaned to get rid of any references to the Flink Classloader. See * https://jira.apache.org/jira/browse/BEAM-6460 From 96ef54ec347beede679a407c135cd6fbe1c61362 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Thu, 7 Nov 2019 19:07:51 +0100 Subject: [PATCH 4/4] Fix NullPointerException for Kafka parameter start_from_timestamp_millis The default passed from the Kafka consumer wrapper for start_from_timestamp_millis is "None". 
--- .../runners/flink/LyftFlinkStreamingPortableTranslations.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java index f68c27dc7d4f..8466c89cb054 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java @@ -130,7 +130,7 @@ private void translateKafkaInput( FlinkKafkaConsumer011> kafkaSource = new FlinkKafkaConsumer011<>(topic, new ByteArrayWindowedValueSchema(), properties); - if (params.containsKey("start_from_timestamp_millis")) { + if (params.getOrDefault("start_from_timestamp_millis", null) != null) { kafkaSource.setStartFromTimestamp( Long.parseLong(params.get("start_from_timestamp_millis").toString())); } else {