[KAFKA-7994] Improve Stream time accuracy for restarts and rebalances #6694
mjsax merged 80 commits into apache:trunk from ConcurrencyPractitioner:kafka-7994
Conversation
ping @mjsax @guozhangwang @vvcephei for review
@SuppressWarnings("unchecked")
public boolean process() {
    // if condition put here in case of restarts and rebalances to check for correct timestamp
    if (recordInfo.queue() != null && partitionGroup.getPartitionTimestamp(recordInfo.partition()) == -1) {
nit: use RecordQueue.UNKNOWN instead of -1
No problem, could fix that.
Thanks for the PR! I agree this seems like a straightforward patch, but I'm wondering if we shouldn't think through the eos case a bit more. Or is there really no way to safely cover it as well?
Hi @ableegoldman Thanks for reviewing! I was planning on attacking the eos case, but as you can guess from the code, retrieving the committed metadata is not as simple as in the non-eos case. I was hoping for some input on that, so some small amount of advice is greatly appreciated. :)
Oh, just found out something. Regardless of whether it is the eos case or not, calling
That sounds reasonable. Looks like the build failed on checkstyle; can you try running it? +1 on adding test cases
// confirm that timestamp was correctly committed
assertTrue(Long.parseLong(task.consumer.committed(partition1).metadata())
    == task.getPartitionTime(partition1));
Can we add separate unit tests to confirm this produces the expected behavior? I think the JIRA had some examples highlighting why this is a problem, it would be good to convert those into tests to make sure we're really fixing the problem at hand :)
No problem. Added a new test case to confirm behavior.
Alright, done. @mjsax Added a test case as well. Would be good if you could take a look. :)
pinging @mjsax and @guozhangwang for review
Oh sorry, my bad. I underestimated the scope of the PR. Sorry for pinging you guys. Will dig some more.
Retest this please.
pinging @mjsax @ableegoldman @abbccdda @guozhangwang for final review.
cadonna left a comment
Thank you for the PR @ConcurrencyPractitioner!
Here are my comments:
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
The order of the imports in Kafka Streams is usually as follows:
1. Kafka imports and 3rd-party imports in one block
2. a block of java.* imports
3. import static
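A minimal sketch of that ordering (the class and the specific imports are hypothetical, chosen only to illustrate the three blocks):

```java
// Block 1: Kafka / 3rd-party imports (org.w3c.dom used here as a JDK-bundled stand-in)
import org.w3c.dom.Document;

// Block 2: java.* imports
import java.util.Map;

// Block 3: static imports last
import static java.util.Collections.emptyMap;

public class ImportOrderExample {
    public static void main(final String[] args) {
        final Map<String, Document> docs = emptyMap();
        System.out.println(docs.size()); // prints 0
    }
}
```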
task.commit();
assertEquals(DEFAULT_TIMESTAMP, task.decodeTimestamp(consumer.committed(partition1).metadata()));
// reset times here to artificially represent a restart
task.resetTimes();
Wouldn't creating a new task be better? AFAIK, that is what happens during a restart. No need to simulate anything. Furthermore, it avoids introducing a new method just for testing.
// extract the committed metadata from MockProducer
final List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> metadataList =
    producer.consumerGroupOffsetsHistory();
final String storedMetadata = metadataList.get(0).get("stream-task-test").get(partition1).metadata();
Would be good to extract "stream-task-test" to a member field of the test and use it in createConfig() and here.
consumer.commitSync(offsetMap);

// reset times here to artificially represent a restart
task.resetTimes();
}

// visible for testing
String encodeTimestamp(final long partitionTime) {
I would put the methods to write and read record metadata in their own classes. Those classes would be a kind of SerDes for metadata. Such SerDes would make the code more testable and separate the concerns of a task from reading and writing metadata, which are completely independent. It does not need to be done in this PR; I just wanted to mention it.
Yeah, that would probably be a good idea in the future.
EasyMock.expect(t2.partitions()).andReturn(t2partitions);
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.emptyList());

t1.initializeTaskTime();
Please remove empty line before this line.
assertThrows(errMessage, NullPointerException.class, () -> {
    group.setPartitionTime(randomPartition, 0L);
});
}
This test misses verifying whether streamTime is set or not.
Furthermore, I would write two (or three) distinct tests:
1. partitionTimestamp is set (could be further split for whether streamTime is set or not)
2. NullPointerException is thrown
task.addRecords(partition1, singletonList(getConsumerRecord(partition1, DEFAULT_TIMESTAMP)));

task.process();
task.commit();
The code block from the beginning of the method until here can be extracted and re-used in this and the previous test methods.
}

@Test
public void testSetPartitionTimestamp() {
I think we use should... names for newly added test methods.
@cadonna Alright, done.
@mjsax @abbccdda @guozhangwang @ableegoldman
This issue is the cause of critical bugs we recently faced in our applications that rely on the
@mjsax do you think this fix can be included as part of 2.3.1?
@mjsax pinging.
@marcospassos I don't think that we will include it in 2.3.1 -- it's not really a bug fix but an improvement. @ConcurrencyPractitioner I will try to review again in the next days.
cadonna left a comment
@ConcurrencyPractitioner Sorry for the delay.
if (partitionQueues.get(partition) == null) {
    throw new NullPointerException("Partition " + partition + " not found.");
}
return partitionQueues.get(partition).partitionTime();
Here it would be better to call partitionQueues.get(partition) only once and store its result in a variable. Then check the variable for null and call partitionTime() on the variable.
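The suggested refactor can be sketched as follows. Note that RecordQueue here is a minimal hypothetical stand-in, not the real Kafka Streams class, and the partition key is simplified to a String:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionGroupSketch {
    // Minimal stand-in for RecordQueue, just enough to show the lookup pattern
    static class RecordQueue {
        long partitionTime = -1L;
        long partitionTime() { return partitionTime; }
    }

    final Map<String, RecordQueue> partitionQueues = new HashMap<>();

    // Single map lookup stored in a local variable, then null-checked,
    // instead of calling partitionQueues.get(partition) twice
    long getPartitionTimestamp(final String partition) {
        final RecordQueue queue = partitionQueues.get(partition);
        if (queue == null) {
            throw new NullPointerException("Partition " + partition + " not found.");
        }
        return queue.partitionTime();
    }

    public static void main(final String[] args) {
        final PartitionGroupSketch group = new PartitionGroupSketch();
        final RecordQueue queue = new RecordQueue();
        queue.partitionTime = 5L;
        group.partitionQueues.put("p0", queue);
        System.out.println(group.getPartitionTimestamp("p0")); // prints 5
    }
}
```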
if (streamTime < partitionTime) {
    streamTime = partitionTime;
}
partitionQueues.get(partition).setPartitionTime(partitionTime);
final ByteBuffer buffer = ByteBuffer.allocate(9);
buffer.put(LATEST_MAGIC_BYTE);
buffer.putLong(partitionTime);
return Base64.getEncoder().encodeToString(buffer.array());
I am wondering whether we can do better here. Encoding partition time in Base64 seems to me a bit of a waste of space. As far as I can see, an 8-byte value is encoded in 11 bytes with Base64. It would be great if we could store partition time in 8 bytes.
I am also wondering why metadata in OffsetAndMetadata is a String and not something more byte-friendly.
@cadonna Yeah, it is still unclear at this point whether the metadata field in OffsetAndMetadata could be used in this manner. @guozhangwang or @hachikuji knows this matter better. Anyhow, OffsetAndMetadata right now is the only medium through which we can checkpoint partition time, so we might be stuck with using the metadata field.
I don't have the full context on the history, but it would not be easy to change the API... I talked to Jason about it, and it seems we can just move forward with this PR as-is, and could do a KIP later that allows us to store metadata as byte[] type if we really need to change it. Atm, the metadata is just a few bytes and the overhead does not really matter IMHO.
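For reference, here is a self-contained sketch of the magic-byte-plus-Base64 scheme under discussion. The class name, the magic-byte value, and the -1 fallback for unrecognized versions are assumptions for illustration, not the actual StreamTask code:

```java
import java.nio.ByteBuffer;
import java.util.Base64;

public class TimestampMetadata {
    static final byte LATEST_MAGIC_BYTE = 1; // assumed version byte

    // Encode a partition time as a versioned Base64 string suitable for
    // the String metadata field of OffsetAndMetadata
    static String encodeTimestamp(final long partitionTime) {
        final ByteBuffer buffer = ByteBuffer.allocate(9); // 1 magic byte + 8-byte long
        buffer.put(LATEST_MAGIC_BYTE);
        buffer.putLong(partitionTime);
        return Base64.getEncoder().encodeToString(buffer.array());
    }

    // Decode, falling back to -1 (i.e. "unknown") on unrecognized versions
    static long decodeTimestamp(final String encoded) {
        final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encoded));
        final byte version = buffer.get();
        if (version == LATEST_MAGIC_BYTE) {
            return buffer.getLong();
        }
        return -1L; // unknown format
    }

    public static void main(final String[] args) {
        final String encoded = encodeTimestamp(1234567890L);
        System.out.println(encoded.length()); // 9 raw bytes -> 12 Base64 characters
        System.out.println(decodeTimestamp(encoded)); // prints 1234567890
    }
}
```

This also illustrates the space concern raised above: the 9 raw bytes grow to 12 characters once Base64-encoded.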
Retest this please.
*/
// visible for testing
void commit(final boolean startNewTransaction) {
void commit(final boolean startNewTransaction, final Map<TopicPartition, Long> partitionTimes) {

I know that I recommended to add this parameter, but now, after more refactoring of the code, I am not sure any longer why we need it. It seems that this method is called twice and both calls pass in the result of extractPartitionTimes() as parameter -- hence, it seems we can remove the parameter and do the call to extractPartitionTimes() within the method itself?
}

private void initializeCommittedTimestamp(final TopicPartition partition) {
    final OffsetAndMetadata metadata = consumer.committed(partition);
This is a blocking call, and @guozhangwang just proposed KIP-520 to make it more efficient by allowing multiple partitions to be passed in at once. Should we wait for KIP-520 to be implemented? If not, we should make sure to update this code after KIP-520 is merged.
I am also wondering how we should handle TimeoutException for this call? Maybe not, but it might be worth clarifying.
\cc @guozhangwang
In my PR (#7304) I've refactored this part in StreamTask. I'd suggest we merge that one before this.
Just realized I need to do another rebase on my PR. So if this PR is closer to being merged, I'd suggest @RichardYuSTUG @mjsax you guys just move forward and I will rebase mine later.
@mjsax Cool, sounds good. In that case, we could get this one merged, since it is about complete.
@ConcurrencyPractitioner All builds reported SpotBugs issues.
Yeah, got it fixed.
The following test failures seem related:
@cadonna Oh, just realized that @mjsax's comment caused a regression. If you look earlier in the conversation, you will find a segment where a call to close() resets all partition times to negative one. Therefore, we need to store the partition times in a map before they are reset, and then pass them into the commit() method. The extra parameter is needed after all due to the order of operations in close(). We will need to roll back some changes.
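The ordering constraint described here can be sketched with hypothetical stand-ins for the StreamTask internals (none of these names are from the actual code):

```java
import java.util.HashMap;
import java.util.Map;

public class SuspendOrderingSketch {
    static final Map<String, Long> partitionTime = new HashMap<>();
    static Map<String, Long> lastCommitted;

    // Stand-in for closeTopology(): resets all partition times to -1
    static void closeTopology() {
        partitionTime.replaceAll((partition, time) -> -1L);
    }

    // Stand-in for commit(): must receive times captured BEFORE the reset
    static void commit(final Map<String, Long> partitionTimes) {
        lastCommitted = partitionTimes;
    }

    public static void main(final String[] args) {
        partitionTime.put("p0", 42L);
        // capture before closeTopology(), otherwise -1 would be committed
        final Map<String, Long> captured = new HashMap<>(partitionTime);
        closeTopology();
        commit(captured);
        System.out.println(lastCommitted.get("p0")); // prints 42
    }
}
```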
// visible for testing
void suspend(final boolean clean,
             final boolean isZombie) {
    final Map<TopicPartition, Long> partitionTimes = extractPartitionTimes();
I remember now -- can we add a comment to explain that we need to get partitionTimes before we closeTopology()? (sorry for my previous comment -- forgot about that)
Cool, got it done.
cadonna left a comment
LGTM, thank you @ConcurrencyPractitioner
Thanks for the hard work @ConcurrencyPractitioner!
The issue for this PR can be found here:
https://issues.apache.org/jira/browse/KAFKA-7994?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)
As noted in the JIRA description, stream time is incorrectly set to -1 after rebalances and restarts. To help resolve this issue, one approach is to commit the individual partition time along with the last message processed for each RecordQueue. Hence, after a restart, we can set the partition time to the last committed partition time.
We would also have to forward timestamps downstream after the head of the DAG receives records that update stream time and global time.
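A minimal sketch of the overall idea, with a plain map standing in for the committed offset metadata and all names hypothetical (the real implementation goes through OffsetAndMetadata and the consumer):

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionTimeRestoreSketch {
    static final long UNKNOWN = -1L; // mirrors RecordQueue.UNKNOWN

    // Simulated committed offsets: partition -> metadata string carrying partition time
    static final Map<Integer, String> committedMetadata = new HashMap<>();

    // On commit, piggy-back the partition time in the metadata string
    static void commit(final int partition, final long partitionTime) {
        committedMetadata.put(partition, Long.toString(partitionTime));
    }

    // On restart, initialize partition time from committed metadata instead of -1
    static long initializeCommittedTimestamp(final int partition) {
        final String metadata = committedMetadata.get(partition);
        return metadata == null ? UNKNOWN : Long.parseLong(metadata);
    }

    public static void main(final String[] args) {
        commit(0, 1000L);
        // "restart": partition time recovered from metadata rather than reset to -1
        System.out.println(initializeCommittedTimestamp(0)); // prints 1000
        System.out.println(initializeCommittedTimestamp(1)); // prints -1
    }
}
```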
Committer Checklist (excluded from commit message)