
Conversation

@merrimanr (Contributor)

This PR upgrades Kafka to 2.0.0 and Storm to 1.2.0.

Changes Included

  • Updated Kafka and Storm versions in Maven
  • Updated Maven pom files to include the correct dependency versions
  • Updated Maven profile in Ansible to HDP-3.1.0.0
  • Changed REST classes to use new Kafka client classes
  • Substituted Kafka constants for hardcoded strings
  • Updated the KafkaComponent class to use the new Kafka classes

Testing

I have done some light testing in full dev including:

  • Verified all topologies are running with no errors and data is flowing through
  • Verified the Kafka REST endpoints function as expected
  • Verified the Kafka Stellar functions continue to work

Future Work

I believe there are parts of our codebase that we can deprecate in separate PRs.

I also ran into several classpath version errors. It is not possible to resolve these using the standard Maven exclude tags due to the issue described here. As a workaround, I moved the conflicting dependency to the top of the pom so that it takes priority over the shaded dependencies. This is not ideal, but it is easier than trying to track down the version in the shaded dependencies.
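
To illustrate the workaround, here is a minimal pom sketch (the shaded artifact coordinates are hypothetical placeholders, not the actual dependencies involved). Declaring the pinned dependency before the shaded one lets it win classpath ordering over the copy bundled inside the shaded jar:

    <dependencies>
      <!-- Declared first so this version takes priority over the copy bundled in the shaded jar below -->
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version>
      </dependency>
      <!-- Hypothetical shaded/uber jar that bundles an older kafka-clients internally -->
      <dependency>
        <groupId>org.example</groupId>
        <artifactId>shaded-uber-jar</artifactId>
        <version>1.0</version>
      </dependency>
    </dependencies>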

Pull Request Checklist

Thank you for submitting a contribution to Apache Metron.
Please refer to our Development Guidelines for the complete guide to follow for contributions.
Please refer also to our Build Verification Guidelines for complete smoke testing guides.

In order to streamline the review of the contribution, we ask you to follow these guidelines and to double check the following:

For all changes:

  • Is there a JIRA ticket associated with this PR? If not, one needs to be created at Metron Jira.
  • Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
  • Has your PR been rebased against the latest commit within the target branch (typically master)?

For code changes:

  • Have you included steps to reproduce the behavior or problem that is being changed or addressed?

  • Have you included steps or a guide to how the change may be verified and tested manually?

  • Have you ensured that the full suite of tests and checks have been executed in the root metron folder via:

    mvn -q clean integration-test install && dev-utilities/build-utils/verify_licenses.sh 
    
  • Have you written or updated unit tests and/or integration tests to verify your changes?

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?

  • Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?

For documentation related changes:

  • Have you ensured that the format looks appropriate for the output in which it is rendered by building and verifying the site-book? If not, then run the following commands and then verify the changes via site-book/target/site/index.html:

    cd site-book
    mvn site
    
  • Have you ensured that any documentation diagrams have been updated, along with their source files, using draw.io? See Metron Development Guidelines for instructions.

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
It is also recommended that travis-ci is set up for your personal repository such that your branches are built there before submitting a pull request.

@mmiklavc (Contributor) left a comment:

@merrimanr Thanks for the contribution. Can we see what, if any, of the Kafka changes are backwards compatible with master? This is a minor version change - 2.10 to 2.12, so the API should be compatible. This upgrade task has become pretty meaty, and I think we want to get as much of the low-hanging fruit stable against master as possible before finishing out the bigger tasks in the feature branch.

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<artifactId>kafka-clients</artifactId>
Contributor:

I see a change from kafka_2.10 to kafka-clients in a number of places, and in others (see below) there is kafka_2.12. Can you explain this further? Can they all be normalized?

Contributor Author:

The kafka_2.12 dependency is needed for the Kafka testing infrastructure. That is why it's only used here in metron-integration-test.
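
Roughly, the split looks like this (the coordinates and version are shown for illustration; the exact version property used in the poms may differ):

    <!-- kafka-clients is sufficient for the runtime modules; the Scala-versioned broker jar
         is only needed by the embedded Kafka used in metron-integration-test -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>2.0.0</version>
    </dependency>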

@Override
public void stop() {
shutdownConsumer();
shutdownProducers();
Contributor:

Do we not need to shut down the consumers here any longer?

Contributor Author:

Not sure why I originally did this. I will add it back.
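
For reference, the restored method would look roughly like the snippet below (the method names come from the diff above; the rest of the class is assumed):

    @Override
    public void stop() {
      // shut the test consumer down again, as before the upgrade
      shutdownConsumer();
      shutdownProducers();
    }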

zkClient.deleteRecursive(ZkUtils.PreferredReplicaLeaderElectionPath());
zkClient.deleteRecursive(ZkUtils.BrokerSequenceIdPath());
zkClient.deleteRecursive(ZkUtils.IsrChangeNotificationPath());
zkClient.deleteRecursive(ZkUtils.EntityConfigPath());
Contributor:

What's this removed for?

Contributor Author (@merrimanr, Aug 20, 2019):

This method no longer exists in the newer version. I don't know what it's for but tests still pass so I assume it isn't needed.

byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
messages.add(bytes);
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
Contributor:

Are we basing this off of a previous default from Kafka? I can't tell what that might have been from the removed code. If 1 second is indeed reasonable, can this value be made a constant?

Contributor Author:

I will make this a constant.
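
A rough sketch of what that could look like (the class, constant name, and method here are illustrative, not the actual KafkaComponent code):

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class KafkaReader {
      // Hypothetical constant; 1 second mirrors the poll timeout used in the diff above
      private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);

      private final KafkaConsumer<byte[], byte[]> consumer;

      public KafkaReader(KafkaConsumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
      }

      public List<byte[]> readMessages() {
        List<byte[]> messages = new ArrayList<>();
        // poll with the named constant instead of an inline literal
        ConsumerRecords<byte[], byte[]> records = consumer.poll(POLL_TIMEOUT);
        for (ConsumerRecord<byte[], byte[]> record : records) {
          messages.add(record.value());
        }
        return messages;
      }
    }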

</exclusion>
</exclusions>
</dependency>
<!--<dependency>-->
Contributor:

Dead code

</exclusions>
</dependency>
<dependency>
<!--dependency>
Contributor:

Dead code

<commons.config.version>1.10</commons.config.version>
</properties>
<dependencies>
<dependency>
Contributor:

Can you explain the reason for the deps moving around? I'm assuming this is to deal with some classpath issues, which is exactly why I'm raising it - anything we do that changes classpath ordering comes with risks, so it should be minimized unless absolutely necessary.

Contributor Author (@merrimanr, Aug 20, 2019):

Correct, these changes were to fix classpath issues. Newer dependency versions introduce new transitive dependency versions. Any classpath ordering changes in this PR were necessary.

</exclusion>
</exclusions>
</dependency>
<!--<dependency>-->
Contributor:

Dead code

assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
assertEquals(producerConfigs.get("request.required.acks"), 1);
assertEquals(producerConfigs.get("acks"), "1");
Contributor:

Are these properties exchangeable?

  1. request.required.acks
  2. ProducerConfig.ACKS_CONFIG (which is "acks")

I don't see request.required.acks in the current producer config class, but I do see acks, which means we may be losing a config option here.

Contributor Author:

I believe this property changed from request.required.acks to acks from version 0.9.x to 0.10.x:

0.9.x - https://kafka.apache.org/090/documentation.html#design_ha
2.0.x - https://kafka.apache.org/20/documentation.html#design_ha
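
For reference, with the new producer client the equivalent setting is expressed roughly as follows (this snippet is illustrative, using only standard ProducerConfig keys, not the actual REST service code):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class ProducerAcksExample {
      public static Properties producerConfigs() {
        Properties configs = new Properties();
        // "acks" replaces the old producer's "request.required.acks"; "1" still means wait for the leader only
        configs.put(ProducerConfig.ACKS_CONFIG, "1");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        return configs;
      }
    }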

</activation>
<properties>
<hdp_version>3.1.0.0</hdp_version>
<global_storm_version>1.2.1</global_storm_version>
Contributor:

Can we separate Storm and Kafka? These are not closely related the way ES and Kibana are.

Contributor Author:

Sure. I don't believe any changes were needed for Storm.
