Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,7 @@ public void initializeState(StateInitializationContext context) throws Exception

outputManager =
outputManagerFactory.create(
output,
getLockToAcquireForStateAccessDuringBundles(),
getOperatorStateBackend(),
getKeyedStateBackend(),
keySelector);
output, getLockToAcquireForStateAccessDuringBundles(), getOperatorStateBackend());
}

/**
Expand Down Expand Up @@ -828,9 +824,7 @@ interface OutputManagerFactory<OutputT> extends Serializable {
BufferedOutputManager<OutputT> create(
Output<StreamRecord<WindowedValue<OutputT>>> output,
Lock bufferLock,
@Nullable OperatorStateBackend operatorStateBackend,
@Nullable KeyedStateBackend keyedStateBackend,
@Nullable KeySelector keySelector)
OperatorStateBackend operatorStateBackend)
throws Exception;
}

Expand Down Expand Up @@ -1018,35 +1012,19 @@ public MultiOutputOutputManagerFactory(
public BufferedOutputManager<OutputT> create(
Output<StreamRecord<WindowedValue<OutputT>>> output,
Lock bufferLock,
OperatorStateBackend operatorStateBackend,
@Nullable KeyedStateBackend keyedStateBackend,
@Nullable KeySelector keySelector)
OperatorStateBackend operatorStateBackend)
throws Exception {
Preconditions.checkNotNull(output);
Preconditions.checkNotNull(bufferLock);
Preconditions.checkNotNull(operatorStateBackend);
Preconditions.checkState(
(keyedStateBackend == null) == (keySelector == null),
"Either both KeyedStatebackend and Keyselector are provided or none.");

TaggedKvCoder taggedKvCoder = buildTaggedKvCoder();
ListStateDescriptor<KV<Integer, WindowedValue<?>>> taggedOutputPushbackStateDescriptor =
new ListStateDescriptor<>("bundle-buffer-tag", new CoderTypeSerializer<>(taggedKvCoder));

final PushedBackElementsHandler<KV<Integer, WindowedValue<?>>> pushedBackElementsHandler;
if (keyedStateBackend != null) {
// build a key selector for the tagged output
KeySelector<KV<Integer, WindowedValue<?>>, ?> taggedValueKeySelector =
(KeySelector<KV<Integer, WindowedValue<?>>, Object>)
value -> keySelector.getKey(value.getValue());
pushedBackElementsHandler =
KeyedPushedBackElementsHandler.create(
taggedValueKeySelector, keyedStateBackend, taggedOutputPushbackStateDescriptor);
} else {
ListState<KV<Integer, WindowedValue<?>>> listState =
operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor);
pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState);
}
ListState<KV<Integer, WindowedValue<?>>> listStateBuffer =
operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor);
PushedBackElementsHandler<KV<Integer, WindowedValue<?>>> pushedBackElementsHandler =
NonKeyedPushedBackElementsHandler.create(listStateBuffer);

return new BufferedOutputManager<>(
output, mainTag, tagsToOutputTags, tagsToIds, bufferLock, pushedBackElementsHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1313,20 +1313,25 @@ public void testBundleKeyed() throws Exception {
options.setMaxBundleSize(2L);
options.setMaxBundleTimeMills(10L);

IdentityDoFn<KV<String, String>> doFn =
new IdentityDoFn<KV<String, String>>() {
DoFn<KV<String, String>, String> doFn =
new DoFn<KV<String, String>, String>() {
@ProcessElement
public void processElement(ProcessContext ctx) {
// Change output type of element to test that we do not depend on the input keying
ctx.output(ctx.element().getValue());
}

@FinishBundle
public void finishBundle(FinishBundleContext context) {
context.output(
KV.of("key2", "finishBundle"),
BoundedWindow.TIMESTAMP_MIN_VALUE,
GlobalWindow.INSTANCE);
"finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
}
};

DoFnOperator.MultiOutputOutputManagerFactory<KV<String, String>> outputManagerFactory =
DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
new DoFnOperator.MultiOutputOutputManagerFactory(
outputTag, WindowedValue.getFullCoder(kvCoder, GlobalWindow.Coder.INSTANCE));
outputTag,
WindowedValue.getFullCoder(kvCoder.getValueCoder(), GlobalWindow.Coder.INSTANCE));

DoFnOperator<KV<String, String>, KV<String, String>> doFnOperator =
new DoFnOperator(
Expand All @@ -1347,8 +1352,7 @@ public void finishBundle(FinishBundleContext context) {
DoFnSchemaInformation.create(),
Collections.emptyMap());

OneInputStreamOperatorTestHarness<
WindowedValue<KV<String, String>>, WindowedValue<KV<String, String>>>
OneInputStreamOperatorTestHarness<WindowedValue<KV<String, String>>, WindowedValue<String>>
testHarness =
new KeyedOneInputStreamOperatorTestHarness(
doFnOperator, keySelector, keySelector.getProducedType());
Expand All @@ -1365,23 +1369,22 @@ public void finishBundle(FinishBundleContext context) {
assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
WindowedValue.valueInGlobalWindow(KV.of("key", "a")),
WindowedValue.valueInGlobalWindow(KV.of("key", "b")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
WindowedValue.valueInGlobalWindow(KV.of("key", "c"))));
WindowedValue.valueInGlobalWindow("a"),
WindowedValue.valueInGlobalWindow("b"),
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("c")));

// Take a snapshot
OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);

// Finish bundle element will be buffered as part of finishing a bundle in snapshot()
PushedBackElementsHandler<KV<Integer, WindowedValue<?>>> pushedBackElementsHandler =
doFnOperator.outputManager.pushedBackElementsHandler;
assertThat(pushedBackElementsHandler, instanceOf(KeyedPushedBackElementsHandler.class));
assertThat(pushedBackElementsHandler, instanceOf(NonKeyedPushedBackElementsHandler.class));
List<KV<Integer, WindowedValue<?>>> bufferedElements =
pushedBackElementsHandler.getElements().collect(Collectors.toList());
assertThat(
bufferedElements,
contains(KV.of(0, WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")))));
bufferedElements, contains(KV.of(0, WindowedValue.valueInGlobalWindow("finishBundle"))));

testHarness.close();

Expand Down Expand Up @@ -1424,9 +1427,9 @@ public void finishBundle(FinishBundleContext context) {
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(
// The first finishBundle is restored from the checkpoint
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle")),
WindowedValue.valueInGlobalWindow(KV.of("key", "d")),
WindowedValue.valueInGlobalWindow(KV.of("key2", "finishBundle"))));
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle")));

testHarness.close();
}
Expand Down