Skip to content

KAFKA-10027: Implement read path for feature versioning system (KIP-584)#8680

Merged
junrao merged 20 commits intoapache:trunkfrom
kowshik:kip584_features_read_path
Jun 11, 2020
Merged

KAFKA-10027: Implement read path for feature versioning system (KIP-584)#8680
junrao merged 20 commits intoapache:trunkfrom
kowshik:kip584_features_read_path

Conversation

@kowshik
Copy link
Copy Markdown
Contributor

@kowshik kowshik commented May 17, 2020

TL;DR:
In this PR, I have implemented various classes and integration for the read path of the feature versioning system (KIP-584). The ultimate plan is that the cluster-wide finalized features information is going to be stored in ZK under the node /feature. The read path implemented in this PR is centered around reading this finalized features information from ZK, and, processing it inside the Broker.

Here is a summary of what's in this PR (a lot of it is new classes):

  • A facility is provided in the broker to declare it's supported features, and advertise it's supported features via it's own BrokerIdZNode under a features key.
  • A facility is provided in the broker to listen to and propagate cluster-wide finalized feature changes from ZK.
  • When new finalized features are read from ZK, feature incompatibilities are detected by comparing against the broker's own supported features.
  • ApiVersionsResponse is now served containing supported and finalized feature information (using the newly added tagged fields).

More details below.

New IBP config value:
The feature versioning system is implemented such that it is activated only if the IBP >= KAFKA_2_7_IV0 (newly defined value in this PR). That is because, we would like to keep the feature version system disabled (almost as if it is non-existent) until the user upgrades the cluster to or beyond IBP KAFKA_2_7_IV0.

New common libraries (Java):

A set of common libraries abstracting features have been introduced. Features are a map with key being String (feature name) and value being a VersionRangeType (the range of supported or finalized versions for the feature). These common libraries are defined as 4 new classes in: org.apache.kafka.common.feature.{Features, BaseVersionRange, SupportedVersionRange, FinalizedVersionRange}.

The reason why it is kept in this common package is that future client and tooling development could reuse some/all of these abstractions.

New broker libraries (Scala):

The above common libraries are used within the Broker code to implement the read path for feature information, as explained below.

  1. FinalizedFeatureCache: Implemented a cache of cluster-wide finalized features (see companion object: FinalizedFeatureCache). This cache stores the latest finalized features and epoch read from the /feature node in ZK. Currently the main reader of this cache is the read path that serves an ApiVersionsRequest returning the features information in the response. In the future, as we start using the versioning system more, the cache could be read by future read paths intending to learn the finalized feature information. Note: this is defined as a companion object (a singleton), because, this way it will be easy to share read access with any code path in the Broker.
  2. FinalizedFeatureChangeListener: Implemented a feature ZK node change listener, that, listens to changes in the /feature node in ZK, and, invalidates the cache defined in FinalizedFeatureCache (see above point). See class: FinalizedFeatureChangeListener. An instance of this class is maintained in KafkaServer, and initialized only if the IBP is set to KAFKA_2_7_IV0 or higher.
  3. SupportedFeatures: Implemented a facility to define supported features within the Broker (these are specific to a broker binary). See companion object: SupportedFeatures. Currently the list of supported features is empty, because, we have not identified features (yet). In the future this class can be populated as we begin to define supported features. Also, the public interface of this class provides a way to compare supported vs finalized features to detect incompatibilities. Note: this is a companion object (a singleton), because, it makes it easy to define supported features at a common point and also share read access with any code path in the Broker.

New Zookeeper node: /feature:
The finalized feature information is going to be stored in ZK under the node /feature.

  1. The class backing the new node is FeatureZNode. It provides required state and encode/decode APIs that abstract the structure of the new feature ZK node.
  2. Implemented new APIs in KafkaZkClient.scala to read/update/delete the /feature ZK node.

Broker advertisements:
A facility is provided in the broker to declare it's supported features (via the SupportedFeatures class), and advertise it's supported features via it's own BrokerIdZNode under a features key (only if IBP >=KAFKA_2_7_IV0). The encode/decode logic for the new features key is provided in this PR in the class BrokerIdZNode.

Updated ApiVersionsResponse:

Defined new tagged fields in existing ApiVersionsResponse schema. In the request serving path (in KafkaApis.scala), these fields are populated in the response with the latest supported features (from SupportedFeatures) and latest finalized features (from FinalizedFeatureCache).

Tests:
Added test suites for all of the new classes, and changes made in this PR.

Loading
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants