
KAFKA-17336 Add IT to make sure the production MV does not use unstable version of LIST_OFFSET #16893

Merged
chia7712 merged 6 commits into apache:trunk from m1a2st:KAFKA-17336
Aug 22, 2024

Conversation

@m1a2st
Collaborator

@m1a2st m1a2st commented Aug 15, 2024

Jira: https://issues.apache.org/jira/browse/KAFKA-17336

Because the server config UNSTABLE_API_VERSIONS_ENABLE_CONFIG is set to true, we can't test the scenario in which the ListOffsetsRequest version is unstable. This PR adds a test for that case.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@m1a2st
Collaborator Author

m1a2st commented Aug 15, 2024

If latestVersionUnstable is true for ListOffsetRequest, testResponseIncludesLeaderEpochWithUnstableAPIs should fail. This is my local test result, and the error log is:

org.apache.kafka.common.errors.InvalidRequestException: Received request api key LIST_OFFSETS with version 9 which is not enabled

[Screenshot: local test result]
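The rejection in the error log above can be sketched as a version gate. This is a minimal standalone model, not Kafka's implementation: `ApiVersionRange`, `isVersionEnabled`, `checkVersion`, and the local `InvalidRequestException` are hypothetical names, and the range 0..8 stable with 9 unstable is an assumption chosen only to match the message.

```java
final class UnstableVersionGate {

    /** Hypothetical version range for one API; versions above latestStable are unstable. */
    static final class ApiVersionRange {
        final short oldest;
        final short latestStable;
        final short latest;

        ApiVersionRange(short oldest, short latestStable, short latest) {
            this.oldest = oldest;
            this.latestStable = latestStable;
            this.latest = latest;
        }
    }

    /** Local stand-in for the exception type named in the error log. */
    static final class InvalidRequestException extends RuntimeException {
        InvalidRequestException(String message) {
            super(message);
        }
    }

    /** A version is enabled if it lies in the range allowed by the unstable-versions flag. */
    static boolean isVersionEnabled(ApiVersionRange range, short version, boolean enableUnstable) {
        short highest = enableUnstable ? range.latest : range.latestStable;
        return version >= range.oldest && version <= highest;
    }

    /** Throws when the requested version is not enabled, mirroring the logged message. */
    static void checkVersion(String apiName, ApiVersionRange range, short version, boolean enableUnstable) {
        if (!isVersionEnabled(range, version, enableUnstable)) {
            throw new InvalidRequestException("Received request api key " + apiName
                + " with version " + version + " which is not enabled");
        }
    }

    public static void main(String[] args) {
        // Assumed range: versions 0..8 are stable, version 9 is unstable.
        ApiVersionRange listOffsets = new ApiVersionRange((short) 0, (short) 8, (short) 9);
        checkVersion("LIST_OFFSETS", listOffsets, (short) 9, true); // accepted
        try {
            checkVersion("LIST_OFFSETS", listOffsets, (short) 9, false);
        } catch (InvalidRequestException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With unstable versions enabled, the same request is accepted, which is why a test that always runs with the flag set to true cannot observe this failure.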

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testResponseIncludesLeaderEpoch(quorum: String): Unit = {
def testResponseIncludesLeaderEpochWithUnstableAPIs(quorum: String): Unit = {
Member

Could you please add comments for this test? This comment is a good reference (#16841 (comment))

Contributor

@junrao junrao left a comment

@m1a2st : Thanks for the PR. Left a couple of comments.


@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
enableUnstableVersions = !testInfo.getDisplayName.contains("testResponseIncludesLeaderEpochWithUnstableAPIs")
Contributor

If the test name contains "UnstableAPI", we set enableUnstableVersions to false. Setting it to true will be more intuitive.

Collaborator Author

Yes, that makes sense. When the test is testResponseIncludesLeaderEpochWithStableAPIs, setting enableUnstableVersions to false is better.

testResponseIncludesLeaderEpoch()
}

def testResponseIncludesLeaderEpoch(): Unit = {
Contributor

I am not sure if this truly captures the problem. The test seems to verify that the broker can accept the latest version of ListOffset when enableUnstableVersions is set to false. This means that we could never introduce an unstable version of ListOffset in the future. When introducing a new version of ListOffset, it's ok to make it unstable, as long as the stable MV doesn't use it for inter broker communication.

Member

Maybe we can have an IT to make sure the MV does not return an "unstable" version for inter-broker communication. For example:

    @ClusterTests({
        @ClusterTest(serverProperties = {@ClusterConfigProperty(key = ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, value = "false")}),
        @ClusterTest(serverProperties = {@ClusterConfigProperty(key = ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, value = "true")})
    })
    public void test(ClusterInstance clusterInstance) {
        clusterInstance.brokers().values().forEach(b -> {
            MetadataVersion mv = b.metadataCache().metadataVersion();
            Assertions.assertTrue(mv.listOffsetRequestVersion() <= ApiKeys.LIST_OFFSETS.latestVersion(false));
            Assertions.assertTrue(mv.fetchRequestVersion() <= ApiKeys.FETCH.latestVersion(false));
            Assertions.assertTrue(mv.offsetForLeaderEpochRequestVersion() <= ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(false));
        });
    }

Contributor

@chia7712 : Good idea. We probably don't need an IT. We could just use the latest production MV like the following.

    public void test() {
        MetadataVersion mv = MetadataVersion.latestProduction();
        Assertions.assertTrue(mv.listOffsetRequestVersion() <= ApiKeys.LIST_OFFSETS.latestVersion(false));
        Assertions.assertTrue(mv.fetchRequestVersion() <= ApiKeys.FETCH.latestVersion(false));
        Assertions.assertTrue(mv.offsetForLeaderEpochRequestVersion() <= ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(false));
    }
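The invariant behind this check can be sketched without Kafka on the classpath. The following is a reduced model under stated assumptions: the enum `Mv`, its three levels, the `production` flag, and the constant `LATEST_STABLE_LIST_OFFSETS` are all hypothetical stand-ins, and the version numbers are illustrative only.

```java
final class ProductionMvInvariant {

    /** Hypothetical stand-in for MetadataVersion: each level pins one inter-broker request version. */
    enum Mv {
        MV_A(7, true),
        MV_B(8, true),
        MV_C(9, false); // not yet a production level, so it may use an unstable version

        final short listOffsetsVersion;
        final boolean production;

        Mv(int listOffsetsVersion, boolean production) {
            this.listOffsetsVersion = (short) listOffsetsVersion;
            this.production = production;
        }

        /** Returns the highest level that is flagged as production. */
        static Mv latestProduction() {
            Mv latest = null;
            for (Mv mv : values()) {
                if (mv.production) {
                    latest = mv;
                }
            }
            return latest;
        }
    }

    /** Assumed latest stable LIST_OFFSETS version in this model. */
    static final short LATEST_STABLE_LIST_OFFSETS = 8;

    /** The invariant: a metadata version must not pin a request version above the latest stable one. */
    static boolean usesStableVersion(Mv mv) {
        return mv.listOffsetsVersion <= LATEST_STABLE_LIST_OFFSETS;
    }

    public static void main(String[] args) {
        Mv latest = Mv.latestProduction();
        System.out.println(latest + " uses stable version: " + usesStableVersion(latest));
        System.out.println(Mv.MV_C + " uses stable version: " + usesStableVersion(Mv.MV_C));
    }
}
```

In this model a new unstable request version can still be introduced, as long as only a non-production level depends on it.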

Member

@junrao you always have better solution :)

Contributor

TransactionMarkerChannelManager sets the version for WriteTxnMarkersRequest based on MV in the following code. It would be useful to move that logic to MetadataVersion and add a similar test on unstable version. This could be done in a separate PR if it requires more work.

  private val writeTxnMarkersRequestVersion: Short =
    if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 1
    else 0

Member

This could be done in a separate PR if it requires more work.

I feel the following small change can address it, so this PR should be able to include it. @m1a2st WDYT?

    public short writeTxnMarkersRequestVersion() {
        if (isAtLeast(IBP_2_8_IV0)) {
            return 1;
        } else {
            return 0;
        }
    }
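A runnable sketch of how such a method could sit on the version enum is shown below. It assumes a reduced stand-in enum `Mv` with only three levels, and an `isAtLeast` that compares declaration order; the real MetadataVersion has many more levels and may compare differently.

```java
final class TxnMarkerVersionSketch {

    /** Reduced stand-in for MetadataVersion; declaration order defines "at least". */
    enum Mv {
        IBP_2_7_IV2,
        IBP_2_8_IV0,
        IBP_3_0_IV0;

        /** True when this level is the same as or later than the other level. */
        boolean isAtLeast(Mv other) {
            return compareTo(other) >= 0;
        }

        /** Version selection lives on the enum, so callers no longer hard-code the threshold. */
        short writeTxnMarkersRequestVersion() {
            return (short) (isAtLeast(IBP_2_8_IV0) ? 1 : 0);
        }
    }

    public static void main(String[] args) {
        for (Mv mv : Mv.values()) {
            System.out.println(mv + " -> WriteTxnMarkers v" + mv.writeTxnMarkersRequestVersion());
        }
    }
}
```

Keeping the mapping in one place also lets a single test assert that no production level maps to an unstable request version.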

Collaborator Author

Yes, I will address these comments in this PR.

Contributor

@junrao junrao left a comment

@m1a2st : Thanks for the updated PR. Added a couple of comments.

val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(node.id, txnStateManager, this, entries)
val request = new WriteTxnMarkersRequest.Builder(writeTxnMarkersRequestVersion, markersToSend)
val request = new WriteTxnMarkersRequest.Builder(
config.interBrokerProtocolVersion.writeTxnMarkersRequestVersion(), markersToSend
Contributor

It seems that we should get the MV from metadataCache.metadataVersion() instead of config.interBrokerProtocolVersion since MV can be set dynamically. @jolshan : What do you think?

Member

Yes. config.interBrokerProtocolVersion is not a reliable way to get the MV.

Member

config.interBrokerProtocolVersion is used elsewhere in the code base. In zk mode, metadataCache.metadataVersion comes from static config. By contrast, kraft has a dynamic MV. Hence, we should apply that change (config.interBrokerProtocolVersion -> metadataCache.metadataVersion) to the code base. BTW, I guess that will be a large PR. @junrao @jolshan WDYT?
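The difference between a value captured once and a value read on every use can be sketched as follows. Everything here is hypothetical: `MetadataCacheStub`, `StaleSender`, and `FreshSender` are illustrative names, and the version strings are placeholders rather than real metadata versions.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class DynamicMvSketch {

    /** Hypothetical cache whose metadata version can change while the process runs. */
    static final class MetadataCacheStub {
        private final AtomicReference<String> mv;

        MetadataCacheStub(String initial) {
            this.mv = new AtomicReference<>(initial);
        }

        String metadataVersion() {
            return mv.get();
        }

        void update(String newMv) {
            mv.set(newMv);
        }
    }

    /** Captures the version once at construction, like reading a static config value. */
    static final class StaleSender {
        private final String mv;

        StaleSender(MetadataCacheStub cache) {
            this.mv = cache.metadataVersion();
        }

        String versionInUse() {
            return mv;
        }
    }

    /** Re-reads the version on every use, so dynamic updates are visible. */
    static final class FreshSender {
        private final Supplier<String> mv;

        FreshSender(MetadataCacheStub cache) {
            this.mv = cache::metadataVersion;
        }

        String versionInUse() {
            return mv.get();
        }
    }

    public static void main(String[] args) {
        MetadataCacheStub cache = new MetadataCacheStub("old-mv");
        StaleSender stale = new StaleSender(cache);
        FreshSender fresh = new FreshSender(cache);
        cache.update("new-mv"); // simulates a dynamic metadata version change
        System.out.println("stale sender uses " + stale.versionInUse());
        System.out.println("fresh sender uses " + fresh.versionInUse());
    }
}
```

When the version never changes after startup, both senders agree; they diverge only once the version is updated at runtime.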

Member

Taking a quick look, it doesn't look like it is used in too many places, but can definitely be done as a followup.

Member

but can definitely be done as a followup.

@m1a2st WDYT? Sorry, I'm traveling and can't check the code usages.

Collaborator Author

@m1a2st m1a2st Aug 17, 2024

Member

@m1a2st thanks for sharing. It seems we can address it in this PR as it is not used in too many places.

}

@Test
public void testLastMetadataProductionDontReturnUnstableVersion() {
Contributor

@junrao junrao Aug 16, 2024

Perhaps add a comment like the following.

The broker picks the version for a few inter broker RPCs based on the metadata version, instead of the supported version from ApiResponse. We need to make sure that the latest production MV doesn't accidentally depend on an unstable request version.

testLastMetadataProductionDontReturnUnstableVersion => testProductionMetadataDontUseUnstableApiVersion ?

@m1a2st
Collaborator Author

m1a2st commented Aug 17, 2024

Thanks @junrao, @chia7712, @jolshan. I have addressed these comments in this PR.

@chia7712
Member

@m1a2st could you please check the following usage?

  1. val messageFormatVersion = new MessageFormatVersion(versionString, interBrokerProtocolVersion.version)
  2. def ensureInterBrokerVersion(version: MetadataVersion): Unit = {
  3. if (heartbeatRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
  4. if (joinGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
  5. } else if (offsetCommitRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {
  6. if (syncGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion.isLessThan(IBP_2_3_IV0)) {

I didn't list other usages belonging to the zk broker, but it would be great if you could double-check, @m1a2st :)

@m1a2st
Collaborator Author

m1a2st commented Aug 20, 2024

Hello @chia7712, if we modify KafkaApis, some tests in KafkaApisTest will fail.

  • shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported
  • shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported
  • shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported
  • rejectSyncGroupRequestWhenStaticMembershipNotSupported
  • rejectHeartbeatRequestWhenStaticMembershipNotSupported
  • shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported
  • rejectJoinGroupRequestWhenStaticMembershipNotSupported
  • shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported
  • rejectOffsetCommitRequestWhenStaticMembershipNotSupported

Member

@chia7712 chia7712 left a comment

@m1a2st thanks for the updated patch

def ensureInterBrokerVersion(version: MetadataVersion): Unit = {
if (config.interBrokerProtocolVersion.isLessThan(version))
throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion} is less than the required version: ${version}")
if (replicaManager.metadataCache.metadataVersion().isLessThan(version))
Member

Why take metadataCache from replicaManager? KafkaApis already has metadataCache.

}
}

when(replicaManager.metadataCache)
Member

All we need to do is create the MetadataCache according to the MetadataVersion used by the test. For example:

  @Test
  def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
    metadataCache = MetadataCache.zkMetadataCache(brokerId, IBP_0_10_2_IV0)
    brokerEpochManager = new ZkBrokerEpochManager(metadataCache, controller, None)
    kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
    assertThrows(classOf[UnsupportedVersionException],
      () => kafkaApis.handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
  }

@m1a2st m1a2st force-pushed the KAFKA-17336 branch 3 times, most recently from 0af86ec to 9abcf35 Compare August 20, 2024 16:49
Contributor

@junrao junrao left a comment

@m1a2st : Thanks for the updated PR. Just a minor comment.

if (config.interBrokerProtocolVersion.isLessThan(version))
throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion} is less than the required version: ${version}")
if (metadataCache.metadataVersion().isLessThan(version))
throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${metadataCache.metadataVersion()} is less than the required version: ${version}")
Contributor

Should we change inter.broker.protocol.version accordingly?

Collaborator Author

@m1a2st m1a2st Aug 20, 2024

Thanks for your comment, I will modify this. Would using metadata.version be clear enough?
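A minimal sketch of such a guard, assuming the message names metadata.version as proposed here, might look like this. The enum `Mv`, the helper `ensureVersion`, and the local `UnsupportedVersionException` are hypothetical stand-ins and do not reproduce the merged code.

```java
import java.util.function.Supplier;

final class EnsureVersionSketch {

    /** Reduced stand-in for MetadataVersion; declaration order defines "less than". */
    enum Mv {
        IBP_0_10_2_IV0,
        IBP_0_11_0_IV0,
        IBP_2_3_IV0;

        boolean isLessThan(Mv other) {
            return compareTo(other) < 0;
        }
    }

    /** Local stand-in for the exception type used in the diff. */
    static final class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String message) {
            super(message);
        }
    }

    /** Reads the current version at call time and names the same property in the message. */
    static void ensureVersion(Supplier<Mv> currentMv, Mv required) {
        Mv current = currentMv.get();
        if (current.isLessThan(required)) {
            throw new UnsupportedVersionException("metadata.version: " + current
                + " is less than the required version: " + required);
        }
    }

    public static void main(String[] args) {
        ensureVersion(() -> Mv.IBP_2_3_IV0, Mv.IBP_0_11_0_IV0); // passes silently
        try {
            ensureVersion(() -> Mv.IBP_0_10_2_IV0, Mv.IBP_0_11_0_IV0);
        } catch (UnsupportedVersionException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Naming the property that was actually consulted keeps the error message consistent with the check it reports on.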

Contributor

@junrao junrao left a comment

@m1a2st : Thanks for the updated PR. The code LGTM. Are the 14 test failures related to this PR?

@chia7712
Member

Are the 14 test failures related to this PR?

Tests with JDK 11 did not complete. @m1a2st Could you please rebase the code to run CI again? The other failed tests pass locally.

./gradlew cleanTest :streams:test --tests QueryableStateIntegrationTest.shouldBeAbleQueryStandbyStateDuringRebalance :tools:test --tests MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful --tests TopicCommandIntegrationTest.testDescribeWithDescribeTopicPartitionsApi :storage:test --tests TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout :connect:mirror:test --tests DedicatedMirrorIntegrationTest.testMultiNodeCluster :core:test --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithoutDescribeAclViaSubscribe --tests TransactionsBounceTest.testWithGroupId --tests ShareGroupHeartbeatRequestTest.testPartitionAssignmentWithChangingTopics --tests DynamicBrokerReconfigurationTest.testTrustStoreAlter --tests TransactionsTest.testSendOffsetsToTransactionTimeout

@chia7712
Member

chia7712 commented Aug 21, 2024

The other failed tests pass locally.

Oh, I ran the tests against the wrong PR.

ShareGroupHeartbeatRequestTest failed because of this PR :)

@m1a2st
Collaborator Author

m1a2st commented Aug 22, 2024

These tests passed locally, except ShareGroupHeartbeatRequestTest.testPartitionAssignmentWithChangingTopics, which is a flaky test.
I ran this test about 150 times on this PR and on trunk.

@chia7712
Member

These tests passed locally, except ShareGroupHeartbeatRequestTest.testPartitionAssignmentWithChangingTopics, which is a flaky test.
I ran this test about 150 times on this PR and on trunk.

I will take a look later.

@chia7712
Member

ShareGroupHeartbeatRequestTest.testPartitionAssignmentWithChangingTopics is flaky on trunk too. I have filed jira: https://issues.apache.org/jira/browse/KAFKA-17359

@chia7712
Member

The other failed tests pass locally.

./gradlew cleanTest :streams:test --tests StreamsAssignmentScaleTest.testStickyTaskAssignorLargePartitionCount :tools:test --tests MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful :streams:streams-scala:test --tests StreamToTableJoinScalaIntegrationTestImplicitSerdes.testShouldCountClicksPerRegionWithNamedRepartitionTopic :metadata:test --tests QuorumControllerTest.testFenceMultipleBrokers :core:test --tests ConsumerBounceTest.testConsumptionWithBrokerFailures --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testProduceConsumeViaAssign --tests SslConsumerTest.testCoordinatorFailover --tests ZkMigrationIntegrationTest.testMigrateTopicDeletions

https://issues.apache.org/jira/browse/KAFKA-16636
https://issues.apache.org/jira/browse/KAFKA-16174
https://issues.apache.org/jira/browse/KAFKA-17408
https://issues.apache.org/jira/browse/KAFKA-15146
https://issues.apache.org/jira/browse/KAFKA-16634
https://issues.apache.org/jira/browse/KAFKA-8250
https://issues.apache.org/jira/browse/KAFKA-15146
https://issues.apache.org/jira/browse/KAFKA-16024
https://issues.apache.org/jira/browse/KAFKA-16045

@chia7712 chia7712 merged commit a3aa637 into apache:trunk Aug 22, 2024
chia7712 pushed a commit that referenced this pull request Aug 22, 2024
…le version of LIST_OFFSET (#16893)

- due to the server config UNSTABLE_API_VERSIONS_ENABLE_CONFIG is true, so we can't test the scenario of ListOffsetsRequest is unstable version. We want to test this case in this PR
- get the MV from metadataCache.metadataVersion() instead of config.interBrokerProtocolVersion since MV can be set dynamically.

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
TaiJuWu added a commit to TaiJuWu/kafka that referenced this pull request Aug 25, 2024
…e unstable version of LIST_OFFSET (apache#16893)"

This reverts commit a3aa637.
LoganZhuZzz pushed a commit to LoganZhuZzz/kafka that referenced this pull request Aug 28, 2024
…le version of LIST_OFFSET (apache#16893)

- due to the server config UNSTABLE_API_VERSIONS_ENABLE_CONFIG is true, so we can't test the scenario of ListOffsetsRequest is unstable version. We want to test this case in this PR
- get the MV from metadataCache.metadataVersion() instead of config.interBrokerProtocolVersion since MV can be set dynamically.

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
bboyleonp666 pushed a commit to bboyleonp666/kafka that referenced this pull request Sep 4, 2024
…le version of LIST_OFFSET (apache#16893)

- due to the server config UNSTABLE_API_VERSIONS_ENABLE_CONFIG is true, so we can't test the scenario of ListOffsetsRequest is unstable version. We want to test this case in this PR
- get the MV from metadataCache.metadataVersion() instead of config.interBrokerProtocolVersion since MV can be set dynamically.

Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

4 participants