
KAFKA-14576: Move ConsoleConsumer to tools#15274

Merged
mimaison merged 8 commits into apache:trunk from mimaison:KAFKA-14576
Feb 13, 2024

@mimaison
Member

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor

@OmniaGM OmniaGM left a comment

Nice work, overall looks good. I just left a few comments.

}
}

static void run(ConsoleConsumerOptions conf) {
Contributor

Just for consistency, can we rename conf to opts like L.57?


Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();

public ConsumerWrapper(Optional<String> topic,
Contributor

@OmniaGM OmniaGM Jan 31, 2024

Do we really need to wrap these parameters in Optional? Also, the topic, partitionId, offset, and includedTopics values all come from ConsoleConsumerOptions, so maybe we can just pass ConsoleConsumerOptions here and get rid of the extra if/else in line 72 and the Optional wrappers.

Member Author

I opted to copy the previous logic to make it easier to review and avoid changing any behavior by mistake. I agree this could be refactored but I'd prefer doing that in a follow up PR.

} else if (!topic.isPresent() && !partitionId.isPresent() && !offset.isPresent() && includedTopics.isPresent()) {
consumer.subscribe(Pattern.compile(includedTopics.get()));
} else {
throw new IllegalArgumentException("An invalid combination of arguments is provided. " +
Contributor

This check feels like a better fit for ConsoleConsumerOptions.checkRequiredArgs: if the combination is invalid, we should exit earlier.

Member Author

I kept the logic this way to make it easier to review and compare against the previous Scala code. There are tons of options, and even with the many tests we have and the extensive manual testing I did, I don't want to change the behavior of this tool.

What you propose sounds like a nice improvement but I'd prefer keeping this PR as a "translation" and do improvements in follow up PRs.

this.timeoutMs = timeoutMs;

if (topic.isPresent() && partitionId.isPresent() && offset.isPresent() && !includedTopics.isPresent()) {
seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
Contributor

@OmniaGM OmniaGM Jan 31, 2024

If we go with my suggestion above, we can also refactor seek to decide what to do depending on whether an offset is provided, instead of the first 2 ifs we have here.

seek(topic.get(), partitionId.getAsInt(), offset.getAsLong());
} else if (topic.isPresent() && partitionId.isPresent() && !offset.isPresent() && !includedTopics.isPresent()) {
// default to latest if no offset is provided
seek(topic.get(), partitionId.getAsInt(), ListOffsetsRequest.LATEST_TIMESTAMP);
Contributor

@OmniaGM OmniaGM Jan 31, 2024

Can't we use ConsoleConsumerOptions.parseOffset to either grab the given offset or the default, instead of these 2 ifs?
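
To illustrate the idea of collapsing the two seek branches, here is a minimal sketch. It does not use ConsoleConsumerOptions.parseOffset, whose signature is not shown in this thread; the class name SeekOffsetSketch, the helpers isSeekRequest and resolveOffset, and the local LATEST_TIMESTAMP constant (assumed to be -1) are all hypothetical stand-ins so the example runs without Kafka on the classpath.

```java
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;

final class SeekOffsetSketch {

    // Local stand-in for ListOffsetsRequest.LATEST_TIMESTAMP; the value -1 is an
    // assumption made so that this sketch compiles without Kafka on the classpath.
    static final long LATEST_TIMESTAMP = -1L;

    // True for both "seek" branches in the diff: topic and partition are set,
    // no included-topics pattern is given, and the offset may or may not be present.
    static boolean isSeekRequest(Optional<String> topic,
                                 OptionalInt partitionId,
                                 Optional<String> includedTopics) {
        return topic.isPresent() && partitionId.isPresent() && !includedTopics.isPresent();
    }

    // Replaces the offset.isPresent() / !offset.isPresent() pair with one expression.
    static long resolveOffset(OptionalLong offset) {
        return offset.orElse(LATEST_TIMESTAMP);
    }

    public static void main(String[] args) {
        // Explicit offset is passed through unchanged.
        System.out.println(resolveOffset(OptionalLong.of(42L)));
        // Missing offset falls back to the "latest" marker.
        System.out.println(resolveOffset(OptionalLong.empty()));
    }
}
```

With something along these lines, the caller would need a single seek(topic.get(), partitionId.getAsInt(), resolveOffset(offset)) call for both cases. Whether that preserves every edge case of the original Scala tool would still have to be verified.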

}

private MessageFormatter buildFormatter() {
MessageFormatter formatter = null;
Contributor

Is = null necessary?

Member Author

Yes, it's needed; otherwise the compiler complains that formatter may not have been initialized, as it does not understand that the catch clause causes the tool to exit.
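
The compiler behavior described here can be reproduced in isolation. The following is a minimal sketch, not the code from this PR: DefiniteAssignmentDemo, build, and reportAndExit are hypothetical names, and reportAndExit throws instead of exiting so the example can be run safely.

```java
final class DefiniteAssignmentDemo {

    // Hypothetical stand-in for a helper that reports an error and exits.
    // It throws here so the sketch can be run without killing the JVM.
    static void reportAndExit(String message) {
        throw new IllegalStateException(message);
    }

    static Object build(String className) {
        // Removing "= null" makes the return statement below fail to compile
        // with "variable instance might not have been initialized".
        Object instance = null;
        try {
            instance = Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // The compiler assumes this call can return normally, so it treats
            // instance as possibly unassigned after the catch block.
            reportAndExit("Cannot instantiate " + className);
        }
        return instance;
    }

    public static void main(String[] args) {
        System.out.println(build("java.lang.StringBuilder").getClass().getName());
    }
}
```

Java's definite assignment analysis only looks at statements such as return and throw, not at what a called method does, so a method that never returns is still treated as one that might.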

private byte[] headersSeparator = utfBytes(",");
private byte[] nullLiteral = utfBytes("null");

private Optional<Deserializer<?>> keyDeserializer;
Contributor

Shouldn't we specify the default value to be Optional.empty here?

private byte[] nullLiteral = utfBytes("null");

private Optional<Deserializer<?>> keyDeserializer;
private Optional<Deserializer<?>> valueDeserializer;
Contributor

same


private Optional<Deserializer<?>> keyDeserializer;
private Optional<Deserializer<?>> valueDeserializer;
private Optional<Deserializer<?>> headersDeserializer;
Contributor

same as above
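
For context on why the default matters, here is a small sketch with hypothetical classes (OptionalFieldDemo, Uninitialized, Initialized are not from the PR). An Optional field without an initializer starts out as null, so reading it before it is assigned throws a NullPointerException. Whether that can happen in the formatter depends on whether the fields are always assigned before use, which this excerpt does not show.

```java
import java.util.Optional;

final class OptionalFieldDemo {

    // Field declared without an initializer: its initial value is null, not Optional.empty().
    static final class Uninitialized {
        Optional<String> value;
    }

    // Field with an explicit default: safe to query before any assignment.
    static final class Initialized {
        Optional<String> value = Optional.empty();
    }

    // Returns true if calling isPresent() on the field throws a NullPointerException.
    static boolean readThrowsNpe(Optional<String> field) {
        try {
            field.isPresent();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(readThrowsNpe(new Uninitialized().value));
        System.out.println(readThrowsNpe(new Initialized().value));
    }
}
```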

Contributor

@jlprat jlprat left a comment

Thanks for the PR @mimaison
I have a couple of comments, they are minor, but I believe the ones in LoggingMessageFormatter should be addressed as the output would be different

Comment thread tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java Outdated
Comment thread tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java Outdated
Comment thread tools/src/main/java/org/apache/kafka/tools/consumer/LoggingMessageFormatter.java Outdated
Contributor

@jlprat jlprat left a comment

To the best of my "in-head Scala -> Java translator" knowledge, the code generated is equivalent to the original one.
LGTM, thanks @mimaison

from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7
Member Author

LATEST_3_7 does not exist yet, we need to wait for 3.7.0 to be out before adding it.

Contributor

Do I understand it right that you want to merge this only after 3.7.0 is released?

Member Author

Actually since DEV_VERSION already points to 3.8, I should be able to introduce LATEST_3_7 even if it's not released yet. I'll try that.

Member Author

That seems to work. However while running the system tests I found an issue loading configs via --consumer.config. I pushed c6355fe to fix it.

I've kicked off another run on our system tests CI; hopefully it will be clean now.

if node.version > LATEST_3_7:
    cmd += " --formatter org.apache.kafka.tools.consumer.LoggingMessageFormatter"
else:
    cmd += " --formatter kafka.tools.LoggingMessageFormatter"
Member Author

LoggingMessageFormatter, DefaultMessageFormatter, NoOpMessageFormatter are not part of the public API and as far as I can tell the class names are not visible anywhere so this shouldn't be considered an API change.

@mimaison
Member Author

I rebased on trunk to resolve conflicts. It seems my last fix has resolved the issues with the system tests so it is ready to review again.

Contributor

@jlprat jlprat left a comment

LGTM. Thanks @mimaison

Comment on lines +240 to +241
Properties consumerProps = new Properties();
consumerProps.putAll(consumerPropsFromFile);
Contributor

Strange that this change is not doing the exact same thing. But reading the API, the constructor that takes a Properties argument creates an empty property list with the passed properties as defaults. Tricky API design on Java's part...

Member Author

Yep! Default values are only retrieved when using getProperty() and not when using get(). This is why the unit test was passing.
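
The difference between the two approaches can be seen with plain java.util.Properties. This is a minimal sketch rather than the PR's code: the class PropertiesDefaultsDemo, its helper methods, and the group.id sample entry are illustrative assumptions.

```java
import java.util.Properties;

final class PropertiesDefaultsDemo {

    // Wraps source as the defaults of a new, otherwise empty Properties object.
    static Properties wrapAsDefaults(Properties source) {
        return new Properties(source);
    }

    // Copies every entry of source into a new Properties object.
    static Properties copyEntries(Properties source) {
        Properties copy = new Properties();
        copy.putAll(source);
        return copy;
    }

    public static void main(String[] args) {
        Properties fromFile = new Properties();
        fromFile.setProperty("group.id", "my-group");

        Properties wrapped = wrapAsDefaults(fromFile);
        Properties copied = copyEntries(fromFile);

        // getProperty() falls back to the defaults, so the value is found.
        System.out.println(wrapped.getProperty("group.id"));
        // get() and isEmpty() only see the object's own entries, which are none.
        System.out.println(wrapped.get("group.id"));
        System.out.println(wrapped.isEmpty());
        // After putAll(), the entry is a real entry and get() sees it.
        System.out.println(copied.get("group.id"));
    }
}
```

Code that treats the Properties object as a Map (for example by iterating over its entries or calling get()) will therefore miss values that exist only as defaults, which matches the symptom described in this thread.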

@mimaison
Member Author

@OmniaGM do you want to take another look?

Contributor

@OmniaGM OmniaGM left a comment

LGTM, thanks @mimaison. We probably just need a Jira for some of the improvements and refactoring discussed here, to get rid of some of the stuff inherited from translating Scala into Java, so we don't forget about them.

@mimaison
Member Author

Thanks @OmniaGM !
I created https://issues.apache.org/jira/browse/KAFKA-16246 to track the follow up refactoring work.

@mimaison
Member Author

None of the test failures seem related, merging to trunk

@mimaison mimaison merged commit 0bf830f into apache:trunk Feb 13, 2024
@mimaison mimaison deleted the KAFKA-14576 branch February 13, 2024 18:24
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
Reviewers: Josep Prat <josep.prat@aiven.io>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
Reviewers: Josep Prat <josep.prat@aiven.io>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>
Phuc-Hong-Tran pushed a commit to Phuc-Hong-Tran/kafka that referenced this pull request Jun 6, 2024
Reviewers: Josep Prat <josep.prat@aiven.io>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>
