KAFKA-8602: Fix bug in stand-by task creation #7008
Call for review: @mjsax @bbejeck @vvcephei @abbccdda @guozhangwang @ableegoldman
mjsax left a comment:
Overall LGTM. Couple of minor comments.
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
Nit: no need to overwrite; 1 is the default anyway
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    @Test
    public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws InterruptedException {
nit: simplify InterruptedException -> Exception
    @Override
    public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
        state.putIfAbsent(key, value);
Do we actually need to use the store? It seems we can remove this code.
    @BeforeClass
    public static void createTopics() throws InterruptedException {
        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
        CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
Why do we need an output topic? We never consume a result from it.
    public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
        state.putIfAbsent(key, value);
        final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
        return result;

    final KafkaStreams client1 = new KafkaStreams(topology, streamsConfiguration());
    final KafkaStreams client2 = new KafkaStreams(topology, streamsConfiguration());

    final boolean[] client1IsOk = {false}; // has to be a final array, otherwise flag cannot be modified in lambda
nit: remove comment (we would make it final in any case anyway).
I am wondering whether it should be volatile, though, since the state listener callback is invoked by a different thread.
Here the main point of the comment is that we need to use an array of booleans instead of a plain boolean variable, because otherwise I cannot declare it final and modify it within the lambda.
Since I used a volatile variable now (good point, btw), the array is not needed anymore. Hence, I removed the comment.
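The point about the array versus the volatile flag can be illustrated with a minimal sketch in plain Java. It does not use Kafka Streams; the class `VolatileFlagSketch`, its methods, and the second thread standing in for the state-listener thread are hypothetical names introduced only for illustration. A lambda may assign a field, whereas a captured local variable has to be effectively final, and `volatile` makes the write visible to the thread that polls the flag.

```java
class VolatileFlagSketch {
    // A field can be assigned inside a lambda; a captured local variable cannot,
    // because locals must be effectively final. volatile makes the write visible
    // to the thread that polls the flag.
    private static volatile boolean clientIsOk = false;

    // Polls the flag until it is set or the timeout elapses, similar in spirit
    // to a wait-for-condition test helper.
    static boolean waitForFlag(final long timeoutMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        try {
            while (!clientIsOk && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return clientIsOk;
    }

    // Sets the flag from a second thread (standing in for the listener thread)
    // and reports whether the polling thread observed the change.
    static boolean flagSetByOtherThread() {
        clientIsOk = false;
        final Thread listenerThread = new Thread(() -> clientIsOk = true);
        listenerThread.start();
        return waitForFlag(5_000);
    }

    public static void main(final String[] args) {
        System.out.println("flag observed: " + flagSetByOtherThread());
    }
}
```

With a one-element `final boolean[]` the lambda would also compile, but the array element carries no visibility guarantee across threads, which is the concern raised in the review.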
    client1IsOk[0] = true;
    }
    });
    final boolean[] client2IsOk = {false}; // has to be a final array, otherwise flag cannot be modified in lambda

    TestUtils.waitForCondition(
        () -> client1IsOk[0] && client2IsOk[0],
        30 * 1000,
        "At least one client is not in state RUNNING or has a stand-by task");
Should we split both conditions? Maybe only wait for state RUNNING and check localThreadsMetadata after the wait condition?
I tried it and the test became flaky. The reason is that, without the fix, the client is in RUNNING when the IllegalStateException is thrown; it then changes to ERROR, PENDING_SHUTDOWN, and finally NOT_RUNNING. The wait condition can therefore be satisfied while the client is still in RUNNING. When the test then verifies localThreadsMetadata, two scenarios can occur:
- client is still in RUNNING: localThreadsMetadata contains a stand-by task -> assertion not satisfied
- client is not in RUNNING: an exception is thrown because localThreadsMetadata can only be accessed when the client is in RUNNING.
Both scenarios would let the test fail, which would be OK but not really clean. However, the test was sometimes green, which I currently do not understand. My guess would be a race condition.
My approach checks localThreadsMetadata when the state changes to RUNNING, which should be safe in the error case. In the good case, the test could run into the timeout if start-up is slow, though.
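The difference between checking inside the listener and checking after the wait can be sketched with a small single-threaded simulation. This is not the Kafka Streams API: `FakeClient`, its three states, and `standbyTaskCount()` are hypothetical stand-ins. The only behavior assumed from the discussion is that metadata can be read only while the client is RUNNING.

```java
import java.util.function.BiConsumer;

class ListenerCheckSketch {
    enum State { CREATED, RUNNING, ERROR }

    // Hypothetical stand-in for a streams client; not the Kafka Streams API.
    static class FakeClient {
        private State state = State.CREATED;
        private BiConsumer<State, State> listener = (newState, oldState) -> { };
        private final int standbyTasks;

        FakeClient(final int standbyTasks) {
            this.standbyTasks = standbyTasks;
        }

        void setStateListener(final BiConsumer<State, State> listener) {
            this.listener = listener;
        }

        // Mirrors the constraint from the discussion: metadata is only readable in RUNNING.
        int standbyTaskCount() {
            if (state != State.RUNNING) {
                throw new IllegalStateException("client is not RUNNING");
            }
            return standbyTasks;
        }

        void transitionTo(final State newState) {
            final State oldState = state;
            state = newState;
            listener.accept(newState, oldState);
        }
    }

    private static volatile boolean clientIsOk;

    // Checks the metadata at the moment of the transition to RUNNING.
    static boolean checkInsideListener(final int standbyTasks) {
        clientIsOk = false;
        final FakeClient client = new FakeClient(standbyTasks);
        client.setStateListener((newState, oldState) -> {
            if (newState == State.RUNNING && client.standbyTaskCount() == 0) {
                clientIsOk = true;
            }
        });
        client.transitionTo(State.RUNNING);
        if (standbyTasks > 0) {
            client.transitionTo(State.ERROR); // mimics the failure described above
        }
        return clientIsOk;
    }

    // Reads the metadata only after the client has already left RUNNING.
    static boolean lateCheckThrows() {
        final FakeClient client = new FakeClient(1);
        client.transitionTo(State.RUNNING);
        client.transitionTo(State.ERROR);
        try {
            client.standbyTaskCount();
            return false;
        } catch (final IllegalStateException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println("no standby tasks: " + checkInsideListener(0));
        System.out.println("one standby task: " + checkInsideListener(1));
        System.out.println("late check throws: " + lateCheckThrows());
    }
}
```

In this simulation the in-listener check never reads metadata outside RUNNING, while the late check depends on how far the client has progressed, which matches the flakiness described in the reply.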
Retest this, please
- Verifies that all stream threads of a client do not have assigned stand-by tasks. Before, only the first stream thread returned by the iterator was verified. Now the test is safe to be run with multiple stream threads per client.
- Renamed test class to conform with format of `StandbyTask` class
Failures unrelated.
Retest this, please
Retest this, please
    client1.setStateListener((newState, oldState) -> {
        if (newState == State.RUNNING &&
            client1.localThreadsMetadata().stream().allMatch(thread -> thread.standbyTasks().isEmpty())) {

I left this extra line on purpose so that it is immediately clear that client1IsOk = true; is not part of the if condition. Since lines 113 and 115 are both indented by four characters, they would appear at first sight to belong to the same code block even though they do not.
    TestUtils.waitForCondition(
        () -> client1IsOk && client2IsOk,
        30 * 1000,
        "At least one client did not reach state RUNNING without any stand-by tasks");
We could actually show the client state by:
"Some clients didn't reach state RUNNING without any stand-by tasks. Eventual status: [Client1: {}, Client2: {}]", client1IsOk, client2IsOk
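One way to get the per-client status into the failure message is sketched below in plain Java. It assumes the wait helper takes its message as a plain string, in which case SLF4J-style `{}` placeholders would not be filled in, so the message is built with `String.format` instead. The `waitFor` helper here is a hypothetical stand-in and is not `TestUtils.waitForCondition`; it accepts a supplier so that the message reflects the flags at the time of the timeout rather than at the time of the call.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

class WaitMessageSketch {
    // Hypothetical stand-in for a wait-for-condition helper.
    // Returns null on success, otherwise the lazily built failure message.
    static String waitFor(final BooleanSupplier condition,
                          final long maxWaitMs,
                          final Supplier<String> details) {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return null;
            }
            try {
                Thread.sleep(10);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return condition.getAsBoolean() ? null : details.get();
    }

    // Builds the message suggested in the review, with the flags filled in.
    static String statusMessage(final boolean client1IsOk, final boolean client2IsOk) {
        return String.format(
            "Some clients didn't reach state RUNNING without any stand-by tasks. "
                + "Eventual status: [Client1: %b, Client2: %b]",
            client1IsOk, client2IsOk);
    }

    public static void main(final String[] args) {
        final boolean client1IsOk = true;
        final boolean client2IsOk = false;
        final String failure = waitFor(
            () -> client1IsOk && client2IsOk,
            100,
            () -> statusMessage(client1IsOk, client2IsOk));
        System.out.println(failure);
    }
}
```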
    @Test
    public void shouldCreateStandbyTask() {
        final MockProcessor mockProcessor = new MockProcessor();
We could reuse initialization code for L1068 - L1070
    internalTopologyBuilder.addStateStore(new MockKeyValueStoreBuilder("myStore", true), "processor1");
    final StreamThread.StandbyTaskCreator standbyTaskCreator = createStandbyTaskCreator(internalTopologyBuilder);

    final StandbyTask standbyTask = standbyTaskCreator.createTask(
The standby task could also be reused IIUC
Retest this, please
Retest this, please
Failures unrelated.
Retest this, please
    final ProcessorTopology topology = builder.build(taskId.topicGroupId);

-   if (!topology.stateStores().isEmpty()) {
+   if (!topology.stateStores().isEmpty() && !topology.storeToChangelogTopic().isEmpty()) {
Does this still work for optimized source tables, which read from the input topic instead?
Good point. Thinking about this, StandBys for source-KTables might have been broken for a long time already... (maybe since 0.10.0.0???)
Maybe @cadonna can verify? If that is the case, we should split out a separate ticket and PR to fix StandBys for source-KTables independently.
Yeah, good point! Added an integration test to verify materialized and optimized source tables.
Cannot follow. Your test seems to use the PAPI, and the PAPI does not provide the KTable optimization. You would need to use StreamsBuilder#table() to test the changelog optimization.
See StandbyTaskCreationIntegrationTest line 128.
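The changed condition under discussion can be restated as a small, self-contained predicate. This is a sketch with plain collections rather than the real `ProcessorTopology`; the class and method names are hypothetical. The third case rests on an assumption, namely that an optimized source table registers its input topic as the changelog topic of its store, so that the mapping is not empty.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

class StandbyDecisionSketch {
    // Restates the new condition from the diff: a stand-by task is only worth creating
    // when there is at least one state store and at least one store-to-changelog mapping.
    static boolean shouldCreateStandbyTask(final List<String> stateStores,
                                           final Map<String, String> storeToChangelogTopic) {
        return !stateStores.isEmpty() && !storeToChangelogTopic.isEmpty();
    }

    public static void main(final String[] args) {
        final List<String> stores = Collections.singletonList("myStore");

        // Store with logging disabled: no changelog topic, hence no stand-by task.
        System.out.println(shouldCreateStandbyTask(stores, Collections.emptyMap()));

        // Store with logging enabled: a dedicated changelog topic exists (topic name is illustrative).
        System.out.println(shouldCreateStandbyTask(
            stores, Collections.singletonMap("myStore", "app-myStore-changelog")));

        // Assumption: an optimized source table maps its store to the input topic.
        System.out.println(shouldCreateStandbyTask(
            stores, Collections.singletonMap("myStore", "input-topic")));
    }
}
```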
Retest this, please
Failures unrelated
    final Properties streamsConfiguration1 = streamsConfiguration();
    streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
    final Properties streamsConfiguration2 = streamsConfiguration();
    streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
Probably not that important, but are these two Properties objects exactly the same?
No, they are not, because streamsConfiguration() contains streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath()); Each instance of the Properties therefore has a different state directory, which is good because the test does not work otherwise. I tried it out.
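Why two calls to the same factory method yield different configurations can be sketched with the JDK alone. The helper below is a hypothetical stand-in for the test's `streamsConfiguration()` method and uses `Files.createTempDirectory` in place of `TestUtils.tempDirectory`; the keys `application.id` and `state.dir` are the string names of the corresponding Kafka Streams configs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

class StateDirSketch {
    // Hypothetical stand-in for the test's streamsConfiguration() helper:
    // every call creates a fresh temporary state directory.
    static Properties streamsConfiguration(final String applicationId) {
        final Properties props = new Properties();
        props.put("application.id", applicationId);
        try {
            final Path stateDir = Files.createTempDirectory(applicationId);
            stateDir.toFile().deleteOnExit();
            props.put("state.dir", stateDir.toString());
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(final String[] args) {
        final Properties first = streamsConfiguration("standby-test");
        final Properties second = streamsConfiguration("standby-test");
        // Same application id, but each client gets its own state directory.
        System.out.println(first.get("state.dir").equals(second.get("state.dir")));
    }
}
```

Two clients of the same application running in one JVM then keep their local state apart, which is consistent with the observation that the test does not work with a shared state directory.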
Retest this, please
Reopen to trigger build
Retest this, please
Retest this, please
3 green builds. The failing 4th build comes from disabling the previous GitHub PR builder in favor of a new one. There were some errors with the new PR builder, so the previous GitHub PR builder has been re-enabled. Merging this PR.
Merged #7008 into trunk
Backports bugfix in standby task creation from PR apache#7008. A separate PR is needed because some tests in the original PR use topology optimizations and mocks that were introduced afterwards.
Cherry-picked to 2.3 via #7092
Backports bugfix in standby task creation from PR #7008. A separate PR is needed because some tests in the original PR use topology optimizations and mocks that were introduced afterwards. Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
…ache#7092)
TICKET = KAFKA-8602
LI_DESCRIPTION =
EXIT_CRITERIA = HASH [011ec51]
ORIGINAL_DESCRIPTION = This PR is from the original work by @cadonna in apache#7008. Due to incompatible changes in trunk that should not get cherry-picked back, a separate PR is required for this bug fix
Reviewers: Matthias J. Sax <mjsax@apache.org>
(cherry picked from commit 011ec51)