
KAFKA-10017: fix flaky EOS-beta upgrade test #9688

Merged
mjsax merged 3 commits into apache:trunk from mjsax:kafka-10017-eos-upgrade-test
Dec 11, 2020

Conversation


@mjsax (Member) commented Dec 4, 2020

Call for review @abbccdda @ableegoldman @guozhangwang

This PR is for trunk and 2.7. The PR for 2.6 is slightly different: #9690

mjsax added the streams and tests (Test fixes, including flaky tests) labels on Dec 4, 2020
-    private static final int MAX_POLL_INTERVAL_MS = 100 * 1000;
-    private static final int MAX_WAIT_TIME_MS = 60 * 1000;
+    private static final int MAX_POLL_INTERVAL_MS = (int) Duration.ofSeconds(100L).toMillis();
+    private static final long MAX_WAIT_TIME_MS = Duration.ofMinutes(1L).toMillis();
Member Author:

Side cleanup

// Note: this pattern only works when we just have a single instance running with a single thread
// If we want to extend the test or reuse this CommitPunctuator we should tighten it up
private final AtomicBoolean requestCommit = new AtomicBoolean(false);
private static class CommitPunctuator implements Punctuator {
Member Author:

This punctuator was an attempt to stabilize the test, but without success. Removing it, as this PR should be a proper fix now.

// p-2: 10 rec + C ---> 5 rec (pending)
// p-3: 10 rec + C ---> 5 rec (pending)
// crash case: (we just assume that we inject the error for p-0; in reality it might be a different partition)
// (we don't crash right away but write one fewer record first)
Member Author:

Added some more details/explanations and also renamed a few variables below.

waitForRunning(stateTransitions2);

final Set<Long> committedKeys = mkSet(0L, 1L, 2L, 3L);
final Set<Long> newlyCommittedKeys;
Member Author:

This is the first fix, i.e., how we compute those keys.

final List<KeyValue<Long, Long>> finishSecondBatch = prepareData(15L, 20L, 0L, 1L, 2L, 3L);
writeInputData(finishSecondBatch);

final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeFirstUpgrade
Member Author:

This is the second fix: depending on task movement, we have a different set of committed records.

Contributor:

Nice catch.

Reminds me though: why would the second rebalance not be deterministic in migrating tasks back? I thought our algorithm should produce deterministic results? cc @ableegoldman

);

expectedUncommittedResult.addAll(
computeExpectedResult(finishSecondBatch, uncommittedState)
Member Author:

For this, we needed to preserve the old uncommittedState further above.

waitForRunning(stateTransitions2);

committedKeys.addAll(mkSet(0L, 1L, 2L, 3L));
newlyCommittedKeys.clear();
Member Author:

Similar fix as above: we compute those keys differently now.

final Set<Long> uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
uncommittedKeys.removeAll(newlyCommittedKeys);
final List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = uncommittedInputDataBeforeSecondUpgrade
Member Author:

Similar to above: we need to be more flexible (i.e., depend on the actual task movement).

Member:

I'm guessing the root source of this all is a bad assumption that the assignment would be stable if a stable CLIENT_ID was used? I remember we discussed that back when you first wrote this test, I'm sorry for any misinformation I supplied based on my own assumption about how the CLIENT_ID would be used :/

Member Author:

Yes, the test assumed a more stable task->thread mapping during the assignment. But it turns out that the task assignment may "flip" (not sure about the details).

Contributor:

@ableegoldman is it related to the UUID randomness? If yes please ignore my other question above.

Member:

Yes, I think so

properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).toMillis());
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis());
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) Duration.ofMinutes(5L).toMillis());
Member Author:

I also increased the TX timeout from the too-low default of 10 seconds, to avoid a broker-side TX abort during the test.

Member:

5 minutes seems kind of long; the whole test should only take a few minutes and it has 11 phases. Would 1 minute be more reasonable? Or do we actually need this timeout to cover more than one or two phases?

Member Author:

Good catch -- I set it to 5 minutes during debugging (i.e., setting breakpoints). 1 minute should be enough.

Or do we actually need this timeout to cover more than one or two phases?

Not sure what you mean by this?

Member:

Was just thinking about how long a transaction might possibly be open. 1 minute SGTM.
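
For reference, a minimal sketch of the discussed change, assuming the same properties map and config keys shown in the diff above and the 1-minute value agreed on here (the committed code may use a different value):

// Raise the producer transaction timeout above the 10s default so the broker
// does not abort open transactions while the test (or a debugging session) is paused.
properties.put(
    StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
    (int) Duration.ofMinutes(1L).toMillis()
);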

MULTI_PARTITION_OUTPUT_TOPIC,
-    numberOfRecords
+    numberOfRecords,
+    MAX_WAIT_TIME_MS
Member Author:

Increased the wait time here, too.

}

private Set<Long> keysFromInstance(final KafkaStreams streams) throws Exception {
final ReadOnlyKeyValueStore<Long, Long> store = getStore(
Member Author:

This is another fix (we did see some errors when getting the state stores, too).
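
For context, a minimal sketch of what such a helper can look like when reading all keys hosted by one instance via interactive queries. The store name "queryableStore" and the plain streams.store() call are placeholders; the actual test uses a retrying getStore() utility (as shown in the diff above) to tolerate transient "store not ready" errors.

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Sketch: collect all keys currently hosted by this KafkaStreams instance.
private Set<Long> keysFromInstance(final KafkaStreams streams) {
    final ReadOnlyKeyValueStore<Long, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType("queryableStore", QueryableStoreTypes.keyValueStore())
    );
    final Set<Long> keys = new HashSet<>();
    try (final KeyValueIterator<Long, Long> it = store.all()) {
        while (it.hasNext()) {
            keys.add(it.next().key);
        }
    }
    return keys;
}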


@Override
-    public void commitTransaction() throws ProducerFencedException {
+    public void commitTransaction() {
Member Author:

Side cleanup


private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
-        return maxMessages <= 0 || messagesConsumed < maxMessages;
+        return maxMessages > 0 && messagesConsumed < maxMessages;
Member Author:

There are cases in which we pass in 0, and for this case the old code looped forever until the timeout hit and the test failed. It seems this logic was wrong from the beginning, and we should stop fetching if maxMessages <= 0 instead of looping forever.
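
To illustrate the zero-records case described above (a sketch only; the two conditions are copied from the diff):

// Old condition: with maxMessages == 0 this is always true, so the consume loop
// never terminates and the test only fails once the wait time expires.
private static boolean continueConsumingOld(final int messagesConsumed, final int maxMessages) {
    return maxMessages <= 0 || messagesConsumed < maxMessages;
}

// New condition: with maxMessages == 0 (or negative) we stop fetching right away.
private static boolean continueConsumingNew(final int messagesConsumed, final int maxMessages) {
    return maxMessages > 0 && messagesConsumed < maxMessages;
}

// continueConsumingOld(0, 0) -> true   (keep polling until the timeout hits)
// continueConsumingNew(0, 0) -> false  (stop immediately)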

@ableegoldman (Member) left a comment:

Nice work! Seems like the underlying problem here was just that the task assignments weren't as predictable as we thought?

Had a few minor questions, but overall it makes sense, if I remember how this test works.


final long potentiallyFirstFailingKey = keyFilterFirstClient.iterator().next();
cleanKeys.remove(potentiallyFirstFailingKey);
final Set<Long> keysFirstClientAlpha = keysFromInstance(streams1Alpha);
final long firstFailingKeyForCrashCase = keysFirstClientAlpha.iterator().next();
Member:

Thanks for cleaning up the variable names 🙂

@ableegoldman (Member) left a comment:

LGTM, glad we finally have this sorted out (and that it wasn't a real bug)

@guozhangwang (Contributor) left a comment:

LGTM.

BTW should we re-enable this test in the same PR?

// p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C
// p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 rec + C
// p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 rec + C
// p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C
Contributor:

Are these changes intentional?

Member Author:

Yes. I wanted to improve the readability of the comment -- the additional blanks separate the main phases of the test (each main phase writes 10 records per partition that should eventually be committed).


// 7. only for crash case:
-    // 7a. restart the second client in eos-alpha mode and wait until rebalance stabilizes
+    // 7a. restart the failed second client in eos-alpha mode and wait until rebalance stabilizes
Contributor:

nit: second failed client?

Member:

I think "failed second client" is correct. It's the 2nd client, which has failed, not the 2nd client to have failed (English is confusing 😣 )



mjsax commented Dec 9, 2020

BTW should we re-enable this test in the same PR?

The test is enabled... But the test failed on the 2.6 branch PR -- seems there is still something going on.

mjsax merged commit 567a2ec into apache:trunk on Dec 11, 2020
mjsax deleted the kafka-10017-eos-upgrade-test branch on December 11, 2020 01:34
mjsax added a commit that referenced this pull request Dec 11, 2020
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <guozhang@confluent.io>

mjsax commented Dec 11, 2020

Merged to trunk and cherry-picked to 2.7.
