Skip to content

KAFKA-9545: Fix subscription bugs from Stream refactoring#8109

Merged
guozhangwang merged 7 commits intoapache:trunkfrom
abbccdda:KAFKA-9545
Feb 16, 2020
Merged

KAFKA-9545: Fix subscription bugs from Stream refactoring#8109
guozhangwang merged 7 commits intoapache:trunkfrom
abbccdda:KAFKA-9545

Conversation

@abbccdda
Copy link
Copy Markdown

@abbccdda abbccdda commented Feb 13, 2020

This PR fixes two bugs related to stream refactoring:

  1. The subscribed topics are not updated correctly when topic gets removed from broker.
  2. The remainingPartitions computation doesn't account the case when one task has a pattern subscription of multiple topics. Then the input partition change will not be assumed as containsAll

The bugs are exposed from integration test testRegexMatchesTopicsAWhenDeleted and could be used to verify the fix works.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ableegoldman could you take a look too?

@guozhangwang
Copy link
Copy Markdown
Contributor

test this please

@guozhangwang
Copy link
Copy Markdown
Contributor

test this please

@guozhangwang
Copy link
Copy Markdown
Contributor

test this please

if (remainingPartitions.containsAll(task.inputPartitions())) {
revokedTasks.add(task.id());
remainingPartitions.removeAll(task.inputPartitions());
for (final TopicPartition topicPartition : task.inputPartitions()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate a bit on this fix? Do you mean one task is subscribed to a pattern that matches multiple topics, or to two different patterns? I'm not sure I understand why either case would cause containsAll to fail.

Copy link
Copy Markdown
Author

@abbccdda abbccdda Feb 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean here is that if one of the partitions this task owns is removed, but not all, then the old logic will not catch the revoked task by containsAll.

java.lang.IllegalStateException: Some revoked partitions that do not belong to any tasks remain: [TEST-TOPIC-A-0]
	at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:267)
	at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:72)
	at org.apache.kafka.streams.integration.RegexSourceIntegrationTest$TheConsumerRebalanceListener.onPartitionsRevoked(RegexSourceIntegrationTest.java:422)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:297)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:681)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:414)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:477)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)``` 

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. why it's possible that only part of the partitions of a task is revoked? We do assignment at the granularity of tasks so this check is to verify specifically that all partitions should be included if a task is going to be removed right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think now I understand the reason that part of the partitions can be removed here.

If a task is indeed being removed, it should be triggered in the TaskManager#onAssignment; here TaskManager#onRevocation is triggered after the previous call so the tasks map should have been updated --- i.e. the task would not be inside task-manager anymore, and if it is due to regex pattern (like this test) the current condition is okay: we should not suspend the task unless all its input-partitions are included. Otherwise, we can just update the input partition of that task --- right now it is final so we cannot update it, but I think that’s fine since we would no longer pipe any records to that task and there will be no committed offsets for that partition either --- in either case, we can remove it from the remainingPartitions.

final Collection<String> existingTopics = subscriptionUpdates();

if (!existingTopics.equals(topics)) {
topics.addAll(existingTopics);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is called by two callers: xxxFromMetadata and xxxFromAssignment. For the former we do not maintain the old topics but just replace with the passed in value, for the latter we still maintain the old topics -- this it to take care if the leader did not assign all tasks / partitions due to assignment error. We should still keep that logic here.

if (remainingPartitions.containsAll(task.inputPartitions())) {
revokedTasks.add(task.id());
remainingPartitions.removeAll(task.inputPartitions());
for (final TopicPartition topicPartition : task.inputPartitions()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. why it's possible that only part of the partitions of a task is revoked? We do assignment at the granularity of tasks so this check is to verify specifically that all partitions should be included if a task is going to be removed right?

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Feb 14, 2020

Ok to test

@abbccdda
Copy link
Copy Markdown
Author

With some offline discussions, we believe that we should be able to resolve trunk integration test failure first, and fix the task partition inconsistency issue in a separate JIRA

@abbccdda abbccdda force-pushed the KAFKA-9545 branch 3 times, most recently from d65bff2 to 2728fd8 Compare February 15, 2020 01:03
@guozhangwang
Copy link
Copy Markdown
Contributor

test this please

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code LGTM, waiting for test to complete.

@guozhangwang guozhangwang merged commit 97c5dc1 into apache:trunk Feb 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants