KAFKA-7515: Trogdor - Add Consumer Group Benchmark Specification#5810

Merged
cmccabe merged 7 commits into apache:trunk from stanislavkozlovski:trogdor-consumer-group-bench-spec
Oct 29, 2018

Conversation

@stanislavkozlovski
Contributor

@stanislavkozlovski stanislavkozlovski commented Oct 17, 2018

https://issues.apache.org/jira/browse/KAFKA-7515

Changes

  • Add new consumerGroup field to ConsumeBenchSpec.
  • Changes the activeTopics field format in ConsumeBenchSpec.
    • activeTopics is a list of strings and now supports three notations for each value:
      • 'foo' - denotes a topic named 'foo'
      • single-range notation 'foo[1-2]' - expands to two topics, 'foo1' and 'foo2'
      • double-range notation 'foo[1-2][1-2]' - expands to two topics with two partitions each: topic 'foo1' with partitions 1 and 2, and topic 'foo2' with partitions 1 and 2
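To illustrate the bracket-range notation above, here is a minimal sketch of one expansion step. This is not the actual Trogdor parser; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RangeExpand {
    // Matches one bracket range such as [1-2]
    private static final Pattern RANGE = Pattern.compile("\\[(\\d+)-(\\d+)\\]");

    // Expands the first bracket range in the name; a plain name passes through.
    static List<String> expand(String name) {
        List<String> out = new ArrayList<>();
        Matcher m = RANGE.matcher(name);
        if (!m.find()) {
            out.add(name);  // no range to expand, e.g. "foo"
            return out;
        }
        int lo = Integer.parseInt(m.group(1));
        int hi = Integer.parseInt(m.group(2));
        for (int i = lo; i <= hi; i++) {
            // Replace only the first range; a second range (the partition
            // part of the double-range notation) would be expanded by a
            // later pass over the result.
            out.add(name.substring(0, m.start()) + i + name.substring(m.end()));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(expand("foo"));       // [foo]
        System.out.println(expand("foo[1-2]"));  // [foo1, foo2]
    }
}
```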

This ConsumeBenchWorker now supports three cases:

  1. When we want to manually assign partitions and use a random, new consumer group (activeTopics contains at least one value with the new double-range notation, e.g. foo[1-2][1-2], and consumerGroup is undefined) - the consumer uses the specified partitions (and all partitions for topics that did not have a second range) via KafkaConsumer#assign()
  2. When we want dynamic partition assignment via an existing consumer group (activeTopics does not contain any double-range notation, consumerGroup is specified) - KafkaConsumer#subscribe()
  3. When we want to manually assign partitions but track offsets via an existing consumer group (activeTopics contains at least one value with the new double-range notation, consumerGroup is specified) - KafkaConsumer#assign()

@stanislavkozlovski
Contributor Author

cc @cmccabe @apovzner

@cmccabe
Contributor

cmccabe commented Oct 17, 2018

Thanks, @stanislavkozlovski

It seems like the only difference between the version with a group and the version without is that we call subscribe. So we don’t need a new class, right, just a new configuration option for ConsumeBench, I think.

executor.submit(new ConsumeMessages(partitions));

AbstractConsumeMessages consumeMessagesTask;
if (spec.consumerGroup() == null) {
Contributor

We don't use null entries in JSON, because it gets too confusing. You should check against empty string here.

Contributor Author

oh yeah, I could just set it to an empty string when it's null in the spec - way better

Member

Using empty string for null is more confusing no?

Contributor

There's a pattern for all of the Trogdor JSON code where we don't use null anywhere. The problem with null is it gets annoying to check each collection for empty vs. null, each string for empty vs. null, etc. etc.

null is also handled kind of inconsistently in Jackson. Sometimes Jackson will serialize a field that is null as "foo": null whereas sometimes it will just omit the field. (I think that "foo": null is actually not conforming JSON, by the way...) There are probably ways to configure all this, but null doesn't really provide any value 99% of the time, so it's simpler to just treat empty as null.
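The empty-for-null convention described here can be sketched as plain normalization helpers applied at construction time, so downstream code never needs null checks. The names are illustrative, not the actual Trogdor code.

```java
import java.util.Collections;
import java.util.Map;

public class NullNormalization {
    // Treat a null JSON string field as empty.
    static String stringOrEmpty(String value) {
        return value == null ? "" : value;
    }

    // Treat a null JSON map field as an empty map (cf. configOrEmptyMap
    // in the diff below).
    static <K, V> Map<K, V> mapOrEmpty(Map<K, V> value) {
        return value == null ? Collections.emptyMap() : value;
    }

    public static void main(String[] args) {
        System.out.println(stringOrEmpty(null).isEmpty()); // true
        System.out.println(mapOrEmpty(null).size());       // 0
    }
}
```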

Member

ijuma commented Oct 20, 2018

We should be consistent, I agree. I don't agree with the value part. It's like saying that we should use empty String to represent the absence of a value in Java. Something like Optional is better and maps to null in JSON. Anyway, a discussion for a different venue. :)


AbstractConsumeMessages consumeMessagesTask;
if (spec.consumerGroup() == null) {
spec.consumerGroup(DEFAULT_CONSUMER_GROUP);
Contributor

We should use a randomly generated (and hopefully unique!) consumer group here so that we don't conflict with other people running a test.

this.commonClientConf = configOrEmptyMap(commonClientConf);
this.adminClientConf = configOrEmptyMap(adminClientConf);
this.activeTopics = activeTopics == null ? TopicsSpec.EMPTY : activeTopics.immutableCopy();
this.consumerGroup = consumerGroup;
Contributor

Should be consumerGroup == null ? "" : consumerGroup to match the other entries. We don't use nulls in JSON

Properties consumerProperties;

ConsumeMessages(Collection<TopicPartition> topicPartitions) {
AbstractConsumeMessages(Map<String, List<TopicPartition>> topicPartitionsByTopic) {
Contributor

Seems like we don't really need inheritance here. Can just have an "if" statement that checks if we have a group or not

.flatMap(List::stream).collect(Collectors.toList());
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
consumerProperties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
consumer.assign(topicPartitions);
Contributor

We shouldn't always use assign here. If the developer has not specified any partitions, we can use the partitions of the group itself.

@cmccabe
Contributor

cmccabe commented Oct 18, 2018

Thanks, @stanislavkozlovski.

So there are basically three cases here:

  1. Developer specifies some partitions, but no group ID.
  2. Developer specifies just a group ID.
  3. Developer specifies some partitions, and also a group ID.

In case 1, we want the group ID to be randomly assigned and not to conflict with other group IDs in the cluster. Otherwise we may impact concurrently running tests.

In case 2, we want the group ID to be set. The client will ask the group for a partition assignment, rather than creating one manually. This is the most common way to use groups.

In case 3, we will set the group ID, but use KafkaConsumer#assign() to manually specify our partition assignment. In this case, the only thing we're using the group for is identifying the partition offset data we save during periodic offset auto-commits.

We can assume that we're in case 1 when we have an empty group ID.

We can assume that we're in case 2 when we have a non-empty group ID, but not any partition data. After all, it doesn't make sense to consume nothing!

Otherwise we're in case 3.
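The three cases above boil down to a small decision on two inputs. This is a hedged sketch with hypothetical names; the actual ConsumeBenchWorker logic may differ.

```java
public class CaseDecision {
    // Decide the consumption mode from the (already null-normalized) group
    // id and whether the spec named explicit partitions.
    static String decide(String consumerGroup, boolean hasExplicitPartitions) {
        if (consumerGroup.isEmpty()) {
            // Case 1: no group given - assign manually under a random group
            // id so concurrent tests don't conflict.
            return "assign-with-random-group";
        } else if (!hasExplicitPartitions) {
            // Case 2: group given, no partitions - let the group assign
            // partitions via subscribe().
            return "subscribe";
        } else {
            // Case 3: group and partitions given - assign() manually, using
            // the group only for offset auto-commits.
            return "assign";
        }
    }

    public static void main(String[] args) {
        System.out.println(decide("", true));          // case 1
        System.out.println(decide("my-group", false)); // case 2
        System.out.println(decide("my-group", true));  // case 3
    }
}
```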

@stanislavkozlovski
Contributor Author

Thanks for the review @cmccabe,

Case 1. Agree, it makes sense to improve the current behavior and assign a random group when one isn't specified

Case 2. This also makes sense to me - we should support a consumer to "bootstrap" itself onto an existing consumer group

Case 3. I see the use case of saving offsets that way, but I'm concerned we don't have a way to create a new consumer group from this tool. How would we go about creating a new consumer group that is subscribed to some topics?
We would have a way to attach a consumer to a group (Case 2) but seemingly lack a way to start said group - leaving us to rely on something outside Trogdor, I think.

@cmccabe
Contributor

cmccabe commented Oct 18, 2018

I'm concerned we don't have a way to create a new consumer group from this tool.

If you join a consumer group that doesn't already exist, then it is created.

How would we go about creating a new consumer group that is subscribed to some topics?

That is a good point. Perhaps we should have some way of supporting a use-case 4: create a group and subscribe to (not assign) some partitions.

One way of doing this would be adding a new configuration like ignoreGroupPartitionAssignment, which could default to false. Then if it were set to true, we'd use assign; if false, subscribe. What do you think?

@stanislavkozlovski
Contributor Author

stanislavkozlovski commented Oct 19, 2018

If you join a consumer group that doesn't already exist, then it is created.

Exactly, but if you create that group and haven't populated the activeTopics field, nothing would happen, right?

I think supporting use-case 4 is very important and as you suggested, should be the default behavior. I'm thinking of adding a useGroupPartitionAssignment which defaults to true -> calling subscribe().

Now I think this could have some impact on existing users, as they would change from using assign() on a couple of partitions to using subscribe() on the whole topics themselves. So we should make sure that's not the case.

Here's how I envision the configs to work:
Case 1: activeTopics specified, consumerGroupId not -> use assign() with a random consumer group id. Here, useGroupPartitionAssignment is totally ignored. This retains the old behavior
Case 2: consumerGroupId specified, activeTopics not -> ask group for partition assignment. I think that for completeness here we should enforce useGroupPartitionAssignment to be true (which is the default). Maybe throw an error if it's false?
Case 3: consumerGroupId specified, activeTopics specified, useGroupPartitionAssignment is explicitly set to false -> use assign()
Case 4: consumerGroupId specified, activeTopics specified, useGroupPartitionAssignment is true -> use subscribe() with the given topics

@stanislavkozlovski
Contributor Author

After a bit of investigation, it seems like we cannot simply call subscribe() without any topics. This makes case 2 invalid; we always need some topics or partitions to subscribe/assign to. I think we should discard that case.

This ConsumeBenchWorker now supports three cases:
1. When we want to manually assign partitions and use a random, new consumer group (consumerGroup is undefined) - KafkaConsumer#assign()
2. When we want dynamic partition assignment via an existing consumer group (useGroupPartitionAssignment=true, consumerGroup is specified) - KafkaConsumer#subscribe()
3. When we want to manually assign partitions but track offsets via an existing consumer group (useGroupPartitionAssignment=false, consumerGroup is specified) - KafkaConsumer#assign()

Adds one new field to the ConsumeBenchSpec - "useGroupPartitionAssignment"
@stanislavkozlovski
Contributor Author

stanislavkozlovski commented Oct 21, 2018

@cmccabe I've updated the PR to support cases 1, 3 and 4. Let me know if I'm on the right track and I'll make sure to update the tests/docs as well

topics, consumerGroup);
consumer.subscribe(topics);
}
else {
Contributor Author

whoops, will change to be on the same line as closing bracket

private final Map<String, String> commonClientConf;
private final TopicsSpec activeTopics;
private final List<String> activeTopics;
private Map<String, List<TopicPartition>> materializedTopics;
Contributor

Should be final

private final TopicsSpec activeTopics;
private final List<String> activeTopics;
private Map<String, List<TopicPartition>> materializedTopics;
private boolean useGroupPartitionAssignment;
Contributor

Should be final

private final List<String> activeTopics;
private Map<String, List<TopicPartition>> materializedTopics;
private boolean useGroupPartitionAssignment;
private String consumerGroup;
Contributor

Should be final

}

private String generateConsumerGroup() {
return "consumer-group-" + UUID.randomUUID().toString();
Contributor

perhaps "consume-bench-" + UUID... so that it's clear that Trogdor created it?
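The suggested prefix could look like the following sketch, mirroring generateConsumerGroup from the diff above with the proposed "consume-bench-" prefix.

```java
import java.util.UUID;

public class GroupId {
    // Random group id prefixed so it's clear Trogdor created it,
    // and unlikely to collide with concurrently running tests.
    static String generateConsumerGroup() {
        return "consume-bench-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String id = generateConsumerGroup();
        System.out.println(id.startsWith("consume-bench-")); // true
    }
}
```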

@cmccabe
Contributor

cmccabe commented Oct 24, 2018

Thanks, @stanislavkozlovski , this looks good. I think we're getting close.

With regard to the topics / partitions specification. The current approach in the PR, if I understand correctly, would require me to specify foo[0][0-1] if I wanted partitions foo0:0, foo0:1. That seems awkward. In general I don't think that we should conflate globs with partition numbers. What I mean is that we should allow people to specify partitions by number if there are zero, one, or two globs in the string.

I think we can do this by treating : as a partition number specifier. So for example foo:0 is partition 0 of foo. foo:[0-1] is foo:0 and foo:1, foo[0-1]:[0-1] is foo0:0, foo0:1, foo1:0, foo1:1, etc.

Supporting multiple globs in a string should be pretty simple too: we just keep calling expand on the same set of strings until the set no longer changes.
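That repeated-expansion idea can be sketched as a fixed-point loop. This is illustrative, not the merged implementation; expandOnce handles one bracket range per call, and the partition-number `:` separator is just part of the string.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FixpointExpand {
    private static final Pattern RANGE = Pattern.compile("\\[(\\d+)-(\\d+)\\]");

    // Expand the first bracket range in s, or return s unchanged.
    static Set<String> expandOnce(String s) {
        Set<String> out = new TreeSet<>();
        Matcher m = RANGE.matcher(s);
        if (!m.find()) {
            out.add(s);
            return out;
        }
        for (int i = Integer.parseInt(m.group(1)); i <= Integer.parseInt(m.group(2)); i++) {
            out.add(s.substring(0, m.start()) + i + s.substring(m.end()));
        }
        return out;
    }

    // Keep expanding every string until the set stops changing.
    static Set<String> expandAll(Set<String> input) {
        Set<String> current = new TreeSet<>(input);
        while (true) {
            Set<String> next = new TreeSet<>();
            for (String s : current) {
                next.addAll(expandOnce(s));
            }
            if (next.equals(current)) {
                return current;
            }
            current = next;
        }
    }

    public static void main(String[] args) {
        // "foo[0-1]:[0-1]" -> foo0:0, foo0:1, foo1:0, foo1:1
        System.out.println(expandAll(new TreeSet<>(Set.of("foo[0-1]:[0-1]"))));
    }
}
```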

@stanislavkozlovski
Contributor Author

Thanks for the review @cmccabe. I like this notation better.
One thing I didn't implement was support for multiple, non-consecutive partitions (e.g. foo:1:3:5). That should be easy to add, but I feel it might overcomplicate things and be unnecessary.

@cmccabe
Contributor

cmccabe commented Oct 26, 2018

+1. Thanks, @stanislavkozlovski

@stanislavkozlovski
Contributor Author

Java 11 build timed out

22:51:49 Build timed out (after 180 minutes). Marking the build as aborted.

The Java 8 build passed fine. Maybe we could merge this?

@cmccabe
Contributor

cmccabe commented Oct 29, 2018

Build timeout in jdk11 is unrelated. Will merge. Thanks, @stanislavkozlovski

@cmccabe cmccabe merged commit d28c534 into apache:trunk Oct 29, 2018
ijuma pushed a commit that referenced this pull request Dec 8, 2018
…_group_partitions_should_raise (#6015)

This is the error message we're after:

"You may not specify an explicit partition assignment when using multiple consumers in the same group."

We apparently changed it midway through #5810 and forgot to update the test.

Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Ismael Juma <ismael@juma.me.uk>
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…che#5810)

This ConsumeBenchWorker now supports using consumer groups.  The groups may be either used to store offsets, or as subscriptions.
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…_group_partitions_should_raise (apache#6015)

This is the error message we're after:

"You may not specify an explicit partition assignment when using multiple consumers in the same group."

We apparently changed it midway through apache#5810 and forgot to update the test.

Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Ismael Juma <ismael@juma.me.uk>