Skip to content

KAFKA-6774: Improve the default group id behavior in KafkaConsumer (KIP-289)#5877

Merged
hachikuji merged 9 commits intoapache:trunkfrom
vahidhashemian:KAFKA-6774
Nov 16, 2018
Merged

KAFKA-6774: Improve the default group id behavior in KafkaConsumer (KIP-289)#5877
hachikuji merged 9 commits intoapache:trunkfrom
vahidhashemian:KAFKA-6774

Conversation

@vahidhashemian
Copy link
Copy Markdown
Contributor

Improve the default group id behavior by

  • changing the default consumer group to null, where no offset commit or fetch, or group management operations are allowed
  • deprecate the use of empty ("") consumer group on the client

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vahidhashemian vahidhashemian force-pushed the KAFKA-6774 branch 2 times, most recently from 0b0dec0 to 73c9ba1 Compare November 4, 2018 07:03
@hachikuji hachikuji self-assigned this Nov 6, 2018
Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vahidhashemian Thanks for the PR. I think we also might need to hook into the logic for setting the initial position of the consumer. By default, we attempt to fetch committed offsets, but if the group id is null, we should just skip this step.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps group management or the the offset commit api?

@vahidhashemian
Copy link
Copy Markdown
Contributor Author

@hachikuji Thanks for reviewing the PR. Submitted a patch to address your feedback.

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a couple more comments. Do we have an integration test that covers basic fetching using a null group id? We should probably make sure we have at least one case which uses null and one which uses the empty string.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to change the default as long as the user hasn't explicitly provided the configuration. However, if they have explicitly set enable.auto.commit=true, then it might be better to raise an exception.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have a fail below if this returns without errors?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Totally missed them.

Copy link
Copy Markdown
Contributor Author

@vahidhashemian vahidhashemian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji I tried to address your recent comments in the new commit. Thanks for the additional feedback.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Totally missed them.

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tests. Left a few minor comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to just pass "groupId=null" as an override?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't seem so, I get a NPE:

java.lang.NullPointerException
	at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1021)
	at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1016)
	at java.base/java.util.Properties.put(Properties.java:1309)
	at kafka.api.PlaintextConsumerTest.testConsumingWithNullGroupId(PlaintextConsumerTest.scala:1838)
        ...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose we could leave this out?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems more accurate to say that consumer 3 begins from an explicit seek?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe better to turn this into explicit equals assertions so that we know which check failed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The third iteration here feels redundant. Doesn't it just read from the committed offset like consumer2?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I put it to verify the auto.offset.reset doesn't affect the behavior. I'll remove it in the new commit.

Copy link
Copy Markdown
Contributor Author

@vahidhashemian vahidhashemian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji I updated the unit tests based on your comments. Thanks!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't seem so, I get a NPE:

java.lang.NullPointerException
	at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1021)
	at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1016)
	at java.base/java.util.Properties.put(Properties.java:1309)
	at kafka.api.PlaintextConsumerTest.testConsumingWithNullGroupId(PlaintextConsumerTest.scala:1838)
        ...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I put it to verify the auto.offset.reset doesn't affect the behavior. I'll remove it in the new commit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vahidhashemian Thanks for the patch. Looks good overall. I had just one thought. When the groupId is null, there should be no coordinator interaction at all. We could make this explicit by not bothering to construct the ConsumerCoordinator object. That would take it out of the poll path and reduce the risk of using it accidentally. What do you think?

@vahidhashemian
Copy link
Copy Markdown
Contributor Author

@hachikuji thanks for the feedback. Your suggestion makes sense. Even though it's not explicitly mentioned in the KIP, I think it wouldn't result in a different behavior that what is covered by the KIP. I submitted a patch that should cover it (it did not break any test locally), but please let me know if I missed something. Thanks!

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates, just a few additional comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps now we don't need this? We could add an assertion in the constructor that the groupId is not null.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. I added an assertion in AbstractCoordinator constructor. It seemed to be a better fit there.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not sure if there's a strong reason for any of these to use the empty group id. Could we use groupId instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No particular reason other than I wanted to stick to the actual group id that was used before. I updated these to use the provided groupId.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if we could rephrase this to suggest what the user should do more clearly. For example, "To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration."

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, thanks!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to complete the integration picture, could we make a call to committed and assert the exception?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did that in the next commit.

Copy link
Copy Markdown
Contributor Author

@vahidhashemian vahidhashemian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji thanks for the feedback/suggestions. The new commit should address them.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. I added an assertion in AbstractCoordinator constructor. It seemed to be a better fit there.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, thanks!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No particular reason other than I wanted to stick to the actual group id that was used before. I updated these to use the provided groupId.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did that in the next commit.

@hachikuji
Copy link
Copy Markdown
Contributor

@vahidhashemian Can you look into the build failures? I looked at one of the streams tests (KTableSourceTopicRestartIntegrationTest) and saw there was some dependence on the empty group id.

Exception in thread "ktable-restore-from-source-2639ce74-b612-423d-aaff-96cc59653aa7-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [ktable-restore-from-source-2639ce74-b612-423d-aaff-96cc59653aa7-StreamThread-1] Failed to rebalance.
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:922)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.

cc @guozhangwang

@vahidhashemian
Copy link
Copy Markdown
Contributor Author

Thanks @hachikuji for catching this.
I see this code in StreamsConfig.getRestoreConsumerConfigs:

        // no need to set group id for a restore consumer
        baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);

which now means setting the group id to null. I think it should be safe to replace it with

        // no need to use a specific group id for a restore consumer
        baseConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "");

Thoughts?
@guozhangwang

@hachikuji
Copy link
Copy Markdown
Contributor

hachikuji commented Nov 14, 2018

The exception is being raised from a call to unsubscribe. We currently allow this API to be used in order to remove all assigned partitions, so I think we should probably remove the group id validation from that API.

@vahidhashemian
Copy link
Copy Markdown
Contributor Author

Sounds good, and seems to have no side effect with respect to existing unit tests. My initial thought was to use the empty string to keep the tests run with the same config as before. I'll submit a patch.

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates. Looks like the build is passing. Just two more comments and I think we can merge.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we handle the else case? I was thinking that if the user enables auto commit explicitly, but provides no group id, then we should raise a configuration exception.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I had here was to let the consumer run and throw an exception when a commit is triggered (or even earlier when e.g. subscribe is called). But I think your suggestion makes more sense because it points directly to the root cause, i.e. the misconfiguration. Updated in the new patch.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we use Objects.requireNonNull instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this change too.

…IP-289)

Improve the default group id behavior by
* changing the default consumer group to null, where no offset commit or fetch, or group management operations are allowed
* deprecate the use of empty (`""`) consumer group on the client
Also modified the logic in deciding whether auto-commit should be enable.
The validation caused Streams tests in `KTableSourceTopicRestartIntegrationTest` to fail.
Copy link
Copy Markdown
Contributor Author

@vahidhashemian vahidhashemian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji Thanks for the feedback. Responded to your comment inline and submitted a new patch.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I had here was to let the consumer run and throw an exception when a commit is triggered (or even earlier when e.g. subscribe is called). But I think your suggestion makes more sense because it points directly to the root cause, i.e. the misconfiguration. Updated in the new patch.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made this change too.

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Pushed a trivial fix for the null check in AbstractCoordinator.

@hachikuji hachikuji merged commit c3e7d62 into apache:trunk Nov 16, 2018
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…IP-289) (apache#5877)

Improve the default group id behavior by:
* changing the default consumer group to null, where no offset commit or fetch, or group management operations are allowed
* deprecating the use of empty (`""`) consumer group on the client

Reviewers: Jason Gustafson <jason@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants