
[KAFKA-6730] Simplify state store recovery#4901

Closed
ConcurrencyPractitioner wants to merge 13 commits into apache:trunk from
ConcurrencyPractitioner:KAFKA-6730

Conversation

@ConcurrencyPractitioner
Contributor

No description provided.

@ConcurrencyPractitioner
Contributor Author

@mjsax Once the end offsets are fetched, how would you check whether we have fully restored? I still call restorePartition as-is, because I am currently not sure how to correctly check the end offsets other than the way restorePartition already provides.

@ConcurrencyPractitioner
Contributor Author

On another note, I have figured out how the check should be performed. So this should be ready for review!

@mjsax mjsax requested review from guozhangwang and mjsax April 20, 2018 08:41
@mjsax mjsax added the streams label Apr 20, 2018
@mjsax
Member

mjsax commented Apr 20, 2018

\cc @bbejeck @vvcephei

@mjsax
Member

mjsax commented Apr 20, 2018

@ConcurrencyPractitioner Thanks for the PR -- test failures seem to be related. Can you have a look before we review?

@ConcurrencyPractitioner
Contributor Author

Thanks Matthias. However, I am not too sure how to simplify this any further, specifically how to avoid the check that a Task has migrated. It will probably take some time.

@ConcurrencyPractitioner
Contributor Author

@mjsax I will probably need some help on the logic.

@ConcurrencyPractitioner
Contributor Author

@mjsax The tests now pass.

@ConcurrencyPractitioner
Contributor Author

@mjsax This is ready for review.

}

private ConsumerRecords<byte[], byte[]> mergeRecords(Set<ConsumerRecords<byte[], byte[]>> allRecords) {
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> mergedRecords = new HashMap<>();
Contributor

Nit: this logic isn't much more complicated than what you're doing to build allRecords in the first place. Maybe just do this inline?

final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
final Set<ConsumerRecords<byte[], byte[]>> allRecords = new HashSet<>();
while (true) {
final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(10);
Contributor

Just curious; why poll for 10ms in particular?

Contributor Author

When I looked through the code, it was previously set to 10, so I kept that value to follow the earlier version as closely as possible.

Contributor

@vvcephei vvcephei left a comment

One question and one nit, otherwise it looks fine to me.

Member

@bbejeck bbejeck left a comment

Thanks for the PR, overall looks good. I have just one minor comment about the code itself.

But I have a meta-comment: the current version restores each batch returned from the poll call, while this proposed approach first collects all records upfront and holds them in memory for the restoration. I'm wondering whether this could have an impact on applications with a significant amount of state to restore.
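To illustrate the concern, here is a minimal, self-contained sketch of restoring each polled batch immediately. The names (`BatchRestoreSketch`, `restorePerBatch`, `poll`) are hypothetical, and plain Java collections stand in for the real Kafka consumer API; this is not the actual StoreChangelogReader code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BatchRestoreSketch {
    // Stand-in for restoreConsumer.poll(): returns the next batch, or an empty list.
    static List<String> poll(Queue<List<String>> batches) {
        List<String> next = batches.poll();
        return next == null ? new ArrayList<>() : next;
    }

    // Per-batch restore: each batch is applied to the store as soon as it is
    // polled, so memory use is bounded by one batch rather than the full changelog.
    static int restorePerBatch(Queue<List<String>> batches, List<String> store) {
        int restored = 0;
        while (true) {
            List<String> batch = poll(batches);
            if (batch.isEmpty()) {
                break;            // no more data; this restore pass ends
            }
            store.addAll(batch);  // apply this batch immediately
            restored += batch.size();
        }
        return restored;
    }
}
```

The collect-all-upfront variant discussed in the PR differs only in deferring the `store.addAll` step until polling finishes, which is exactly what forces all records to be held in memory during restoration.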

mergedRecords.put(partition, records.records(partition));
}
}
final ConsumerRecords<byte[], byte[]> result = new ConsumerRecords<>(mergedRecords);
Member

nit: result isn't needed; you can return new ConsumerRecords<>(mergedRecords) directly

@ConcurrencyPractitioner
Contributor Author

I never thought about the amount of state that needs to be restored. Thanks for pointing this out.
We could set a maximum number of records at which point we stop polling and then judge whether the restore is complete.
@mjsax What do you think about this? Polling until no data is returned is not advisable, particularly since the amount of data involved could lead to high latency.

@ConcurrencyPractitioner
Contributor Author

ConcurrencyPractitioner commented May 2, 2018

Hi, @bbejeck @mjsax. Would an extra config be required if we want to cap the number of polled records?
I don't think adding a configuration is necessary, particularly since it only adds complexity to a problem we originally intended to simplify.

Member

@mjsax mjsax left a comment

Thanks for updating the PR. Left some comments.

final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);
int totalNumberOfRecords = 0;
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> allRecords = new HashMap<>();
while (totalNumberOfRecords < DEFAULT_MAX) {
Member

I am not sure why we need DEFAULT_MAX here? Can't we just restore whatever poll(restoreConsumer, 10) returns?

Contributor Author

Bill pointed out above that we do not want to restore too many records at once, since it would cause too much latency. Also, KAFKA-6730's description says to "just consume until poll() does not return any data". However, there is a chance this condition is never fulfilled, since poll() can continue to return records indefinitely; in that case we terminate once we hit the DEFAULT_MAX parameter, to ensure that the number of records restored stays relatively small.

Member

@ConcurrencyPractitioner thanks for updating the PR. My point from before was that we should restore each batch of records returned from each poll() call, as opposed to keeping all returned records in memory and starting the restore process once there are no more records to fetch. Sorry if I did not make that point very clear.

final long pos = processNext(mergedRecords.records(partition), restorer, endOffset);
restorer.setRestoredOffset(pos);
if (restorer.hasCompleted(pos, endOffset)) {
if (pos > endOffset) {
Member

The goal of the ticket is to actually remove this check.

}
if (restorer.offsetLimit() == Long.MAX_VALUE) {
final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(partition)).get(partition);
if (!restorer.hasCompleted(pos, updatedEndOffset)) {
Member

we also want to remove this check

@ConcurrencyPractitioner
Contributor Author

Hi @mjsax. When you say to "fetch the end offsets" in the JIRA description, do you mean fetching them regularly between poll() calls, or after poll() ceases to return any records? I am currently confused on this point.

@ConcurrencyPractitioner
Contributor Author

ConcurrencyPractitioner commented May 6, 2018

When I added the first conditional, i.e. (if (pos > endOffset)), from the original code to this PR, I managed to pass four of the tests; the only one that failed was shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck. In contrast, when I added only the second condition to this PR, I got the following failing tests:

org.apache.kafka.streams.processor.internals.StoreChangelogReaderTest.shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled
org.apache.kafka.streams.processor.internals.StoreChangelogReaderTest.shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopic

We might consider the need to change the tests, particularly since TaskMigratedException might be used in a different manner.

@mjsax
Member

mjsax commented May 7, 2018

@ConcurrencyPractitioner We would call endOffsets() only once. Note, that restoring interleaves with calling poll() on the main consumer. Thus, the overall flow should be:

  • mainConsumer.poll()
  • needsRestore?
    yes -> get endOffset if unknown (we only do this once)
    restoreConsumer.poll()
    restore records
    if reached endOffsets; set "needsRestore" to false

For the tests: because we change the behavior, we also need to update (or maybe even remove) some tests. It's expected that they don't pass as they test for current behavior that we change.

Does this make sense?
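The flow mjsax outlines above can be condensed into a small sketch. All names here (`RestoreFlowSketch`, `runOnce`) are hypothetical, and plain `LongSupplier` callbacks stand in for the consumer calls; this is not the actual Streams code.

```java
import java.util.function.LongSupplier;

public class RestoreFlowSketch {
    long endOffset = -1;      // unknown until fetched, exactly once
    long restoredOffset = 0;
    boolean needsRestore = true;

    // One iteration of the main loop. In the real flow, mainConsumer.poll()
    // happens first each iteration, interleaved with a single restore step.
    void runOnce(LongSupplier fetchEndOffset, LongSupplier pollRestoreBatchSize) {
        if (needsRestore) {
            if (endOffset < 0) {
                endOffset = fetchEndOffset.getAsLong();          // endOffsets() only once
            }
            restoredOffset += pollRestoreBatchSize.getAsLong();  // restoreConsumer.poll() + restore
            if (restoredOffset >= endOffset) {
                needsRestore = false;                            // reached the end offset
            }
        }
    }
}
```

The key property is that each invocation does only one restore step before returning, so the main consumer keeps polling and the thread does not risk dropping out of the group during a long restore.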

@ConcurrencyPractitioner
Contributor Author

@mjsax I have updated PR with your help. Thanks!
It should be ready for another round of review.

Member

@mjsax mjsax left a comment

Thanks for the update. Some follow up questions/comments.

try {
final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);
for (final TopicPartition partition : restoringPartitions) {
restorePartition(allRecords, partition, active.restoringTaskFor(partition));
Member

I think the JavaDoc of this method can be removed, because we don't call restorePartition() anymore and thus no TaskMigratedException can happen. Please also backtrack the callers of this method and update their JavaDocs accordingly, if necessary.

while (!needsRestoring.isEmpty()) {
final ConsumerRecords<byte[], byte[]> records = poll(restoreConsumer, 10);
if (records.count() == 0) {
break;
Member

Do we need to check if restore is completed for some partitions? I think, with EOS and commit markers, there is a corner case where the check below does not detect that restore is complete even though we fetched all data (but not the final commit marker). In this case, records.count() could be zero but the actual position() for a partition was advanced by 1 to step over the commit marker.

Contributor Author

I am not too clear on this point. Do you have something specific in mind?

Member

I think we need the same check as in https://github.com/apache/kafka/pull/4901/files#diff-46ed6d177221c8778965ecb1b6657be3R101

(might be good to extract this into a private method)

We should cover this corner case with a unit test, too. As the tests pass atm, it seems the corner case is not covered yet.

Contributor Author

To summarize what this check does: I found that if this check (if (records.count() == 0)) is removed, tests hang, since needsRestoring might still contain partitions even if poll() no longer returns any records. However, when I dug through the older version of the code, I could not find the check you are referring to. My closest guess is that you mean comparing endOffsets.get(partition) to the restored offset for that particular partition (note, it is not the updatedEndOffsets field, which is strictly used only in restore). Is this what you mean? Currently, my understanding is sketchy at best.

Member

Sorry for not expressing my thoughts clearly.

The check if (records.count() == 0) is fine. However, I think just doing a break is not good enough. Before the break, we need to check whether restore has completed for any partition that is in the restore phase, and if yes, remove those partitions from needsRestore etc. Otherwise, a partition might stay in the "restoring" phase forever, because count() == 0 always holds and we never reach the check in line 101.

Does this make sense?
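A small sketch of the fix being discussed, with hypothetical names and plain collections standing in for the consumer (this is not the actual StoreChangelogReader code): the completion check runs before the `break`, so partitions that have reached their end offset are removed from `needsRestoring` even when the poll returned zero records.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public class CompletionCheckSketch {
    static Set<String> restore(Map<String, Long> restoredOffsets,
                               Map<String, Long> endOffsets,
                               Queue<Map<String, Long>> polledBatches) {
        Set<String> needsRestoring = new HashSet<>(endOffsets.keySet());
        while (!needsRestoring.isEmpty()) {
            // Stand-in for restoreConsumer.poll(10): one batch of (partition -> records).
            Map<String, Long> batch = polledBatches.poll();
            int count = (batch == null) ? 0 : batch.size();
            if (count > 0) {
                for (Map.Entry<String, Long> e : batch.entrySet()) {
                    restoredOffsets.merge(e.getKey(), e.getValue(), Long::sum);
                }
            }
            // Completion check runs even when count == 0, e.g. after an EOS
            // commit marker advanced the position without returning records.
            needsRestoring.removeIf(p ->
                restoredOffsets.getOrDefault(p, 0L) >= endOffsets.get(p));
            if (count == 0) {
                break; // only after completed partitions have been removed
            }
        }
        return needsRestoring;
    }
}
```

Without the `removeIf` before the `break`, a partition that is already fully restored when the empty poll arrives would remain in `needsRestoring` indefinitely, which matches the hanging-test behavior described above.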

final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);
for (final TopicPartition partition : restoringPartitions) {
restorePartition(allRecords, partition, active.restoringTaskFor(partition));
final Map<TopicPartition, Long> endOffsets = restoreConsumer.endOffsets(restoringPartitions);
Member

I am wondering if this should be a class member that is updated once at "restore begin", rather than each time we check for the next "batch of records"?

@ConcurrencyPractitioner
Contributor Author

Hi @mjsax, this should resolve the error. I just moved the for() loop in front of the if block, so that any partitions that have completed are removed prior to exiting the outer while loop. This way, I think we avoid leaving partitions that have finished restoring in the needsRestoring field.

@ConcurrencyPractitioner
Contributor Author

Hi @mjsax Could you review? This PR is almost ready.

final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);
for (final TopicPartition partition : restoringPartitions) {
restorePartition(allRecords, partition, active.restoringTaskFor(partition));
updatedEndOffsets = !needsRestoring.isEmpty() ?
Member

This would update updatedEndOffsets multiple times; should we set it only once? Note that needsRestoring will not be empty until the full restore is completed.

restorePartition(allRecords, partition, active.restoringTaskFor(partition));
updatedEndOffsets = !needsRestoring.isEmpty() ?
restoreConsumer.endOffsets(restoringPartitions) : updatedEndOffsets;
while (!needsRestoring.isEmpty()) {
Member

We don't want to do it this way, because we want restore and calling mainConsumer.poll() to interleave; otherwise, we might drop out of the consumer group, as restore is expected to take longer than max.poll.interval.ms. Hence, within this method, we should only do a single poll(restoreConsumer, 10) and return afterwards; the main loop will make sure that this method is called again to resume the restore.

@ConcurrencyPractitioner
Contributor Author

@mjsax Locally, when I ran StoreChangelogReaderTest, a single test didn't pass: shouldRestorePartitionsRegisteredPostInitialization. Upon further inspection, I discovered that end offsets were updated twice in this particular test. Thinking it through, I believe updatedEndOffsets should continue to retrieve new end offsets to account for this corner case. What do you think?

@ConcurrencyPractitioner
Contributor Author

Migrating to new PR, current branch is too old.

@mjsax
Member

mjsax commented May 16, 2018

Replaced by #5013
