Skip to content

KAFKA-5520: KIP-171 - Extend Consumer Group Reset Offset for Stream Application - Updated#4159

Closed
jeqo wants to merge 104 commits intoapache:trunkfrom
jeqo:feature/kip-171
Closed

KAFKA-5520: KIP-171 - Extend Consumer Group Reset Offset for Stream Application - Updated#4159
jeqo wants to merge 104 commits intoapache:trunkfrom
jeqo:feature/kip-171

Conversation

@jeqo
Copy link
Copy Markdown
Contributor

@jeqo jeqo commented Oct 30, 2017

@jeqo jeqo changed the title KAFKA-5520: KIP-171 - Extend Consumer Group Reset Offset for Stream Application - Merge with KIP-198 KAFKA-5520: KIP-171 - Extend Consumer Group Reset Offset for Stream Application - Updated Nov 2, 2017
Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some initial comments. Did not yet have a look at the test (it seems, there is a lot of redundancy in the test though -- could we refactor to share more code?)

}
}

public Long getDateTime(String ts) throws ParseException {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this be package private? (add comment // visible for testing)

nit: ts -> timestamp

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but test is in another package/module. Maybe once it is moved to streams we can improve visibility of this method. I will add the comment.

}

public Long getDateTime(String ts) throws ParseException {
if (!(ts.split("T")[1].contains("+") || ts.split("T")[1].contains("-") || ts.split("T")[1].contains("Z"))) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could throw a NPE. I think we should guard against this, and throw ParseException if NPE happens

ts.split("T") is redundant and should be extracted into a variable.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. I will reformat this.

Date date;
try {
date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(ts);
} catch (ParseException e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. I'll reformat.

}

@Test
public void testDateTimeFormats() throws ParseException {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be multiple tests (one test method per scenario). Also, use better method names, eg, shouldAcceptValidDateFormats, shouldThrowOnInvalidDateFormat etc

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will add them.

try {
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
fail("Call to getDateTime should fail");
} catch (Exception e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final (same below)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure where the final should be. Maybe you're referring to the next comment?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} catch (final Exception e) {

}
}

private void invokeGetDateTimeMethod(SimpleDateFormat format) throws ParseException {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree.

final Map<TopicPartition, Long> topicPartitionAndOffset = new HashMap<>();
for (final String line : resetPlanCsv.split("\n")) {
final String[] parts = line.split(",");
final String topic = parts[0];
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we guard against NPE? (same blow)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap, I'll refactor.

return date.getTime();
}

private Map<TopicPartition, Long> parseResetPlan(String resetPlanCsv) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final

Map<TopicPartition, Long> beginningOffsets,
Map<TopicPartition, Long> endOffsets) {
Map<TopicPartition, Long> validatedTopicPartitionsOffsets = new HashMap<>();
for (final TopicPartition topicPartition : inputTopicPartitionsAndOffset.keySet()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: As key and value is accessed, we might want to iterate throw the .entrySet instead of keySet

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree.

return topicPartitionAndOffset;
}

private Map<TopicPartition, Long> checkOffsetRange(Map<TopicPartition, Long> inputTopicPartitionsAndOffset,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final (3x)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree.

@asfgit
Copy link
Copy Markdown

asfgit commented Nov 14, 2017

FAILURE
No test results found.
--none--

2 similar comments
@asfgit
Copy link
Copy Markdown

asfgit commented Nov 14, 2017

FAILURE
No test results found.
--none--

@asfgit
Copy link
Copy Markdown

asfgit commented Nov 14, 2017

FAILURE
No test results found.
--none--

@asfgit
Copy link
Copy Markdown

asfgit commented Nov 14, 2017

FAILURE
No test results found.
--none--

2 similar comments
@asfgit
Copy link
Copy Markdown

asfgit commented Nov 14, 2017

FAILURE
No test results found.
--none--

@asfgit
Copy link
Copy Markdown

asfgit commented Nov 14, 2017

FAILURE
No test results found.
--none--

@asfgit
Copy link
Copy Markdown

asfgit commented Nov 14, 2017

FAILURE
8076 tests run, 5 skipped, 2 failed.
--none--

@asfgit
Copy link
Copy Markdown

asfgit commented Nov 14, 2017

SUCCESS
8076 tests run, 5 skipped, 0 failed.
--none--

1 similar comment
@asfgit
Copy link
Copy Markdown

asfgit commented Nov 14, 2017

SUCCESS
8076 tests run, 5 skipped, 0 failed.
--none--

@asfgit
Copy link
Copy Markdown

asfgit commented Nov 15, 2017

FAILURE
8076 tests run, 5 skipped, 2 failed.
--none--

@asfgit
Copy link
Copy Markdown

asfgit commented Nov 15, 2017

FAILURE
7959 tests run, 5 skipped, 1 failed.
--none--

@asfgit
Copy link
Copy Markdown

asfgit commented Nov 15, 2017

SUCCESS
8076 tests run, 5 skipped, 0 failed.
--none--

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Nov 15, 2017

@jeqo We just merged a fix for the failing tests. Can you please rebase. thx.

…up rebalances in progress

…up rebalances in progress

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes apache#3506 from cmccabe/KAFKA-5565
@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Nov 20, 2017

Thanks for the PR, looks good overall just one minor comment.

The maybeReset method is long; maybe we could put the code in if statements over 2 or 3 lines in separate methods for readability?

@jeqo
Copy link
Copy Markdown
Contributor Author

jeqo commented Nov 21, 2017

@bbejeck agree. I've fix it.
@guozhangwang I will give a try and check how it goes.

Thanks for your feedback!

@jeqo
Copy link
Copy Markdown
Contributor Author

jeqo commented Dec 2, 2017

@guozhangwang I've added StreamsResetterTest to unit-test seek and removeRecords functionality, and about integration tests, I have add methods to test it with SSL.
Let me know if there's anything missing to merge :)

@guozhangwang
Copy link
Copy Markdown
Contributor

@jeqo The unit tests of StreamsResetterTest lgtm. For the integration test though, my previous comment is that we do not need to add more under integration tests as they should be well covered. LMK.

@jeqo
Copy link
Copy Markdown
Contributor Author

jeqo commented Dec 4, 2017

@guozhangwang I've removed SSL integration test, and put integration tests in one class. how does it look now?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeqo Maybe my previous comment was misleading :) I was suggesting that we do not need to add more test cases into the reset integration tests like "from offset", "from date" etc, as for integration test we just check its interaction with the cluster module all worked as expected, so having just one test for "from scratch with intermediate" and "from scratch without intermediate" is good enough. For the added functionality our added unit tests should be well covering it.

Also it seems you have removed the test with SSL config as well? Is that intentional? If not maybe we can just make this integration test a parameterized test, with ssl turned on and off (i.e. when it is turned on broker cluster will have the SSL configs to not accept unauthorized access, and the streams resetter needs the appropriate SSL configs to be able to delete those topics).

cc @mjsax

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I get it know: I will return tests with SSL config (I thought that was what it was already tested) and remove tests that are already validated by unit-test. There are some methods from KafkaConsumer that are not available on unit test like offsetsForTimes, I will keep those on integration test.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! Thanks. The reason we'd like to avoid redundant integration tests with overlapping unit tests is that they would take much more time. If we are not careful the Jenkins builds would take longer and longer to finish, and hence hinders our development pace (we have seen the Streams unit test suite taking from 1 min to about 7 min now on my laptop, and much longer on the Jenkins box).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make total sense 👍

"--input-topics", INPUT_TOPIC
};
}
"--intermediate-topics", INTERMEDIATE_USER_TOPIC));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we add the intermediate topic for all cases, I am wondering if this is correct.

I understand that the test is still passing and we just log an error, but should StreamsResetter not return a non-zero return value to indicate that something went wrong?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. Maybe we should still add a boolean parameter to indicate if we should include --intermediate-topics", INTERMEDIATE_USER_TOPIC into the config lists?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. And we should make sure that the test fails if we do it wrong. That the test passes atm, indicates an issue in the reset tool. It only returns non-zero exit code if there was an fatal exception and we crash. But we should also return non-zero if anything else went wrong. Atm, we catch, print/log and move on -- we should remember if any error occurred though and only return zero if no error occurred at all.

I would recommend to use different return codes for different errors and document it. Should be part of the public API IMHO.

And add a test to check the return codes :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a separate PR out of the scope of the KIP though?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely. It's a bug and we can file a JIRA -- thus, the bug fix can also re-introduce the it-then-else for the intermediate topic here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeqo do you want to add back the boolean indicator whether intermediate topic exists and should be removed before we merge this PR?

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Dec 5, 2017

Thanks for the updates. Still wondering if we should share more code for the individual test methods. Do not insist on if, but might be worth. WDYT @guozhangwang ?

@guozhangwang
Copy link
Copy Markdown
Contributor

I looked at the StreamsResetterTest and I think it is well readable, while sharing common code may be less effective, so if you are referring to this test class I actually think it is good as is.

@jeqo
Copy link
Copy Markdown
Contributor Author

jeqo commented Dec 6, 2017

@mjsax we could go back to private methods once we move StreamsResetter to streams module (i.e. merge KIP-222), so it should be a temporary solution to unit-test.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Dec 6, 2017

I just looked into StreamsResetter. Wondering why the test passed without the boolean flag we just added back. Shouldn't we throw here and die if there is no intermediate topic to be deleted, and finally return EXIT_CODE_ERROR ?

    private void doDelete(final List<String> topicsToDelete,
                          final KafkaAdminClient adminClient) {
        boolean hasDeleteErrors = false;
        final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
        final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();

        for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet()) {
            try {
                entry.getValue().get(30, TimeUnit.SECONDS);
            } catch (Exception e) {
                System.err.println("ERROR: deleting topic " + entry.getKey());
                e.printStackTrace(System.err);
                hasDeleteErrors = true;
            }
        }
        if (hasDeleteErrors) {
            throw new RuntimeException("Encountered an error deleting one or more topics");
        }
    }

@jeqo
Copy link
Copy Markdown
Contributor Author

jeqo commented Dec 6, 2017

But doDelete is to delete internal topics, not intermediate ones. Intermediates are suppose to move offsets to the end with maybeResetInputAndSeekToEndIntermediateTopicOffsets

@guozhangwang
Copy link
Copy Markdown
Contributor

guozhangwang commented Dec 6, 2017

The reason that it does not fail is because in the resetter, we use a client to subscribe to all the topics, and then populate the intermediate topics:

client.subscribe(topicsToSubscribe);
            client.poll(1);

            final Set<TopicPartition> partitions = client.assignment();
            final Set<TopicPartition> inputTopicPartitions = new HashSet<>();
            final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();

            for (final TopicPartition p : partitions) {
                final String topic = p.topic();
                if (isInputTopic(topic)) {
                    inputTopicPartitions.add(p);
                } else if (isIntermediateTopic(topic)) {
                    intermediateTopicPartitions.add(p);
                } else {
                    System.err.println("Skipping invalid partition: " + p);
                }
            }

If the intermediate topic does not exist, then the intermediateTopicPartitions would be empty, and hence maybeSeekToEnd will be a no-op, so no exceptions will be thrown.

Synced with @mjsax offline, and he will file a JIRA to improve on this situation separately.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Dec 6, 2017

@jeqo You are right -- I did look at the wrong code. My bad. Makes sense that the test passed.

Did create https://issues.apache.org/jira/browse/KAFKA-6318 as follow up.

This is the issue (unrelated to the KIP):

            if (notFoundIntermediateTopics.size() > 0) {
                System.out.println("Following intermediate topics are not found, skipping them");
                for (final String topic : notFoundIntermediateTopics) {
                    System.out.println("Topic:" + topic);
                }
            }

Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the KIP and PR! Great addition!

@guozhangwang
Copy link
Copy Markdown
Contributor

LGTM. Merged to trunk.

@jeqo Thanks for your contribution!

@asfgit asfgit closed this in 30f08d1 Dec 6, 2017
if (!dryRun) {
client.seekToEnd(intermediateTopicPartitions);

client.seekToEnd(intermediateTopicPartitions);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this call be governed by !dryRun

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final Set<TopicPartition> inputTopicPartitions) {
// visible for testing
public void resetOffsetsFromResetPlan(Consumer<byte[], byte[]> client, Set<TopicPartition> inputTopicPartitions, Map<TopicPartition, Long> topicPartitionsAndOffset) {
final Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two calls boils down to

Fetcher#retrieveOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch ...)

I wonder if they can be combined (to save roundtrips).

Copy link
Copy Markdown
Member

@mjsax mjsax Dec 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to get endOffsets() and beginningOffsets for the same set of partitions. A single request cannot get both at once AFAIK.

Also, the reset tool is not considered to be on the "hot code path" -- thus, we don't need to worry about performance too much and apply (unnecessary?) micro optimizations. Just my two cents here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that such optimization is not needed for this task.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Jan 3, 2018

@jeqo I just realized, that we should have updated the docs with this PR. Would you mind to do a follow up PR? Should go here https://kafka.apache.org/10/documentation/streams/upgrade-guide and https://kafka.apache.org/10/documentation/streams/developer-guide/app-reset-tool.html

@jeqo jeqo deleted the feature/kip-171 branch August 8, 2020 09:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.