KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators #16183

Merged
jolshan merged 16 commits into apache:trunk from jolshan:kafka-16192-1
Jul 26, 2024
Member

@jolshan jolshan commented Jun 3, 2024

This change adds transaction.version (part of KIP-1022).

New transaction version 1 is introduced to support writing flexible fields in transaction state log messages.

Transaction version 2 is created in anticipation of further KIP-890 changes.

Neither is production ready. Tests for the new transaction version and the new MV are included.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@jolshan jolshan changed the title from "KAFKA-16192: Introduce usage of flexible records to coordinators" to "KAFKA-16192: Introduce transaction.version and usage of flexible records to coordinators" Jun 3, 2024
IBP_4_0_IVO(21, "4.0", "IV0", false),

// Introduce version 1 of the TransactionVersion feature (KIP-890).
IBP_4_0_IV1(22, "4.0", "IV1", false);
Contributor

Do we need a separate MV version? We could bind TV to IBP_4_0_IVO. It's still in the future, so this should be ok as far as the bootstrap is concerned.

Member Author

I don't think we need it if we plan to make them both production ready at the same time

Contributor

I think for now the current plan is to release both in 4.0, we could reflect this in the code (if the plans change we could change the code accordingly).

Member Author

@jolshan jolshan Jun 7, 2024

Sorry, I misunderstood your question. I thought you were referring to the TVs being the same. I think we should keep it separate from group version for now, since I assume these will be released at different times. But I don't feel super strongly.

Contributor

Sounds good.

Comment thread server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java Outdated
Comment on lines +24 to +25
// Version 1 enables flexible transactional state records. (KIP-890)
TV_1(1, MetadataVersion.IBP_4_0_IV1, Collections.emptyMap()),
Member

I wonder why we need a version to enable flexible records. Couldn't we use the version supporting it when we enable version 2? I may have missed something.

Member Author

This was a discussion I had with @cmccabe. I think the idea was that even if we want to disable transaction version, we may still want to keep our flexible records for compatibility reasons. I know Jeff worked on that KIP to ignore the fields, but this was an extra safeguard.

Member

Understood. As you said, this is not strictly necessary but it does not hurt.
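
To make the "bind a TV to an MV for bootstrapping" idea in this thread concrete, here is a minimal sketch. It assumes that a cluster bootstrapped at a given metadata level defaults to the highest transaction version whose bound metadata level is not newer. The enum name, the `defaultFor` helper, and the metadata levels 10, 20, and 30 are all made up for illustration and are not Kafka's actual values or API.

```java
// Hypothetical stand-in for a feature-version enum; not Kafka's TransactionVersion.
enum SketchTransactionVersion {
    // (feature level, illustrative metadata level it is bound to for bootstrapping)
    TV_0(0, 10),
    TV_1(1, 20),
    TV_2(2, 30);

    final int featureLevel;
    final int bootstrapMetadataLevel;

    SketchTransactionVersion(int featureLevel, int bootstrapMetadataLevel) {
        this.featureLevel = featureLevel;
        this.bootstrapMetadataLevel = bootstrapMetadataLevel;
    }

    /**
     * Picks the highest version whose bound metadata level is not newer than
     * the metadata level the cluster is bootstrapped with. Falls back to TV_0.
     */
    static SketchTransactionVersion defaultFor(int metadataLevel) {
        SketchTransactionVersion chosen = TV_0;
        // values() is in declaration order, so later matches are higher versions.
        for (SketchTransactionVersion tv : values()) {
            if (tv.bootstrapMetadataLevel <= metadataLevel) {
                chosen = tv;
            }
        }
        return chosen;
    }
}
```

Under these assumptions, giving TV_1 its own metadata level means a cluster can land on flexible records without also picking up TV_2.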

Contributor

@artemlivshits artemlivshits left a comment

LGTM

  // generate the message for this transaction metadata
  val keyBytes = TransactionLog.keyToBytes(transactionalId)
- val valueBytes = TransactionLog.valueToBytes(newMetadata)
+ val valueBytes = TransactionLog.valueToBytes(newMetadata, usesFlexibleRecords())
Member

Should we add a test to ensure that this new logic works as expected?

Member Author

Functionally there is no difference now, so a test didn't come to my mind easily. I guess the best thing is to just look at the version of the record generated. I can add that.
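
A test that "looks at the version of the record generated" could be as small as the sketch below. It assumes the serialized value starts with a two-byte version prefix and that version 1 is the flexible one while version 0 is not; both are assumptions made for illustration, as are the class and method names, which are not the real `TransactionLog` API.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the serializer; not Kafka's TransactionLog.
final class VersionPrefixedValueSketch {
    static final short NON_FLEXIBLE_VERSION = 0; // assumed value
    static final short FLEXIBLE_VERSION = 1;     // assumed value

    /** Writes a two-byte version prefix followed by the already-serialized payload. */
    static byte[] valueToBytes(byte[] payload, boolean usesFlexibleRecords) {
        short version = usesFlexibleRecords ? FLEXIBLE_VERSION : NON_FLEXIBLE_VERSION;
        ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES + payload.length);
        buffer.putShort(version);
        buffer.put(payload);
        return buffer.array();
    }

    /** Reads back only the version prefix, which is all such a test needs to inspect. */
    static short readVersion(byte[] value) {
        return ByteBuffer.wrap(value).getShort();
    }
}
```

The test then only has to serialize once with the flag on and once with it off, and compare `readVersion` against the expected constant.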

Member

@dajac dajac left a comment

LGTM, thanks. We may want to merge it after #16347 though.

Member Author

jolshan commented Jun 19, 2024

Ack -- I suppose so. I was hoping it would be merged by today 😅

Member Author

jolshan commented Jun 19, 2024

In the meantime I will fix the test failure. Whoops.

Member Author

jolshan commented Jul 10, 2024

Yeehaw time to rebase

Member Author

jolshan commented Jul 10, 2024

I think I also need to fix the case where max version is 0 as well. 🤔

Contributor

@junrao junrao left a comment

@jolshan : Thanks for the PR. Left a minor comment.

// Add ELR related supports (KIP-966).
IBP_3_9_IV1(22, "3.9", "IV1", true),

IBP_3_9_IV2(23, "3.9", "IV2", false),
Contributor

Could we add a comment that this is for bootstrapping TV?

Member Author

Sounds good. I also need to make a few other changes when I rebase, so be on the lookout for more soon :)

I planned to make the update where we don't send features as part of the registration/api versions if the version range is 0-0. I need to do this because Colin's fix only changes the min version. It is safe because a version range 0-0 is essentially the same as not supporting the feature. We will have it here until the MV is marked as production ready.

Contributor

> I planned to make the update where we don't send features as part of the registration/api versions if the version range is 0-0.

Is that for the old versions of the requests or both the old and the latest version?

Member Author

I think we can just do it regardless of the version of the request. I was planning to do it in Quorum/BrokerFeatures files, which would just be the controller/broker's own view of supported features. Alternatively I can do it in the request.

cc: @cmccabe

Member Author

Looks like Colin already pushed this as part of the kraft.version PR. I will just resolve the conflicts and update the comment you mention.

Contributor

Should we add a new MV just for bootstrapping a new feature or just piggyback on an existing one (e.g., IBP_3_9_IV1)?

> I planned to make the update where we don't send features as part of the registration/api versions if the version range is 0-0.

Colin's PR only skipped a finalized version 0 feature in ApiResponse. Do you plan to do that for supported features for registration/api? Range 0-0 can now happen since some of the v1 features are not stable yet.

Member Author

Hmmm-- I thought Colin's PR would cover what I need to cover since I didn't think his MV was stable yet either

Member Author

I've updated the PR to include the change for broker registration

Contributor

@junrao junrao left a comment

@jolshan : Thanks for the updated PR. Left another comment.

Member Author

jolshan commented Jul 19, 2024

Will look at the apiversions test failures 👍

Contributor

@junrao junrao left a comment

@jolshan : Thanks for the updated PR. A few more comments. Also, there seem to be quite a few test failures (ApiVersionsRequestTest, FeaturesTest) related to this PR?

Comment thread core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala Outdated
  _supportedFeatures.asScala.foreach {
-   case (name, range) => features.add(new BrokerRegistrationRequestData.Feature().
+   // Do not include features with the range 0-0.
+   case (name, range) if range.max() > 0 => features.add(new BrokerRegistrationRequestData.Feature().
Contributor

We should do the same for supported features in ApiVersionResponse in the following code in BrokerFeatures, right? Also, to avoid duplicating the code, perhaps it's better to do the filtering logic in BrokerFeatures?

  def defaultSupportedFeatures(unstableFeatureVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
    val features = new util.HashMap[String, SupportedVersionRange]()
    features.put(MetadataVersion.FEATURE_NAME,
      new SupportedVersionRange(
        MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
        if (unstableFeatureVersionsEnabled) {
          MetadataVersion.latestTesting.featureLevel
        } else {
          MetadataVersion.latestProduction.featureLevel
        }))
    PRODUCTION_FEATURES.forEach { feature =>
      features.put(feature.featureName,
        new SupportedVersionRange(0,
          if (unstableFeatureVersionsEnabled) {
            feature.latestTesting
          } else {
            feature.latestProduction
          }))
    }
    Features.supportedFeatures(features)
  }

Member Author

I had considered putting it in BrokerFeatures/QuorumFeatures, but decided against it.
Are you suggesting that I remove the way Colin wrote the code (in the ApiVersionsResponse itself)?
I was trying to avoid removing his code, but if you think it is better that way I can do it. (I will also need to move the test I wrote elsewhere.)

Contributor

@junrao junrao Jul 19, 2024

The main thing is that we need to exclude range 0-0 for supported features in ApiVersionResponse too. Colin's code in ApiVersionsResponse doesn't seem to do what you are doing here for BrokerRegistrationRequest by excluding range 0-0 for supported features. It only changes minVersion v0 to v1 for older requests.

Member Author

Yeah. It is already excluded from Colin's commit. If we want to share the logic we can move it to Broker/QuorumFeatures, but I wasn't sure if there was a reason why Colin did it in the response before.

cc: @cmccabe

Contributor

@cmccabe cmccabe Jul 19, 2024

From my point of view:

  1. We should never report a 0-0 supported versions range in an RPC. There is no point! This applies to both ApiVersionsResponse and BrokerRegistrationRequest.

  2. Obviously a 1-0 range is even worse, being incorrect as well as useless.

  3. Having a HashMap or something that contains a 0-0 range is fine, and may simplify the code in some ways. For example if we have code that changes the maximum supported range based on a config, it would probably be annoying to have to mutate the HashMap or use a different hash map if that config was on or off.

I don't feel really strongly about the third point, but I suspect that the code will be messier if you try to remove features with supported version 0-0 from the internal data structures than if you don't.
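
The three points above suggest filtering at the RPC boundary while leaving the internal map alone. The sketch below shows one way that could look: the internal map keeps its 0-0 entries, and a separate copy without them is produced when a request or response is built. The `Range` class and `rangesToReport` method are hypothetical names for illustration, not Kafka's `SupportedVersionRange` or any existing helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SupportedRangeReportingSketch {
    /** Hypothetical min/max pair; not Kafka's SupportedVersionRange. */
    static final class Range {
        final int min;
        final int max;

        Range(int min, int max) {
            this.min = min;
            this.max = max;
        }
    }

    /** Copies only the ranges worth advertising; the internal map is left untouched. */
    static Map<String, Range> rangesToReport(Map<String, Range> internal) {
        Map<String, Range> reported = new LinkedHashMap<>();
        for (Map.Entry<String, Range> entry : internal.entrySet()) {
            // Same condition as the BrokerRegistrationRequest diff quoted earlier:
            // a maximum of 0 is treated as "feature not supported".
            if (entry.getValue().max > 0) {
                reported.put(entry.getKey(), entry.getValue());
            }
        }
        return reported;
    }
}
```

Because the filter runs on a copy, code that adjusts the maximum based on a config never has to add or remove map entries.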

Member Author

@jolshan jolshan Jul 19, 2024

My proposal was that in defaultSupportedFeatures, we iterate over the features and add them to a map. Instead of just adding them all, we check if the range is 0-0. If so, we don't add them. (No removal needed.) Thus, we don't need specific logic in the APIs to remove the feature later.

The only concern is if somehow the features map expected a feature to exist. Not sure I follow mutating if the config is on or off. I assume we can't change the config to enable unstable features dynamically. (If we did, we'd have to mutate the map anyway)
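
The proposal can be sketched as follows, assuming the maximum is chosen from a testing or production level by a flag, as in the Scala snippet quoted earlier. `FeatureInfo` and `defaultSupportedMaxLevels` are hypothetical names; the map stores only the maximum level because the quoted code always uses 0 as the minimum for these features.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DefaultSupportedFeaturesSketch {
    /** Hypothetical feature description; field names mirror the quoted Scala accessors. */
    static final class FeatureInfo {
        final String name;
        final int latestProduction;
        final int latestTesting;

        FeatureInfo(String name, int latestProduction, int latestTesting) {
            this.name = name;
            this.latestProduction = latestProduction;
            this.latestTesting = latestTesting;
        }
    }

    /** Returns feature name -> maximum supported level, omitting features whose range would be 0-0. */
    static Map<String, Integer> defaultSupportedMaxLevels(Iterable<FeatureInfo> features,
                                                          boolean unstableFeatureVersionsEnabled) {
        Map<String, Integer> supported = new LinkedHashMap<>();
        for (FeatureInfo feature : features) {
            int max = unstableFeatureVersionsEnabled ? feature.latestTesting : feature.latestProduction;
            // Never add a 0-0 entry, so nothing has to be removed later.
            if (max > 0) {
                supported.put(feature.name, max);
            }
        }
        return supported;
    }
}
```

With a feature whose production level is still 0, the entry appears only when unstable versions are enabled.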

Member Author

Code I refer to is:

for (Features feature : Features.PRODUCTION_FEATURES) {

PRODUCTION_FEATURES.forEach { feature => features.put(feature.featureName,

Member Author

Pushed code with this example.

Contributor

@jolshan : If you think it's easier to just omit them from the map, that's fine.

I still think we should be checking that these things don't exist later on (and for example, throw an exception if they do) to avoid regressions if someone changes something (as has been happening a lot lately. :) )
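
One reading of this suggestion is a small guard called while a request or response is being built, so that a regression fails loudly instead of sending a useless range. The sketch below is only an illustration: the class name, the method name, and the choice of `IllegalStateException` are assumptions, not code from this PR.

```java
final class ReportedRangeGuardSketch {
    /**
     * Throws if the range should never appear in an RPC:
     * either inverted (for example 1-0) or empty (0-0).
     */
    static void requireReportable(String featureName, int min, int max) {
        if (min > max) {
            throw new IllegalStateException(
                "Inverted supported range " + min + "-" + max + " for " + featureName);
        }
        if (max == 0) {
            throw new IllegalStateException(
                "Range 0-0 for " + featureName + " must not be sent in an RPC");
        }
    }
}
```

A caller would invoke `requireReportable` for each feature just before adding it to the outgoing message.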

Member Author

I'm happy to try to make things safer. Can you point me to an example of where you would expect to check if things exist? Do you mean like when building the request? Or something else?

Comment thread server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java Outdated
Member Author

jolshan commented Jul 19, 2024

Given that KRaft version starts at 0, I guess we should have all versions start at zero for consistency. I also noticed one test asserts that all features with version zero bootstrap with the minimum KRaft version.

Contributor

@junrao junrao left a comment

@jolshan : Thanks for the updated PR. Just a few more minor comments.

Comment thread core/src/main/scala/kafka/server/BrokerFeatures.scala Outdated
Comment thread core/src/test/scala/unit/kafka/server/BrokerFeaturesTest.scala Outdated
Comment thread server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java Outdated
Comment thread server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java Outdated
Contributor

@junrao junrao left a comment

@jolshan : Thanks for the updated PR. It seems there is a compilation failure.

Contributor

junrao commented Jul 23, 2024

@jolshan : Thanks for the updated PR. The following test failure seems related to the PR.

[2024-07-23T02:46:53.731Z] 
[2024-07-23T02:46:53.731Z] Gradle Test Run :metadata:test > Gradle Test Executor 88 > QuorumFeaturesTest > testDefaultFeatureMap() STARTED
[2024-07-23T02:46:53.731Z] org.apache.kafka.controller.QuorumFeaturesTest.testDefaultFeatureMap() failed, log available in /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16183/metadata/build/reports/testOutput/org.apache.kafka.controller.QuorumFeaturesTest.testDefaultFeatureMap().test.stdout
[2024-07-23T02:46:53.731Z] 
[2024-07-23T02:46:53.731Z] Gradle Test Run :metadata:test > Gradle Test Executor 88 > QuorumFeaturesTest > testDefaultFeatureMap() FAILED
[2024-07-23T02:46:53.731Z]     org.opentest4j.AssertionFailedError: expected: <{kraft.version=0, transaction.version=0, metadata.version=1-20}> but was: <{metadata.version=1-20}>
[2024-07-23T02:46:53.731Z]         at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
[2024-07-23T02:46:53.731Z]         at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
[2024-07-23T02:46:53.731Z]         at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
[2024-07-23T02:46:53.731Z]         at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
[2024-07-23T02:46:53.731Z]         at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
[2024-07-23T02:46:53.731Z]         at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1145)
[2024-07-23T02:46:53.731Z]         at org.apache.kafka.controller.QuorumFeaturesTest.testDefaultFeatureMap(QuorumFeaturesTest.java:69)

Contributor

@junrao junrao left a comment

@jolshan : Thanks for the updated PR. Added one more comment.

expectedFeatures.put(feature.featureName(), VersionRange.of(
feature.minimumProduction(),
maxVersion
));
Contributor

The code in the test is pretty much the same as the actual code. Does this and the next test add much value?

Member Author

I would say that if someone changes the logic in this file, the test would fail and flag that the current behavior is intentional.

I can add a comment if that helps.

Contributor

@junrao junrao left a comment

@jolshan : Thanks for the explanation. The PR LGTM assuming the tests pass.

Member Author

jolshan commented Jul 24, 2024

Some of the gradle task executors failed so I will run again.

@jolshan jolshan merged commit a0f6e6f into apache:trunk Jul 26, 2024
abhi-ksolves pushed a commit to ksolves/kafka that referenced this pull request Jul 31, 2024
…rds to coordinators (apache#16183)

This change includes adding transaction.version (part of KIP-1022)

New transaction version 1 is introduced to support writing flexible fields in transaction state log messages.

Transaction version 2 is created in anticipation for further KIP-890 changes.

Neither are made production ready. Tests for the new transaction version and new MV are created.

Also include change to not report a feature as supported if the range is 0-0.

Reviewers: Jun Rao <junrao@apache.org>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Colin P. McCabe <cmccabe@apache.org>