@@ -130,7 +130,7 @@ private void translateKafkaInput(
FlinkKafkaConsumer011<WindowedValue<byte[]>> kafkaSource =
new FlinkKafkaConsumer011<>(topic, new ByteArrayWindowedValueSchema(), properties);

if (params.containsKey("start_from_timestamp_millis")) {
if (params.getOrDefault("start_from_timestamp_millis", null) != null) {
kafkaSource.setStartFromTimestamp(
Long.parseLong(params.get("start_from_timestamp_millis").toString()));
} else {
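Note on the change above: switching from `containsKey` to `getOrDefault(..., null) != null` also treats a key that is explicitly mapped to `null` as absent. A minimal standalone sketch of the difference, with illustrative map contents (the timestamp literal is arbitrary):

```java
import java.util.HashMap;
import java.util.Map;

public class StartTimestampCheck {
  public static void main(String[] args) {
    Map<String, Object> params = new HashMap<>();
    params.put("start_from_timestamp_millis", null); // key present, value null

    // containsKey(...) is true even though the value is null ...
    System.out.println(params.containsKey("start_from_timestamp_millis")); // true
    // ... while getOrDefault(..., null) != null rejects the null value.
    System.out.println(params.getOrDefault("start_from_timestamp_millis", null) != null); // false

    params.put("start_from_timestamp_millis", "1569888000000");
    if (params.getOrDefault("start_from_timestamp_millis", null) != null) {
      long startMillis = Long.parseLong(params.get("start_from_timestamp_millis").toString());
      // At this point the real code calls kafkaSource.setStartFromTimestamp(startMillis).
      System.out.println("start from timestamp: " + startMillis);
    }
  }
}
```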
@@ -389,11 +389,7 @@ public void initializeState(StateInitializationContext context) throws Exception

outputManager =
outputManagerFactory.create(
output,
getLockToAcquireForStateAccessDuringBundles(),
getOperatorStateBackend(),
getKeyedStateBackend(),
keySelector);
output, getLockToAcquireForStateAccessDuringBundles(), getOperatorStateBackend());
}

/**
@@ -769,12 +765,19 @@ public final void snapshotState(StateSnapshotContext context) throws Exception {

// We can't output here anymore because the checkpoint barrier has already been
// sent downstream. This is going to change with 1.6/1.7's prepareSnapshotBarrier.
outputManager.openBuffer();
// Ensure that no new bundle gets started as part of finishing a bundle
while (bundleStarted.get()) {
invokeFinishBundle();
try {
outputManager.openBuffer();
// Ensure that no new bundle gets started as part of finishing a bundle
while (bundleStarted.get()) {
invokeFinishBundle();
}
outputManager.closeBuffer();
} catch (Exception e) {
// Any regular exception during checkpointing will be tolerated by Flink because those
// typically do not affect the execution flow. We need to fail hard here because errors
// in bundle execution are application errors which are not related to checkpointing.
throw new Error("Checkpointing failed because bundle failed to finalize.", e);
}
outputManager.closeBuffer();

super.snapshotState(context);
}
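The new try/catch drains any in-flight bundle between `openBuffer()` and `closeBuffer()` and escalates a bundle failure to an `Error`. A simplified, self-contained sketch of that control flow under assumed names (this is not the operator's actual class; only the flow mirrors the diff):

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified stand-in for the snapshotState() flow above. */
public class CheckpointBundleDrain {

  private final AtomicBoolean bundleStarted = new AtomicBoolean(true);
  private boolean buffering = false;

  void openBuffer() {
    // Stand-in for outputManager.openBuffer(): outputs are held back from here on.
    buffering = true;
  }

  void closeBuffer() {
    // Stand-in for outputManager.closeBuffer(): buffered outputs may be flushed later.
    buffering = false;
  }

  void invokeFinishBundle() throws Exception {
    // The real operator flushes the current DoFn bundle here; we only mark it finished.
    bundleStarted.set(false);
  }

  void prepareSnapshot() {
    try {
      openBuffer();
      // Ensure that no new bundle gets started as part of finishing a bundle.
      while (bundleStarted.get()) {
        invokeFinishBundle();
      }
      closeBuffer();
    } catch (Exception e) {
      // An ordinary Exception would merely fail the checkpoint, which Flink may tolerate;
      // throwing an Error surfaces the bundle failure as an application error.
      throw new Error("Checkpointing failed because bundle failed to finalize.", e);
    }
  }

  public static void main(String[] args) {
    CheckpointBundleDrain drain = new CheckpointBundleDrain();
    drain.prepareSnapshot();
    System.out.println("buffering after snapshot preparation: " + drain.buffering);
  }
}
```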
@@ -831,9 +834,7 @@ interface OutputManagerFactory<OutputT> extends Serializable {
BufferedOutputManager<OutputT> create(
Output<StreamRecord<WindowedValue<OutputT>>> output,
Lock bufferLock,
@Nullable OperatorStateBackend operatorStateBackend,
@Nullable KeyedStateBackend keyedStateBackend,
@Nullable KeySelector keySelector)
OperatorStateBackend operatorStateBackend)
throws Exception;
}
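Context for the slimmed-down signature: the factory stays `Serializable` because it travels with the operator to the task managers, while the `OperatorStateBackend` only exists at runtime and is therefore handed to `create(...)` during `initializeState`. A minimal sketch of that shape with illustrative names (not the Beam interface itself):

```java
import java.io.Serializable;

public class FactoryShape {

  /** A serializable factory whose runtime dependency only arrives in create(). */
  interface BufferFactory<T> extends Serializable {
    Buffer<T> create(String runtimeStateBackendName) throws Exception;
  }

  static class Buffer<T> {
    private final String backendName;

    Buffer(String backendName) {
      this.backendName = backendName;
    }

    @Override
    public String toString() {
      return "Buffer backed by " + backendName;
    }
  }

  public static void main(String[] args) throws Exception {
    // The factory (a method reference here) is serialized with the job graph;
    // the state backend exists only on the task manager and is injected at runtime.
    BufferFactory<String> factory = Buffer::new;
    System.out.println(factory.create("operator-state-backend"));
  }
}
```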

@@ -917,6 +918,10 @@ private void buffer(KV<Integer, WindowedValue<?>> taggedValue) {
* by a lock.
*/
void flushBuffer() {
if (openBuffer) {
// Buffering currently in progress, do not proceed
return;
}
try {
pushedBackElementsHandler
.getElements()
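The new guard at the top of `flushBuffer()` skips flushing while the output buffer is open, so elements buffered during checkpointing are not pushed downstream prematurely. A simplified in-memory sketch of that open/flush/close interaction (names and types are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** In-memory stand-in for a buffer guarded by an openBuffer flag, as in the diff above. */
public class GuardedBuffer<T> {

  private final List<T> buffered = new ArrayList<>();
  private boolean openBuffer = false;

  void openBuffer() {
    openBuffer = true;
  }

  void buffer(T element) {
    buffered.add(element);
  }

  /** Mirrors the new guard: flushing is a no-op while buffering is in progress. */
  void flushBuffer(Consumer<T> downstream) {
    if (openBuffer) {
      // Buffering currently in progress, do not proceed.
      return;
    }
    buffered.forEach(downstream);
    buffered.clear();
  }

  void closeBuffer() {
    openBuffer = false;
  }

  public static void main(String[] args) {
    GuardedBuffer<String> buf = new GuardedBuffer<>();
    buf.openBuffer();
    buf.buffer("emitted while snapshotting");
    buf.flushBuffer(System.out::println); // prints nothing: the buffer is still open
    buf.closeBuffer();
    buf.flushBuffer(System.out::println); // now the element is flushed downstream
  }
}
```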
@@ -1021,35 +1026,19 @@ public MultiOutputOutputManagerFactory(
public BufferedOutputManager<OutputT> create(
Output<StreamRecord<WindowedValue<OutputT>>> output,
Lock bufferLock,
OperatorStateBackend operatorStateBackend,
@Nullable KeyedStateBackend keyedStateBackend,
@Nullable KeySelector keySelector)
OperatorStateBackend operatorStateBackend)
throws Exception {
Preconditions.checkNotNull(output);
Preconditions.checkNotNull(bufferLock);
Preconditions.checkNotNull(operatorStateBackend);
Preconditions.checkState(
(keyedStateBackend == null) == (keySelector == null),
"Either both KeyedStatebackend and Keyselector are provided or none.");

TaggedKvCoder taggedKvCoder = buildTaggedKvCoder();
ListStateDescriptor<KV<Integer, WindowedValue<?>>> taggedOutputPushbackStateDescriptor =
new ListStateDescriptor<>("bundle-buffer-tag", new CoderTypeSerializer<>(taggedKvCoder));

final PushedBackElementsHandler<KV<Integer, WindowedValue<?>>> pushedBackElementsHandler;
if (keyedStateBackend != null) {
// build a key selector for the tagged output
KeySelector<KV<Integer, WindowedValue<?>>, ?> taggedValueKeySelector =
(KeySelector<KV<Integer, WindowedValue<?>>, Object>)
value -> keySelector.getKey(value.getValue());
pushedBackElementsHandler =
KeyedPushedBackElementsHandler.create(
taggedValueKeySelector, keyedStateBackend, taggedOutputPushbackStateDescriptor);
} else {
ListState<KV<Integer, WindowedValue<?>>> listState =
operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor);
pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState);
}
ListState<KV<Integer, WindowedValue<?>>> listStateBuffer =
operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor);
PushedBackElementsHandler<KV<Integer, WindowedValue<?>>> pushedBackElementsHandler =
NonKeyedPushedBackElementsHandler.create(listStateBuffer);

return new BufferedOutputManager<>(
output, mainTag, tagsToOutputTags, tagsToIds, bufferLock, pushedBackElementsHandler);
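With the keyed branch removed, pushed-back elements always live in non-keyed operator list state (the "bundle-buffer-tag" `ListStateDescriptor` above). A simplified in-memory stand-in for such a non-keyed pushed-back elements handler, shown only to illustrate the shape; the real handler persists to Flink's `ListState` obtained from the operator state backend:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

/** Illustrative, non-keyed pushback buffer; the real one wraps ListState, not an ArrayList. */
public class NonKeyedPushback<T> {

  // Stand-in for operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor).
  private final List<T> listState = new ArrayList<>();

  void pushBack(T element) {
    listState.add(element);
  }

  Stream<T> getElements() {
    return listState.stream();
  }

  void clear() {
    listState.clear();
  }

  public static void main(String[] args) {
    NonKeyedPushback<String> handler = new NonKeyedPushback<>();
    handler.pushBack("finishBundle output buffered during snapshot");
    handler.getElements().forEach(System.out::println);
    handler.clear();
  }
}
```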
@@ -25,6 +25,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.databind.util.LRUMap;
@@ -1313,20 +1314,25 @@ public void testBundleKeyed() throws Exception {
options.setMaxBundleSize(2L);
options.setMaxBundleTimeMills(10L);

IdentityDoFn<KV<String, String>> doFn =
new IdentityDoFn<KV<String, String>>() {
DoFn<KV<String, String>, String> doFn =
new DoFn<KV<String, String>, String>() {
@ProcessElement
public void processElement(ProcessContext ctx) {
// Change output type of element to test that we do not depend on the input keying
ctx.output(ctx.element().getValue());
}

@FinishBundle
public void finishBundle(FinishBundleContext context) {
context.output(
KV.of("key2", "finishBundle"),
BoundedWindow.TIMESTAMP_MIN_VALUE,
GlobalWindow.INSTANCE);
"finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
}
};

DoFnOperator.MultiOutputOutputManagerFactory<KV<String, String>> outputManagerFactory =
DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory(
outputTag, WindowedValue.getFullCoder(kvCoder, GlobalWindow.Coder.INSTANCE));
outputTag,
WindowedValue.getFullCoder(kvCoder.getValueCoder(), GlobalWindow.Coder.INSTANCE));

DoFnOperator<KV<String, String>, KV<String, String>> doFnOperator =
new DoFnOperator(
@@ -1347,8 +1353,7 @@ public void finishBundle(FinishBundleContext context) {
DoFnSchemaInformation.create(),
Collections.emptyMap());

OneInputStreamOperatorTestHarness<
WindowedValue<KV<String, String>>, WindowedValue<KV<String, String>>>
OneInputStreamOperatorTestHarness<WindowedValue<KV<String, String>>, WindowedValue<String>>
testHarness =
new KeyedOneInputStreamOperatorTestHarness(
doFnOperator, keySelector, keySelector.getProducedType());
@@ -1365,23 +1370,22 @@ public void finishBundle(FinishBundleContext context) {
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow(KV.of("key", "a")),
WindowedValue.valueInGlobalWindow(KV.of("key", "b")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
WindowedValue.valueInGlobalWindow(KV.of("key", "c"))));
WindowedValue.valueInGlobalWindow("a"),
WindowedValue.valueInGlobalWindow("b"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("c")));

// Take a snapshot
OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);

// Finish bundle element will be buffered as part of finishing a bundle in snapshot()
PushedBackElementsHandler<KV<Integer, WindowedValue<?>>> pushedBackElementsHandler =
doFnOperator.outputManager.pushedBackElementsHandler;
assertThat(pushedBackElementsHandler, instanceOf(KeyedPushedBackElementsHandler.class));
assertThat(pushedBackElementsHandler, instanceOf(NonKeyedPushedBackElementsHandler.class));
List<KV<Integer, WindowedValue<?>>> bufferedElements =
pushedBackElementsHandler.getElements().collect(Collectors.toList());
assertThat(
bufferedElements,
contains(KV.of(0, WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")))));
bufferedElements, contains(KV.of(0, WindowedValue.valueInGlobalWindow("finishBundle"))));

testHarness.close();

Expand Down Expand Up @@ -1424,11 +1428,103 @@ public void finishBundle(FinishBundleContext context) {
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
// The first finishBundle is restored from the checkpoint
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
WindowedValue.valueInGlobalWindow(KV.of("key", "d")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))));
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle")));

testHarness.close();
}
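The second half of the test restores the snapshot into a fresh operator before asserting the replayed output. A minimal, self-contained sketch of that snapshot-and-restore harness pattern using a plain Flink `StreamMap` operator, assuming the flink-streaming-java test utilities are on the classpath (the operator and inputs are illustrative, not the Beam operator under test):

```java
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

public class SnapshotRestorePattern {
  public static void main(String[] args) throws Exception {
    OneInputStreamOperatorTestHarness<String, String> harness =
        new OneInputStreamOperatorTestHarness<>(
            new StreamMap<String, String>(String::toUpperCase));
    harness.open();
    harness.processElement(new StreamRecord<>("before snapshot"));

    // Take a snapshot and close the first harness.
    OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
    harness.close();

    // Restore the snapshot into a brand-new harness and operator instance.
    OneInputStreamOperatorTestHarness<String, String> restored =
        new OneInputStreamOperatorTestHarness<>(
            new StreamMap<String, String>(String::toUpperCase));
    restored.initializeState(snapshot);
    restored.open();
    restored.processElement(new StreamRecord<>("after restore"));

    // Only the post-restore output is visible on the second harness.
    System.out.println(restored.getOutput());
    restored.close();
  }
}
```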

@Test
public void testCheckpointBufferingWithMultipleBundles() throws Exception {
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setMaxBundleSize(10L);
options.setCheckpointingInterval(1L);

TupleTag<String> outputTag = new TupleTag<>("main-output");

StringUtf8Coder coder = StringUtf8Coder.of();
WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
WindowedValue.getValueOnlyCoder(coder);

DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory(
outputTag,
WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));

Supplier<DoFnOperator<String, String>> doFnOperatorSupplier =
() ->
new DoFnOperator<>(
new IdentityDoFn(),
"stepName",
windowedValueCoder,
null,
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
outputManagerFactory,
WindowingStrategy.globalDefault(),
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
options,
null,
null,
DoFnSchemaInformation.create(),
Collections.emptyMap());

DoFnOperator<String, String> doFnOperator = doFnOperatorSupplier.get();
@SuppressWarnings("unchecked")
OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
new OneInputStreamOperatorTestHarness<>(doFnOperator);

testHarness.open();

// start a bundle
testHarness.processElement(
new StreamRecord<>(WindowedValue.valueInGlobalWindow("regular element")));

// Set the callback which will trigger two bundles upon checkpointing
doFnOperator.setBundleFinishedCallback(
() -> {
try {
// Clear this early for the test here because we want to finish the bundle from within
// the callback which would otherwise cause an infinite recursion
doFnOperator.setBundleFinishedCallback(null);
testHarness.processElement(
new StreamRecord<>(WindowedValue.valueInGlobalWindow("trigger another bundle")));
doFnOperator.invokeFinishBundle();
testHarness.processElement(
new StreamRecord<>(
WindowedValue.valueInGlobalWindow(
"check that the previous element is not flushed")));
} catch (Exception e) {
throw new RuntimeException(e);
}
});

OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);

assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(WindowedValue.valueInGlobalWindow("regular element")));
testHarness.close();

// Restore
OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness2 =
new OneInputStreamOperatorTestHarness<>(doFnOperatorSupplier.get());

testHarness2.initializeState(snapshot);
testHarness2.open();

testHarness2.processElement(
new StreamRecord<>(WindowedValue.valueInGlobalWindow("after restore")));

assertThat(
stripStreamRecordFromWindowedValue(testHarness2.getOutput()),
contains(
WindowedValue.valueInGlobalWindow("trigger another bundle"),
WindowedValue.valueInGlobalWindow("check that the previous element is not flushed"),
WindowedValue.valueInGlobalWindow("after restore")));
}
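The test above hinges on a hook that runs right after a bundle is finished; clearing the hook inside the callback avoids re-entrant recursion when the callback itself finishes another bundle. A minimal sketch of such a callback mechanism with illustrative names (not the operator's actual fields):

```java
public class BundleCallbackSketch {

  private Runnable bundleFinishedCallback;
  private int bundlesFinished = 0;

  void setBundleFinishedCallback(Runnable callback) {
    this.bundleFinishedCallback = callback;
  }

  void invokeFinishBundle() {
    bundlesFinished++;
    Runnable callback = bundleFinishedCallback;
    if (callback != null) {
      callback.run();
    }
  }

  public static void main(String[] args) {
    BundleCallbackSketch op = new BundleCallbackSketch();
    op.setBundleFinishedCallback(
        () -> {
          // Clear first, otherwise finishing another bundle below would recurse forever.
          op.setBundleFinishedCallback(null);
          op.invokeFinishBundle();
        });
    op.invokeFinishBundle();
    System.out.println("bundles finished: " + op.bundlesFinished); // prints 2
  }
}
```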

@Test
Expand Down Expand Up @@ -1719,6 +1815,63 @@ public void processElement(ProcessContext context) {
Collections.emptyMap());
}

@Test
public void testBundleProcessingExceptionIsFatalDuringCheckpointing() throws Exception {
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setMaxBundleSize(10L);
options.setCheckpointingInterval(1L);

TupleTag<String> outputTag = new TupleTag<>("main-output");

StringUtf8Coder coder = StringUtf8Coder.of();
WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
WindowedValue.getValueOnlyCoder(coder);

DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory(
outputTag,
WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));

@SuppressWarnings("unchecked")
DoFnOperator doFnOperator =
new DoFnOperator<>(
new IdentityDoFn() {
@FinishBundle
public void finishBundle() {
throw new RuntimeException("something went wrong here");
}
},
"stepName",
windowedValueCoder,
null,
Collections.emptyMap(),
outputTag,
Collections.emptyList(),
outputManagerFactory,
WindowingStrategy.globalDefault(),
new HashMap<>(), /* side-input mapping */
Collections.emptyList(), /* side inputs */
options,
null,
null,
DoFnSchemaInformation.create(),
Collections.emptyMap());

@SuppressWarnings("unchecked")
OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
new OneInputStreamOperatorTestHarness<>(doFnOperator);

testHarness.open();

// start a bundle
testHarness.processElement(
new StreamRecord<>(WindowedValue.valueInGlobalWindow("regular element")));

// Make sure we throw Error, not a regular Exception.
// A regular exception would just cause the checkpoint to fail.
assertThrows(Error.class, () -> testHarness.snapshot(0, 0));
}
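What the assertion checks: an `Error` is not an `Exception`, so handlers that only catch `Exception` (which, per the comment in the diff, is how Flink tolerates regular checkpoint failures) cannot swallow it. A small self-contained JUnit 4 sketch of the same assertion style, with illustrative class and method names:

```java
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class FailHardSketchTest {

  /** Stand-in for snapshotState() when the bundle fails to finalize. */
  private static void snapshotWithFailingBundle() {
    try {
      throw new RuntimeException("something went wrong here");
    } catch (Exception e) {
      // Escalate so the failure cannot be handled as a merely declined checkpoint.
      throw new Error("Checkpointing failed because bundle failed to finalize.", e);
    }
  }

  @Test
  public void bundleFailureSurfacesAsError() {
    Error error = assertThrows(Error.class, () -> snapshotWithFailingBundle());
    // The original application failure is preserved as the cause.
    assertTrue(error.getCause() instanceof RuntimeException);
  }
}
```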

/**
* Ensures Jackson cache is cleaned to get rid of any references to the Flink Classloader. See
* https://jira.apache.org/jira/browse/BEAM-6460