
KAFKA-7961: Ignore assignment for un-subscribed partitions #6304

Merged
hachikuji merged 6 commits into apache:trunk from jsancio:kafka-7961
Feb 23, 2019

Conversation

@jsancio
Member

@jsancio jsancio commented Feb 22, 2019

Whenever the consumer coordinator sends a response that doesn't match the client consumer subscription, ignore the assignment and rejoin the group.

Testing strategy: create a mocked client that first sends an assignment response that doesn't match the client subscription followed by an assignment response that does match the client subscription.
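The testing strategy above can be sketched without any Kafka classes. This is only an illustration: `FakeCoordinator`, `prepareResponse`, and `joinUntilMatched` are hypothetical names, not part of the Kafka client or its test utilities, and topics stand in for partitions. The fake replays prepared assignment responses in order, and the join loop drops any assignment that names a topic outside the subscription and asks again.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;

public final class MockedAssignmentSketch {

    /** Hypothetical stand-in for a mocked client: replays prepared responses in order. */
    public static final class FakeCoordinator {
        private final Deque<Set<String>> responses = new ArrayDeque<>();

        public void prepareResponse(Set<String> assignedTopics) {
            responses.addLast(assignedTopics);
        }

        public Set<String> nextAssignment() {
            if (responses.isEmpty()) {
                throw new IllegalStateException("No prepared response left");
            }
            return responses.removeFirst();
        }

        public int pending() {
            return responses.size();
        }
    }

    /** Joins repeatedly until the assignment matches; returns the number of join attempts. */
    public static int joinUntilMatched(FakeCoordinator coordinator, Set<String> subscription) {
        int attempts = 0;
        while (true) {
            attempts++;
            Set<String> assigned = coordinator.nextAssignment();
            if (subscription.containsAll(assigned)) {
                return attempts; // assignment matches the subscription: accept it
            }
            // Mismatch: ignore this assignment and rejoin on the next iteration.
        }
    }

    public static void main(String[] args) {
        FakeCoordinator coordinator = new FakeCoordinator();
        coordinator.prepareResponse(Set.of("test2")); // does not match the subscription
        coordinator.prepareResponse(Set.of("test1")); // matches the subscription
        int attempts = joinUntilMatched(coordinator, Set.of("test1"));
        System.out.println("joined after " + attempts + " attempts"); // prints "joined after 2 attempts"
    }
}
```

The topic names `test1` and `test2` mirror the ones in the failure output below; everything else is an assumption made for the sketch.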

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Added "testInvalidCoordinatorAssignment" test, which results in the
following failure:

```
org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest > testInvalidCoordinatorAssignment FAILED
    java.lang.IllegalArgumentException: Assigned partition test2-0 for non-subscribed topic; subscription is [test1]
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:249)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:344)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:340)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.testInvalidCoordinatorAssignment(ConsumerCoordinatorTest.java:462)

1 test completed, 1 failed
```
@jsancio
Member Author

jsancio commented Feb 22, 2019

cc @guozhangwang @ijuma @hachikuji

@hachikuji hachikuji self-assigned this Feb 22, 2019
Contributor

@hachikuji hachikuji left a comment

Thanks for picking this up. Looks good overall. Left a few comments.

Throw an IllegalStateException if the leader sent an assignment that
doesn't match the requested subscription.

Modify testInvalidCoordinatorAssignment and add
testOutdatedCoordinatorAssignment tests to cover this condition.
Member Author

@jsancio jsancio left a comment

@hachikuji Here are the failures when we run these tests against the old code.

}

@Test(expected = IllegalStateException.class)
public void testInvalidCoordinatorAssignment() {
Member Author

For this test, the old code fails as follows:

org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest > testInvalidCoordinatorAssignment FAILED
    java.lang.Exception: Unexpected exception, expected<java.lang.IllegalStateException> but was<java.lang.IllegalArgumentException>

        Caused by:
        java.lang.IllegalArgumentException: Assigned partition test2-0 for non-subscribed topic; subscription is [test1]
            at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:249)
            at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410)
            at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:344)
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:340)
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.testInvalidCoordinatorAssignment(ConsumerCoordinatorTest.java:506)

}

@Test
public void testOutdatedCoordinatorAssignment() {
Member Author

For this test, the old code fails as follows:

org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest > testOutdatedCoordinatorAssignment FAILED
    java.lang.IllegalArgumentException: Assigned partition test2-0 for non-subscribed topic; subscription is [test1]
        at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:249)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:344)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:340)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.testOutdatedCoordinatorAssignment(ConsumerCoordinatorTest.java:466)

Contributor

@hachikuji hachikuji left a comment

Thanks, the fix looks good. Just had a couple small comments.

Contributor

@hachikuji hachikuji left a comment

LGTM. Thanks for the patch!

@hachikuji hachikuji merged commit eb8cc09 into apache:trunk Feb 23, 2019
@ijuma
Member

ijuma commented Feb 23, 2019

Should this be cherry-picked to 2.2?

@jsancio
Member Author

jsancio commented Feb 25, 2019

@ijuma I don't think so. This was found during internal testing by waking up the consumer while also changing the subscription.

@hachikuji What do you think?

@jsancio jsancio deleted the kafka-7961 branch February 25, 2019 17:59
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* AK/trunk: (36 commits)
  KAFKA-7962: Avoid NPE for StickyAssignor (apache#6308)
  Address flakiness of CustomQuotaCallbackTest#testCustomQuotaCallback (apache#6330)
  KAFKA-7918: Inline generic parameters Pt. II: RocksDB Bytes Store and Memory LRU Caches (apache#6327)
  MINOR: fix parameter naming (apache#6316)
  KAFKA-7956 In ShutdownableThread, immediately complete the shutdown if the thread has not been started (apache#6218)
  MINOR: Refactor replica log dir fetching for improved logging (apache#6313)
  [TRIVIAL] Remove unused StreamsGraphNode#repartitionRequired (apache#6227)
  MINOR: Increase produce timeout to 120 seconds (apache#6326)
  KAFKA-7918: Inline generic parameters Pt. I: in-memory key-value store (apache#6293)
  MINOR: Fix line break issue in upgrade notes (apache#6320)
  KAFKA-7972: Use automatic RPC generation in SaslHandshake
  MINOR: Enable capture of full stack trace in StreamTask#process (apache#6310)
  KAFKA-7938: Fix test flakiness in DeleteConsumerGroupsTest (apache#6312)
  KAFKA-7937: Fix Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup (apache#6311)
  MINOR: Update docs to say 2.2 (apache#6315)
  KAFKA-7672 : force write checkpoint during StreamTask #suspend (apache#6115)
  KAFKA-7961; Ignore assignment for un-subscribed partitions (apache#6304)
  KAFKA-7672: Restoring tasks need to be closed upon task suspension (apache#6113)
  KAFKA-7864; validate partitions are 0-based (apache#6246)
  KAFKA-7492 : Updated javadocs for aggregate and reduce methods returning null behavior. (apache#6285)
  ...
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Whenever the consumer coordinator sends a response that doesn't match the client consumer subscription, we should check the subscription to see if it has changed. If it has, we can ignore the assignment and request a rebalance. Otherwise, we can throw an exception as before.
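A minimal sketch of the decision described in this commit message, not the actual `ConsumerCoordinator` code. It assumes the client remembers the subscription it sent when joining (`joinedSubscription`) and compares topics rather than partitions. The class, enum, and method names are hypothetical.

```java
import java.util.Set;

public final class AssignmentCheckSketch {

    public enum Outcome { ACCEPT, REJOIN }

    /**
     * Decides what to do with an assignment received after a join.
     *
     * @param joinedSubscription  topics the client was subscribed to when it joined (assumed to be tracked)
     * @param currentSubscription topics the client is subscribed to now
     * @param assignedTopics      topics of the partitions in the received assignment
     */
    public static Outcome evaluate(Set<String> joinedSubscription,
                                   Set<String> currentSubscription,
                                   Set<String> assignedTopics) {
        if (currentSubscription.containsAll(assignedTopics)) {
            return Outcome.ACCEPT; // assignment matches the current subscription
        }
        if (!joinedSubscription.equals(currentSubscription)) {
            // The subscription changed while the join was in flight, so the
            // assignment is merely outdated: ignore it and request a rebalance.
            return Outcome.REJOIN;
        }
        // The subscription did not change, so the mismatch is unexpected.
        throw new IllegalStateException("Assignment for topics " + assignedTopics
                + " does not match subscription " + currentSubscription);
    }

    public static void main(String[] args) {
        // Outdated assignment: joined with test2, then re-subscribed to test1.
        System.out.println(evaluate(Set.of("test2"), Set.of("test1"), Set.of("test2"))); // prints "REJOIN"
        // Matching assignment.
        System.out.println(evaluate(Set.of("test1"), Set.of("test1"), Set.of("test1"))); // prints "ACCEPT"
    }
}
```

The two non-accepting branches correspond to the two tests named in this conversation: an outdated assignment leads to a rejoin (`testOutdatedCoordinatorAssignment`), while a mismatch with an unchanged subscription raises `IllegalStateException` (`testInvalidCoordinatorAssignment`).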

Testing strategy: create a mocked client that first sends an assignment response that doesn't match the client subscription followed by an assignment response that does match the client subscription.

Reviewers: Jason Gustafson <jason@confluent.io>