KAFKA-9832: extend Kafka Streams EOS system test #8440
Conversation
This is actually a bug fix: StandbyTasks did not set the eos flag to true for eos-beta and thus did not wipe out their stores in case of failure.
This is actually a bug fix: consumedOffsetsAndMetadataPerTask could be empty if only standby tasks (but no active tasks) are assigned to a thread.
Could you elaborate more on why committing an empty map will fail?
If we only have StandbyTasks assigned, the RecordCollector would not be initialized and thus the KafkaProducer would not initialize transactions; hence, the offset commit would fail because we cannot begin a new transaction.
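For illustration, a minimal sketch of the resulting guard, assuming hypothetical helper and parameter names (this is not the actual Streams code):

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not the actual Streams code: commit consumed offsets
// as part of the current transaction, but skip entirely for a standby-only
// thread whose producer never called initTransactions().
final class TxOffsetCommit {
    static <K, V> void maybeCommitTransaction(final Producer<K, V> producer,
                                              final ConsumerGroupMetadata groupMetadata,
                                              final Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.isEmpty()) {
            // nothing was consumed by an active task; starting a transaction
            // now would fail because transactions were never initialized
            return;
        }
        producer.sendOffsetsToTransaction(offsets, groupMetadata);
        producer.commitTransaction();
    }
}
```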
We now set the processing guarantee externally, via the system test properties file.
Could you elaborate a bit? The comment doesn't seem readable.
Comment says:
// increase commit interval to make sure a client is killed having an open transaction
If we commit with a small commit interval, the probability is high that there is no pending transaction when we kill the instance. We delay starting a new transaction until the first send() and would commit quickly afterwards. If we "stall" in between waiting for new data (which is not uncommon in this test), there will be no open transaction for some time.
During debugging I did some segment dumps and could not find a single aborted transaction.
Does this make sense?
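For illustration, a hedged sketch of the relevant test properties (the config keys are real Streams configs; the interval value is an assumption, not taken from the actual test):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

// Illustrative only: a larger commit interval makes it likely that a killed
// client still has an open transaction in flight. The interval value is an
// assumption, not the value used by the actual test.
final Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60_000L);
```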
kk, I think I get the motivation, but "a client is killed having an open transaction" still does not read as a full sentence here. Maybe we should say "a killed client has an open transaction"?
The previous shutdown hook did not wait until the "main loop" breaks and exits. Hence, the code after the loop was never executed, making debugging harder. We introduce the terminated flag to delay the termination of the JVM until the method has finished.
This is the "main loop" mentioned above.
This is the code after the "main loop" that was never executed.
We use a try-catch to set the flag, to make sure the shutdown hook can exit quickly even in case of failure.
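A minimal sketch of the shutdown-hook pattern described above, with illustrative names (not the actual test client code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the pattern: the hook waits until the main method
// has finished so the post-loop debugging code actually runs.
public final class EosTestClient {
    private static final AtomicBoolean IS_RUNNING = new AtomicBoolean(true);
    private static final CountDownLatch TERMINATED = new CountDownLatch(1);

    public static void main(final String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            IS_RUNNING.set(false);
            try {
                // delay JVM termination until the main loop has exited
                // and the code after the loop has executed
                TERMINATED.await();
            } catch (final InterruptedException swallow) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            while (IS_RUNNING.get()) {
                // "main loop": poll and process records, etc.
            }
            // code after the "main loop": flush, close, log final state
        } finally {
            // set the flag even on failure so the hook can exit quickly
            TERMINATED.countDown();
        }
    }
}
```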
To make the verification step work, we first need to check that all transactions are finished. With eos-alpha we never had pending transactions that would eventually be aborted by the tx-coordinator: while we crash some instances in between, the final shutdown phase is always clean, and hence all pending transactions would be aborted by initTransactions() calls.
For eos-beta, however, threads that are killed leave open transactions that will eventually be expired by the tx-coordinator, because we generate a new transactional.id (also on restart of a thread).
Having no pending transactions is a requirement for the following code to do a correct verification of the result.
Just increasing the wait time as a small side improvement, to spin less and reduce the output for debugging.
Because we now do the verification for pending transactions first, we have one additional record that is not part of the result and that we need to exclude (similarly below).
The logic looks fragile when partitionRecords is empty. For all the -1 cases, we could add one more dummy record to the array being checked, or just remove the last element from the derived array, so that we can keep the same verification.
I'm wondering if we have to produce another record in verifyAllTransactionFinished; for example, could we just check the end value of the newly added offsets map maintained by the producer? If yes, then we can remove this extra logic here and below.
The logic looks fragile when the partitionRecords is empty

partitionRecords would never be empty; it would at least contain the "dummy" record that we wrote previously. (We could of course strip the dummy record somewhere else; I picked this solution because it required the fewest lines of code to change.)

for example, could we just check the end value of the newly added offsets map maintained by producer?

Well, we can only maintain this new offsets map if we write those dummy records. If we don't write anything, the producer would not put anything into the map and it would stay empty. (Or do you refer to the offsets map from the producer that writes the input data? That would not help, because the generate() and verify() methods are not executed in the same JVM -- also, we are interested in pending transactions of the repartition and output topics; for input topics, there are no transactions.)

However, maybe we could use two consumers (and get rid of the producer): one in read_uncommitted mode to get the endOffset, and a second one in read_committed mode that also gets the end offsets in a loop. Only if the "read committed" consumer returns the same end offset as the "read uncommitted" consumer do we know that there is no pending transaction.
Thoughts?
Yes, I was thinking about the added offsets map from the generate() function -- you're right, it would not be shared with the other JVM. Bummer.
I think using two consumers is slightly better than using a producer to write a dummy record.
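A hedged sketch of the two-consumer idea, using only public consumer APIs (the class and method names are made up):

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Hypothetical sketch: there is no pending transaction once a read_committed
// consumer can reach the end offsets that a read_uncommitted consumer reports.
final class PendingTxCheck {
    static boolean allTransactionsFinished(final Properties baseConfig,
                                           final List<TopicPartition> partitions) {
        final Properties uncommitted = new Properties();
        uncommitted.putAll(baseConfig);
        uncommitted.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");

        final Properties committed = new Properties();
        committed.putAll(baseConfig);
        committed.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<byte[], byte[]> readUncommitted =
                 new KafkaConsumer<>(uncommitted, new ByteArrayDeserializer(), new ByteArrayDeserializer());
             KafkaConsumer<byte[], byte[]> readCommitted =
                 new KafkaConsumer<>(committed, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {

            // log-end offsets, including data of still-open transactions
            final Map<TopicPartition, Long> logEnd = readUncommitted.endOffsets(partitions);

            readCommitted.assign(partitions);
            readCommitted.seekToEnd(partitions);

            // in read_committed mode, position() is bounded by the last stable
            // offset, so it only reaches `logEnd` once no transaction is pending
            return partitions.stream()
                             .allMatch(tp -> readCommitted.position(tp) >= logEnd.get(tp));
        }
    }
}
```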
As there might be pending transactions, we need to improve the way we verify that all transactions are finished. For this, we need to remember the offsets of our "topic end marker" messages.
See my question above: could we just rely on the last values of the maintained offsets map?
Instead of looking for the end-marker message by content (ie, comparing key and value), we now use the offset (which we now know) to check whether we can reach endOffset() as expected in "read_committed" mode.
seekToEnd() will only reach the end marker in read_committed mode if there is no pending transaction.
Strictly, position should be exactly endMarkerOffset + 1 -- it seems ok to just check for >
Why do we want to relax this check here?
No reason. I can make it strict, too.
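For reference, a small sketch of the strict variant (the helper and variable names are illustrative, not the actual test code):

```java
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative helper: with a read_committed consumer, seekToEnd() can only
// pass the end marker once no transaction is pending, so the strict
// expectation is exactly endMarkerOffset + 1.
final class EndMarkerCheck {
    static void verifyEndMarkerReached(final Consumer<?, ?> consumer,
                                       final TopicPartition partition,
                                       final long endMarkerOffset) {
        consumer.seekToEnd(Collections.singletonList(partition));
        final long position = consumer.position(partition);
        if (position != endMarkerOffset + 1) {
            throw new IllegalStateException("pending transaction on " + partition
                + ": position " + position + " != expected " + (endMarkerOffset + 1));
        }
    }
}
```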
For debugging purposes, we now also track the smallest and largest processed offset. This helps to understand which task processed which part of the data during which phase.
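A tiny sketch of what such tracking could look like (all names are assumptions, not the actual test code):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch: remember the smallest and largest processed offset
// per partition for the debugging output.
final class OffsetRangeTracker {
    private final Map<TopicPartition, long[]> range = new HashMap<>();

    void track(final ConsumerRecord<?, ?> record) {
        final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        final long[] minMax =
            range.computeIfAbsent(tp, ignored -> new long[] {Long.MAX_VALUE, Long.MIN_VALUE});
        minMax[0] = Math.min(minMax[0], record.offset());
        minMax[1] = Math.max(minMax[1], record.offset());
    }
}
```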
We now set the processing guarantee via the properties file (this allows us to easily parametrize the test).
Build failed. Seems to be fixed via #8447. Can retest after initial reviews.
Force-pushed from 98c1db0 to 8ed9648.
Retriggered a single system test run as "sanity check" for now: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3898/
Sanity run passed. Triggered another 20: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3899/
Unit tests are failing.
    .stream()
    .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
    .collect(Collectors.toSet())
);
If we hit a TaskCorruptedException, we know that only a task in restore mode could be affected, and those don't have anything to commit (their commitNeeded flag should be set to false). Hence, we just commit all non-corrupted tasks; afterwards we can safely call handleCorruption(). (If we don't commit first, we might incorrectly abort a pending transaction for eos-beta within handleCorruption().)
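A hedged sketch of the control flow described above (runOnce() and the taskManager calls are modeled on the comment and the diff, not copied from StreamThread):

```java
import java.util.stream.Collectors;

import org.apache.kafka.streams.errors.TaskCorruptedException;

// Hypothetical sketch of the described control flow, not the actual code:
try {
    runOnce();
} catch (final TaskCorruptedException e) {
    // only restoring tasks can be corrupted and they have nothing to commit,
    // so first commit all non-corrupted tasks to preserve their progress ...
    taskManager.commit(
        taskManager.tasks()
                   .values()
                   .stream()
                   .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
                   .collect(Collectors.toSet())
    );
    // ... and only then wipe the corrupted state; otherwise, under eos-beta,
    // handleCorruption() could abort a transaction holding healthy tasks' data
    taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
}
```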
Triggered another 20 runs: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3903/
}

-   private int commitInternal(final Collection<Task> tasks) {
+   int commitInternal(final Collection<Task> tasks) {
Should we consider adding a unit test here, since this call is externalized?
Good call.
I also need to add a unit test verifying that we actually commit all other tasks if a TaskCorruptedException is thrown. I just wanted to get the suggested fix reviewed first (also tested via a system test run) before I close the unit test gaps.
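For illustration, a sketch of what a minimal test of the filtering logic could look like (the types are stand-ins for the actual Task/TaskId classes, not the real TaskManagerTest code):

```java
import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

import org.junit.Test;

// Hypothetical test sketch: non-corrupted tasks must survive the filter
// and therefore be committed when a TaskCorruptedException is handled.
public class CommitFilterTest {

    @Test
    public void shouldExcludeCorruptedTasksFromCommit() {
        final Set<String> corruptedIds = Collections.singleton("0_1");
        final Set<String> allTaskIds = new HashSet<>(Arrays.asList("0_0", "0_1", "0_2"));

        final Set<String> toCommit = allTaskIds.stream()
                                               .filter(id -> !corruptedIds.contains(id))
                                               .collect(Collectors.toSet());

        assertEquals(new HashSet<>(Arrays.asList("0_0", "0_2")), toCommit);
    }
}
```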
I think under eos-beta, if one task failed fatally, then we do have to close all tasks, since they share a producer. TaskCorrupted is just a special case, since we know it could only be thrown from a restoring task, which hence has nothing to commit.
Thinking about this a bit more, I think the general rule would be: if the failing task is in the RUNNING state, then it is possible that it has already used the shared producer to send some data, which needs to be aborted; hence we have no other choice but to abort the other tasks as collateral damage. If the failing task is in another state (e.g., only RESTORING tasks can throw TaskCorruptedException today), then we know that task has not used the shared producer, and hence we can skip aborting the txn. Does that make sense? @mjsax
@guozhangwang Both your statements make sense. I just have the impression that the logic to follow those patterns is scattered throughout the code base atm. Hence, I would suggest to put this logic into a single place. I hope that the current fix is "good enough" for now to move forward with this PR.
LGTM!
System test branch builder is buggy atm -- just pushed a fix that should make it work -- we might need to revert that fix before merging. Next try for system tests (20 runs): https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3905/
This reverts commit d506697.
Java 8 passed. Retest this please. |
Call for review @abbccdda @guozhangwang
System test run passed: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3886/