Skip to content

KAFKA-17011: Fix a bug preventing features from supporting v0#16421

Merged
jolshan merged 14 commits intoapache:trunkfrom
cmccabe:KAFKA-17011
Jul 10, 2024
Merged

KAFKA-17011: Fix a bug preventing features from supporting v0#16421
jolshan merged 14 commits intoapache:trunkfrom
cmccabe:KAFKA-17011

Conversation

@cmccabe
Copy link
Copy Markdown
Contributor

@cmccabe cmccabe commented Jun 21, 2024

As part of KIP-584, brokers expose a range of supported versions for each feature. For example, metadata.version might be supported from 1 to 21. (Note that feature level ranges are always inclusive, so this would include both level 1 and 21.)

These supported ranges are supposed to be able to include 0. For example, it should be possible for a broker to support a kraft.version between 0 and 1. However, in older software versions, there is an assertion in org.apache.kafka.common.feature.SupportedVersionRange that prevents this. This causes problems when the older software attempts to deserialize an ApiVersionsResponse containing such a range.

In order to resolve this dilemma, this PR bumps the version of ApiVersionsRequest from 3 to 4. Clients which send v4 promise to be able to handle ranges including 0. Clients which send v3 will not be exposed to these ranges. The feature will show up as having a minimum version of 1 instead. This work is part
of KIP-1022.

Similarly, this PR also introduces a new version of BrokerRegistrationRequest, and specifies that the
older versions of that RPC cannot handle supported version ranges including 0. Therefore, 0 is translated to 1 in the older requests.

As part of KIP-584, brokers expose a range of supported versions for each feature. For example,
metadata.version might be supported from 1 to 21. (Note that feature level ranges are always
inclusive, so this would include both level 1 and 21.)

These supported ranges are supposed to be able to include 0. For example, it should be possible for
a broker to support a kraft.version between 0 and 1. However, in older software versions, there is
an assertion in org.apache.kafka.common.feature.SupportedVersionRange that prevents this. This
causes problems when the older software attempts to deserialize an ApiVersionsResponse containing
such a range.

In order to resolve this dilemma, this PR bumps the version of ApiVersionsRequest from 3 to 4.
Clients which send v4 promise to be able to handle ranges including 0. Clients which send v3 will
not be exposed to these ranges -- the feature will simply be omitted from the response. This work
is part of KIP-1022.
Comment thread core/src/main/scala/kafka/network/SocketServer.scala Outdated
@jolshan
Copy link
Copy Markdown
Member

jolshan commented Jun 24, 2024

Did we check the kraft upgrade system tests? I can kick off a job. Looks like there may also be some issue with the build.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe : Thanks for the PR. Left a minor comment.

Comment thread core/src/main/scala/kafka/network/SocketServer.scala Outdated
@cmccabe
Copy link
Copy Markdown
Contributor Author

cmccabe commented Jun 25, 2024

It seems simpler to just pass the ApiVersionsResponse version as a parameter to the Supplier (hence making it a Function, I guess)

Updated.

@jolshan
Copy link
Copy Markdown
Member

jolshan commented Jun 25, 2024

Let's take a look at testUnsupportedApiVersionsRequestWithVersionProvidedByTheBroker

@cmccabe
Copy link
Copy Markdown
Contributor Author

cmccabe commented Jun 25, 2024

It looks like the test was checking the ApiVersionsRequest header version against 3. So it needed to be updated to 4

@cmccabe
Copy link
Copy Markdown
Contributor Author

cmccabe commented Jun 26, 2024

The test failures pass locally.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe : Thanks for the updated PR. Just a few minor comments.

Comment thread core/src/test/scala/unit/kafka/server/ApiVersionsResponseIntegrationTest.scala Outdated
Comment thread core/src/test/scala/unit/kafka/server/ApiVersionsResponseIntegrationTest.scala Outdated
Comment thread core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala Outdated
Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe : Thanks for the updated PR. A couple of comments.

Also, I asked a followup question in the mailing list on whether it's better to suppress V0 features completely or just exclude V0 from such features. It would be useful to reach consensus on that.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe : Thanks for the updated PR. The code LGTM. There was still some discussion in the mailing list for KIP-1022 on the best approach for addressing this issue. We probably need to reach some consensus there first.

Also, are the following 2 test failures related?
Build / JDK 8 and Scala 2.12 / kafka.admin.ListOffsetsIntegrationTest."testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer(String).quorum=kraft"
Build / JDK 8 and Scala 2.12 / org.apache.kafka.tools.GetOffsetShellTest.testTopicPartitionsArgWithInternalIncluded [2] Type=Raft-Isolated, MetadataVersion=4.0-IV0,Security=PLAINTEXT

@jolshan
Copy link
Copy Markdown
Member

jolshan commented Jul 3, 2024

#16183 will wait for this PR in order to not break tests.

Copy link
Copy Markdown
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe thanks for this nice fix. some comments are left. PTAL

{ "name": "SupportedFeatures", "type": "[]SupportedFeatureKey", "ignorable": true,
"versions": "3+", "tag": 0, "taggedVersions": "3+",
"about": "Features supported by the broker.",
"about": "Features supported by the broker. Note: in v0-v3, features with MinSupportedVersion = 0 must be left out.",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MinSupportedVersion -> MinVersion

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: in v0-v3, features with MinSupportedVersion = 0 must be left out.

This needs to be adjusted accordingly since we now keep the feature but returns 1 as the minSupportedVersion.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be adjusted accordingly since we now keep the feature but returns 1 as the minSupportedVersion.

done

def testSendV4ApiVersionsRequest(quorum: String): Unit = {
val response = sendApiVersionsRequest(4)
if (quorum.equals("kraft")) {
assertFeatureHasMinVersion("group.version", response.data().supportedFeatures(), 0)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

group.version is already reverted by #16482

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

return new BrokerRegistrationRequest(data, version);
if (version < 4) {
// Workaround for KAFKA-17011: for BrokerRegistrationRequest versions older than 4,
// exclude support version ranges that begin with 0.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still need to finalize/agree here about omission vs. using 1 for older versions of the api.

Right now we have omit for broker registration, but using version 1 for ApiVersions. We also have the option to use 1 for both or omit for both. Note: In 3.8, we omit for both requests.

I think there is some benefit in not including a version with 0 to older controllers rather than 1, but can also understand that it is confusing to have different approaches for different APIs.

@junrao I know you had some opinions here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that it's simpler for the old BrokerRegistrationRequest to be consistent with the older version of ApiVersionResponse, i.e., to include the feature but set the minSupportedVersion to 1, instead of 0.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Just like in ApiVersionsResponse, we will translate min version = 0 to min version = 1 here.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe : Thanks for the updated PR. A few more comments.

{ "name": "SupportedFeatures", "type": "[]SupportedFeatureKey", "ignorable": true,
"versions": "3+", "tag": 0, "taggedVersions": "3+",
"about": "Features supported by the broker.",
"about": "Features supported by the broker. Note: in v0-v3, features with MinSupportedVersion = 0 must be left out.",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: in v0-v3, features with MinSupportedVersion = 0 must be left out.

This needs to be adjusted accordingly since we now keep the feature but returns 1 as the minSupportedVersion.

},
{ "name": "Features", "type": "[]Feature",
"about": "The features on this broker", "versions": "0+", "fields": [
"about": "The features on this broker. Note: in v0-v3, features with MinSupportedVersion = 0 must be left out.", "versions": "0+", "fields": [
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: in v0-v3, features with MinSupportedVersion = 0 must be left out.

This needs to be adjusted accordingly since we now keep the feature but returns 1 as the minSupportedVersion.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return new BrokerRegistrationRequest(data, version);
if (version < 4) {
// Workaround for KAFKA-17011: for BrokerRegistrationRequest versions older than 4,
// exclude support version ranges that begin with 0.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that it's simpler for the old BrokerRegistrationRequest to be consistent with the older version of ApiVersionResponse, i.e., to include the feature but set the minSupportedVersion to 1, instead of 0.

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe : Thanks for the updated PR. Left a few more comments.

}))
}
if (unstableFeatureVersionsEnabled) {
features.put("kraft.version", new SupportedVersionRange(1))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add kraft.version to PRODUCTION_FEATURES, which is supposed to be the place that tracks all features?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do that in a follow-on PR.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused here. This has one argument? And is kraft.version part of this PR?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused here. This has one argument?

There's a constructor that creates a range 0-N based on just being given N. I will change it to be the two-argument constructor since I guess that's confusing.

And is kraft.version part of this PR?

I want to have it here for test purposes. I will create it for real in a follow-on PR, if this PR ever concludes.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I was confused because I didn't see it in the file.

SupportedVersionRange(final short minVersion, final short maxVersion) {
but 🤷‍♀️

private Map<String, Short> finalizedFeatures = null;
private long finalizedFeaturesEpoch = 0;
private boolean zkMigrationEnabled = false;
private boolean suppressFeatureLevel0 = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suppressFeatureLevel0 => alterFeatureLevel0 ?

return this;
}

public Builder setSuppressFeatureLevel0(boolean suppressFeatureLevel0) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suppressFeatureLevel0 => alterFeatureLevel0 ?

final boolean zkMigrationEnabled
private static SupportedFeatureKeyCollection maybeFilterSupportedFeatureKeys(
Features<SupportedVersionRange> latestSupportedFeatures,
boolean suppressV0
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suppressV0 => alterV0 ?


@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testSuppressV0Features(boolean suppressV0Features) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testSuppressV0Features => testAlterV0Features ?
suppressV0Features => alterV0Features ?

def enabledApis: collection.Set[ApiKeys]

def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
def apiVersionResponse(throttleTimeMs: Int, suppressFeatureLevel0: Boolean): ApiVersionsResponse
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suppressFeatureLevel0 => alterFeatureLevel0 ?

override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
override def apiVersionResponse(
throttleTimeMs: Int,
suppressFeatureLevel0: Boolean
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suppressFeatureLevel0 => alterFeatureLevel0 ?

val supportedFeatures = brokerFeatures.supportedFeatures
override def apiVersionResponse(
throttleTimeMs: Int,
suppressFeatureLevel0: Boolean
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suppressFeatureLevel0 => alterFeatureLevel0 ?

@jolshan
Copy link
Copy Markdown
Member

jolshan commented Jul 10, 2024

Looks like there are still quite a few failures for ApiVersionsRequestTest and BrokerFeaturesTest

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe : Thanks for the updated PR. Added one question below. Also, there were a bunch of ApiVersionsRequestTest failures.


// Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version)
assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(0)));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this test work? Do we explicitly set the FinalizedVersionLevel for kraft.version to 0 somewhere?

Copy link
Copy Markdown
Contributor Author

@cmccabe cmccabe Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we explicitly set the FinalizedVersionLevel for kraft.version to 0 somewhere?

anything not enabled is at level 0

@cmccabe
Copy link
Copy Markdown
Contributor Author

cmccabe commented Jul 10, 2024

The context here is that we’re adding kraft.version in 3.9. But not in this PR. I did add kraft.version to supportedFeatures, simply so that I could verify that the RPC results looked like what I wanted. This will cause no compatibility problems since there is no way to set kraft.version yet.

Copy link
Copy Markdown
Member

@jolshan jolshan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm as long as tests pass. (For the followup that adds kraft.version, let's try to follow the conventions for new features so we don't hardcode everything 👍 )

Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe : Thanks for the updated PR. LGTM assuming the tests pass.

@jolshan
Copy link
Copy Markdown
Member

jolshan commented Jul 10, 2024

Test failures are unrelated. I will merge.

@jolshan jolshan merged commit ede289d into apache:trunk Jul 10, 2024
abhi-ksolves pushed a commit to ksolves/kafka that referenced this pull request Jul 31, 2024
…#16421)

As part of KIP-584, brokers expose a range of supported versions for each feature. For example, metadata.version might be supported from 1 to 21. (Note that feature level ranges are always inclusive, so this would include both level 1 and 21.)

These supported ranges are supposed to be able to include 0. For example, it should be possible for a broker to support a kraft.version between 0 and 1. However, in older software versions, there is an assertion in org.apache.kafka.common.feature.SupportedVersionRange that prevents this. This causes problems when the older software attempts to deserialize an ApiVersionsResponse containing such a range.

In order to resolve this dilemma, this PR bumps the version of ApiVersionsRequest from 3 to 4. Clients which send v4 promise to be able to handle ranges including 0. Clients which send v3 will not be exposed to these ranges. The feature will show up as having a minimum version of 1 instead. This work is part
of KIP-1022.

Similarly, this PR also introduces a new version of BrokerRegistrationRequest, and specifies that the
older versions of that RPC cannot handle supported version ranges including 0. Therefore, 0 is translated to 1 in the older requests.

Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io>
iter.hasNext(); ) {
BrokerRegistrationRequestData.Feature feature = iter.next();
if (feature.minSupportedVersion() == 0) {
feature.setMinSupportedVersion((short) 1);
Copy link
Copy Markdown
Member

@chia7712 chia7712 Sep 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is another issue caused by changing the version from 0 to 1. #15685 add the check which assumes the "default version" is 0

https://github.com/apache/kafka/blob/3.8/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java#L470

Hence, the features having min=1 can NOT pass the check. see following error message:

[2024-09-05 11:22:22,146] INFO [QuorumController id=2] registerBroker: event failed with UnsupportedVersionException in 127 microseconds. Exception message: Unable to register because the broker does not support version 0 of group.version. It wants a version between 1 and 1, inclusive. (org.apache.kafka.controller.QuorumController)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 Thanks. Is there a way to reproduce this (e.g. an existing test)?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to reproduce this (e.g. an existing test)?

run the following command with #17084

TC_PATHS="tests/kafkatest/tests/core/kraft_upgrade_test.py::TestKRaftUpgrade.test_combined_mode_upgrade" \
/bin/bash tests/docker/run_tests.sh

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I thought that we could not set version 0 with the storage or update tools since it would treat it as disabling the feature. But @dajac pointed out to me that this is happening by setting the MV and that picks default features.

However, the min version 1 should only be the case for older request versions. I thought we fixed this to so we wouldn't have the requirement for version 4 and for 4.0 where group version is introduced.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I last talked to Colin about this, I believe he said we could not disable the feature if the min version is > 0.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I last talked to Colin about this, I believe he said we could not disable the feature if the min version is > 0.

@cmccabe : Could you confirm if this is case? How would people upgrade from an old release where a feature is not turned on?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suspect you have to upgrade the feature before the upgrade to the new release, but we can let Colin confirm.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked with Colin offline. To summarize, if we do increase minVersion in the future, we need to bump up the default version for that feature. An old release may need to first upgrade to a bridge release where the new default version could be set. Given that, we don't need to change the logic in ClusterControlManager.

Copy link
Copy Markdown
Member

@chia7712 chia7712 Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that, we don't need to change the logic in ClusterControlManager.

I will sync it to KAFKA-17429 and #17128

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants