diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java index f68c27dc7d4f..8466c89cb054 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkStreamingPortableTranslations.java @@ -130,7 +130,7 @@ private void translateKafkaInput( FlinkKafkaConsumer011> kafkaSource = new FlinkKafkaConsumer011<>(topic, new ByteArrayWindowedValueSchema(), properties); - if (params.containsKey("start_from_timestamp_millis")) { + if (params.getOrDefault("start_from_timestamp_millis", null) != null) { kafkaSource.setStartFromTimestamp( Long.parseLong(params.get("start_from_timestamp_millis").toString())); } else { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index ea9c7c5da98d..78618a2256bb 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -389,11 +389,7 @@ public void initializeState(StateInitializationContext context) throws Exception outputManager = outputManagerFactory.create( - output, - getLockToAcquireForStateAccessDuringBundles(), - getOperatorStateBackend(), - getKeyedStateBackend(), - keySelector); + output, getLockToAcquireForStateAccessDuringBundles(), getOperatorStateBackend()); } /** @@ -769,12 +765,19 @@ public final void snapshotState(StateSnapshotContext context) throws Exception { // We can't output here anymore because the checkpoint barrier has already been // sent downstream. 
This is going to change with 1.6/1.7's prepareSnapshotBarrier. - outputManager.openBuffer(); - // Ensure that no new bundle gets started as part of finishing a bundle - while (bundleStarted.get()) { - invokeFinishBundle(); + try { + outputManager.openBuffer(); + // Ensure that no new bundle gets started as part of finishing a bundle + while (bundleStarted.get()) { + invokeFinishBundle(); + } + outputManager.closeBuffer(); + } catch (Exception e) { + // Any regular exception during checkpointing will be tolerated by Flink because those + // typically do not affect the execution flow. We need to fail hard here because errors + // in bundle execution are application errors which are not related to checkpointing. + throw new Error("Checkpointing failed because bundle failed to finalize.", e); } - outputManager.closeBuffer(); super.snapshotState(context); } @@ -831,9 +834,7 @@ interface OutputManagerFactory extends Serializable { BufferedOutputManager create( Output>> output, Lock bufferLock, - @Nullable OperatorStateBackend operatorStateBackend, - @Nullable KeyedStateBackend keyedStateBackend, - @Nullable KeySelector keySelector) + OperatorStateBackend operatorStateBackend) throws Exception; } @@ -917,6 +918,10 @@ private void buffer(KV> taggedValue) { * by a lock. 
*/ void flushBuffer() { + if (openBuffer) { + // Buffering currently in progress, do not proceed + return; + } try { pushedBackElementsHandler .getElements() @@ -1021,35 +1026,19 @@ public MultiOutputOutputManagerFactory( public BufferedOutputManager create( Output>> output, Lock bufferLock, - OperatorStateBackend operatorStateBackend, - @Nullable KeyedStateBackend keyedStateBackend, - @Nullable KeySelector keySelector) + OperatorStateBackend operatorStateBackend) throws Exception { Preconditions.checkNotNull(output); Preconditions.checkNotNull(bufferLock); Preconditions.checkNotNull(operatorStateBackend); - Preconditions.checkState( - (keyedStateBackend == null) == (keySelector == null), - "Either both KeyedStatebackend and Keyselector are provided or none."); TaggedKvCoder taggedKvCoder = buildTaggedKvCoder(); ListStateDescriptor>> taggedOutputPushbackStateDescriptor = new ListStateDescriptor<>("bundle-buffer-tag", new CoderTypeSerializer<>(taggedKvCoder)); - - final PushedBackElementsHandler>> pushedBackElementsHandler; - if (keyedStateBackend != null) { - // build a key selector for the tagged output - KeySelector>, ?> taggedValueKeySelector = - (KeySelector>, Object>) - value -> keySelector.getKey(value.getValue()); - pushedBackElementsHandler = - KeyedPushedBackElementsHandler.create( - taggedValueKeySelector, keyedStateBackend, taggedOutputPushbackStateDescriptor); - } else { - ListState>> listState = - operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor); - pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState); - } + ListState>> listStateBuffer = + operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor); + PushedBackElementsHandler>> pushedBackElementsHandler = + NonKeyedPushedBackElementsHandler.create(listStateBuffer); return new BufferedOutputManager<>( output, mainTag, tagsToOutputTags, tagsToIds, bufferLock, pushedBackElementsHandler); diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index ec4a2b90d34d..43578693d622 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import com.fasterxml.jackson.databind.type.TypeFactory; import com.fasterxml.jackson.databind.util.LRUMap; @@ -1313,20 +1314,25 @@ public void testBundleKeyed() throws Exception { options.setMaxBundleSize(2L); options.setMaxBundleTimeMills(10L); - IdentityDoFn> doFn = - new IdentityDoFn>() { + DoFn, String> doFn = + new DoFn, String>() { + @ProcessElement + public void processElement(ProcessContext ctx) { + // Change output type of element to test that we do not depend on the input keying + ctx.output(ctx.element().getValue()); + } + @FinishBundle public void finishBundle(FinishBundleContext context) { context.output( - KV.of("key2", "finishBundle"), - BoundedWindow.TIMESTAMP_MIN_VALUE, - GlobalWindow.INSTANCE); + "finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE); } }; - DoFnOperator.MultiOutputOutputManagerFactory> outputManagerFactory = + DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = new DoFnOperator.MultiOutputOutputManagerFactory( - outputTag, WindowedValue.getFullCoder(kvCoder, GlobalWindow.Coder.INSTANCE)); + outputTag, + WindowedValue.getFullCoder(kvCoder.getValueCoder(), GlobalWindow.Coder.INSTANCE)); DoFnOperator, KV> doFnOperator = new DoFnOperator( @@ -1347,8 +1353,7 @@ public void 
finishBundle(FinishBundleContext context) { DoFnSchemaInformation.create(), Collections.emptyMap()); - OneInputStreamOperatorTestHarness< - WindowedValue>, WindowedValue>> + OneInputStreamOperatorTestHarness>, WindowedValue> testHarness = new KeyedOneInputStreamOperatorTestHarness( doFnOperator, keySelector, keySelector.getProducedType()); @@ -1365,10 +1370,10 @@ public void finishBundle(FinishBundleContext context) { assertThat( stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( - WindowedValue.valueInGlobalWindow(KV.of("key", "a")), - WindowedValue.valueInGlobalWindow(KV.of("key", "b")), - WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), - WindowedValue.valueInGlobalWindow(KV.of("key", "c")))); + WindowedValue.valueInGlobalWindow("a"), + WindowedValue.valueInGlobalWindow("b"), + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("c"))); // Take a snapshot OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); @@ -1376,12 +1381,11 @@ public void finishBundle(FinishBundleContext context) { // Finish bundle element will be buffered as part of finishing a bundle in snapshot() PushedBackElementsHandler>> pushedBackElementsHandler = doFnOperator.outputManager.pushedBackElementsHandler; - assertThat(pushedBackElementsHandler, instanceOf(KeyedPushedBackElementsHandler.class)); + assertThat(pushedBackElementsHandler, instanceOf(NonKeyedPushedBackElementsHandler.class)); List>> bufferedElements = pushedBackElementsHandler.getElements().collect(Collectors.toList()); assertThat( - bufferedElements, - contains(KV.of(0, WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))))); + bufferedElements, contains(KV.of(0, WindowedValue.valueInGlobalWindow("finishBundle")))); testHarness.close(); @@ -1424,11 +1428,103 @@ public void finishBundle(FinishBundleContext context) { stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( // The first finishBundle is restored from the 
checkpoint - WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")), - WindowedValue.valueInGlobalWindow(KV.of("key", "d")), - WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")))); + WindowedValue.valueInGlobalWindow("finishBundle"), + WindowedValue.valueInGlobalWindow("d"), + WindowedValue.valueInGlobalWindow("finishBundle"))); + + testHarness.close(); + } + + @Test + public void testCheckpointBufferingWithMultipleBundles() throws Exception { + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + options.setMaxBundleSize(10L); + options.setCheckpointingInterval(1L); + + TupleTag outputTag = new TupleTag<>("main-output"); + + StringUtf8Coder coder = StringUtf8Coder.of(); + WindowedValue.ValueOnlyWindowedValueCoder windowedValueCoder = + WindowedValue.getValueOnlyCoder(coder); + + DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = + new DoFnOperator.MultiOutputOutputManagerFactory( + outputTag, + WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)); + + Supplier> doFnOperatorSupplier = + () -> + new DoFnOperator<>( + new IdentityDoFn(), + "stepName", + windowedValueCoder, + null, + Collections.emptyMap(), + outputTag, + Collections.emptyList(), + outputManagerFactory, + WindowingStrategy.globalDefault(), + new HashMap<>(), /* side-input mapping */ + Collections.emptyList(), /* side inputs */ + options, + null, + null, + DoFnSchemaInformation.create(), + Collections.emptyMap()); + + DoFnOperator doFnOperator = doFnOperatorSupplier.get(); + @SuppressWarnings("unchecked") + OneInputStreamOperatorTestHarness, WindowedValue> testHarness = + new OneInputStreamOperatorTestHarness<>(doFnOperator); + + testHarness.open(); + + // start a bundle + testHarness.processElement( + new StreamRecord<>(WindowedValue.valueInGlobalWindow("regular element"))); + // Set the callback which will trigger two bundles upon checkpointing + doFnOperator.setBundleFinishedCallback( + () -> { + 
try { + // Clear this early for the test here because we want to finish the bundle from within + // the callback which would otherwise cause an infinite recursion + doFnOperator.setBundleFinishedCallback(null); + testHarness.processElement( + new StreamRecord<>(WindowedValue.valueInGlobalWindow("trigger another bundle"))); + doFnOperator.invokeFinishBundle(); + testHarness.processElement( + new StreamRecord<>( + WindowedValue.valueInGlobalWindow( + "check that the previous element is not flushed"))); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); + + assertThat( + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains(WindowedValue.valueInGlobalWindow("regular element"))); testHarness.close(); + + // Restore + OneInputStreamOperatorTestHarness, WindowedValue> testHarness2 = + new OneInputStreamOperatorTestHarness<>(doFnOperatorSupplier.get()); + + testHarness2.initializeState(snapshot); + testHarness2.open(); + + testHarness2.processElement( + new StreamRecord<>(WindowedValue.valueInGlobalWindow("after restore"))); + + assertThat( + stripStreamRecordFromWindowedValue(testHarness2.getOutput()), + contains( + WindowedValue.valueInGlobalWindow("trigger another bundle"), + WindowedValue.valueInGlobalWindow("check that the previous element is not flushed"), + WindowedValue.valueInGlobalWindow("after restore"))); } @Test @@ -1719,6 +1815,63 @@ public void processElement(ProcessContext context) { Collections.emptyMap()); } + @Test + public void testBundleProcessingExceptionIsFatalDuringCheckpointing() throws Exception { + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + options.setMaxBundleSize(10L); + options.setCheckpointingInterval(1L); + + TupleTag outputTag = new TupleTag<>("main-output"); + + StringUtf8Coder coder = StringUtf8Coder.of(); + WindowedValue.ValueOnlyWindowedValueCoder windowedValueCoder = + 
WindowedValue.getValueOnlyCoder(coder); + + DoFnOperator.MultiOutputOutputManagerFactory outputManagerFactory = + new DoFnOperator.MultiOutputOutputManagerFactory( + outputTag, + WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)); + + @SuppressWarnings("unchecked") + DoFnOperator doFnOperator = + new DoFnOperator<>( + new IdentityDoFn() { + @FinishBundle + public void finishBundle() { + throw new RuntimeException("something went wrong here"); + } + }, + "stepName", + windowedValueCoder, + null, + Collections.emptyMap(), + outputTag, + Collections.emptyList(), + outputManagerFactory, + WindowingStrategy.globalDefault(), + new HashMap<>(), /* side-input mapping */ + Collections.emptyList(), /* side inputs */ + options, + null, + null, + DoFnSchemaInformation.create(), + Collections.emptyMap()); + + @SuppressWarnings("unchecked") + OneInputStreamOperatorTestHarness, WindowedValue> testHarness = + new OneInputStreamOperatorTestHarness<>(doFnOperator); + + testHarness.open(); + + // start a bundle + testHarness.processElement( + new StreamRecord<>(WindowedValue.valueInGlobalWindow("regular element"))); + + // Make sure we throw Error, not a regular Exception. + // A regular exception would just cause the checkpoint to fail. + assertThrows(Error.class, () -> testHarness.snapshot(0, 0)); + } + /** * Ensures Jackson cache is cleaned to get rid of any references to the Flink Classloader. See * https://jira.apache.org/jira/browse/BEAM-6460