From cea8fe26dad1e5e7fc648dbc6d0b6a8fbd6091a5 Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 10 Dec 2024 15:15:37 -0800 Subject: [PATCH 1/6] Add tests and mark tv2 as production ready --- .../internals/TransactionManager.java | 1 + .../server/common/TransactionVersion.java | 2 +- tests/kafkatest/services/kafka/kafka.py | 22 ++++++++++++++++--- .../kafkatest/tests/core/transactions_test.py | 16 ++++++++------ .../tests/core/transactions_upgrade_test.py | 17 +++++++++----- tests/kafkatest/version.py | 4 ++++ 6 files changed, 46 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 13935d483b5be..c6ab5cb5c811c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -446,6 +446,7 @@ public synchronized void maybeUpdateTransactionV2Enabled() { latestFinalizedFeaturesEpoch = info.finalizedFeaturesEpoch; Short transactionVersion = info.finalizedFeatures.get("transaction.version"); isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2; + log.debug("Updating isTV2 enabled to {} at with FinalizedFeaturesEpoch {}", isTransactionV2Enabled, latestFinalizedFeaturesEpoch); } public boolean isTransactionV2Enabled() { diff --git a/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java index 45546c447b036..fc85f55606f71 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java @@ -33,7 +33,7 @@ public enum TransactionVersion implements FeatureVersion { public static final String FEATURE_NAME = "transaction.version"; - public static final TransactionVersion LATEST_PRODUCTION = TV_0; + public static final TransactionVersion LATEST_PRODUCTION = TV_2; private final short featureLevel; private final MetadataVersion bootstrapMetadataVersion; diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index acfa5c7f6c2d9..ff90d0741311b 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -33,6 +33,7 @@ from kafkatest.services.security.security_config import SecurityConfig from kafkatest.version import DEV_BRANCH from kafkatest.version import KafkaVersion +from kafkatest.version import get_version from kafkatest.services.kafka.util import fix_opts_for_new_jvm @@ -206,6 +207,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI use_new_coordinator=None, consumer_group_migration_policy=None, dynamicRaftQuorum=False, + use_transactions_v2=False ): """ :param context: test context @@ -269,6 +271,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI :param use_new_coordinator: When true, use the new implementation of the group coordinator as per KIP-848. If this is None, the default existing group coordinator is used. :param consumer_group_migration_policy: The config that enables converting the non-empty classic group using the consumer embedded protocol to the non-empty consumer group using the consumer group protocol and vice versa. :param dynamicRaftQuorum: When true, controller_quorum_bootstrap_servers, and bootstraps the first controller using the standalone flag + :param use_transactions_v2: When true, uses transaction.version=2 which utilizes the new transaction protocol introduced in KIP-890 """ self.zk = zk @@ -293,6 +296,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI # Assign the determined value. self.use_new_coordinator = use_new_coordinator + self.use_transactions_v2 = use_transactions_v2 # Set consumer_group_migration_policy based on context and arguments. if consumer_group_migration_policy is None: @@ -887,6 +891,11 @@ def start_node(self, node, timeout_sec=60, **kwargs): else: cmd += " --standalone" self.standalone_controller_bootstrapped = True + if self.use_transactions_v2: + cmd += " --feature transaction.version=2" + else + if get_version(node).supports_feature_command(): + cmd += " --feature transaction.version=1" self.logger.info("Running log directory format command...\n%s" % cmd) node.account.ssh(cmd) @@ -920,18 +929,25 @@ def wait_for_start(self, node, monitor, timeout_sec=60): raise Exception("No process ids recorded on node %s" % node.account.hostname) def upgrade_metadata_version(self, new_version): - self.run_features_command("upgrade", new_version) + self.run_metadata_features_command("upgrade", new_version) def downgrade_metadata_version(self, new_version): - self.run_features_command("downgrade", new_version) + self.run_metadata_features_command("downgrade", new_version) - def run_features_command(self, op, new_version): + def run_metadata_features_command(self, op, new_version): cmd = self.path.script("kafka-features.sh ") cmd += "--bootstrap-server %s " % self.bootstrap_servers() cmd += "%s --metadata %s" % (op, new_version) self.logger.info("Running %s command...\n%s" % (op, cmd)) self.nodes[0].account.ssh(cmd) + def run_features_command(self, op, feature, new_version): + cmd = self.path.script("kafka-features.sh ") + cmd += "--bootstrap-server %s " % self.bootstrap_servers() + cmd += "%s --feature %s=%s" % (op, new_version) + self.logger.info("Running %s command...\n%s" % (op, cmd)) + self.nodes[0].account.ssh(cmd) + def pids(self, node): """Return process ids associated with running processes on the given node.""" try: diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 5fbd987a97c66..c01c55aaeadbc 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -58,11 +58,6 @@ def __init__(self, test_context): self.progress_timeout_sec = 60 self.consumer_group = "transactions-test-consumer-group" - self.kafka = KafkaService(test_context, - num_nodes=self.num_brokers, - zk=None, - controller_num_nodes_override=1) - def seed_messages(self, topic, num_seed_messages): seed_timeout_sec = 10000 seed_producer = VerifiableProducer(context=self.test_context, @@ -213,6 +208,7 @@ def setup_topics(self): use_group_metadata=[True, False], metadata_quorum=quorum.all_kraft, use_new_coordinator=[False] + use_transactions_v2=[True, False] ) @matrix( failure_mode=["hard_bounce", "clean_bounce"], @@ -221,9 +217,15 @@ def setup_topics(self): use_group_metadata=[True, False], metadata_quorum=quorum.all_kraft, use_new_coordinator=[True], - group_protocol=consumer_group.all_group_protocols + group_protocol=consumer_group.all_group_protocols, + use_transactions_v2=[True, False] ) - def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum, use_new_coordinator=False, group_protocol=None): + def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum, use_new_coordinator=False, group_protocol=None, use_transactions_v2=False): + self.kafka = KafkaService(self.test_context, + num_nodes=self.num_brokers, + zk=None, + controller_num_nodes_override=1, + uses_transactions_v2=uses_transactions_v2) security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = security_protocol diff --git a/tests/kafkatest/tests/core/transactions_upgrade_test.py b/tests/kafkatest/tests/core/transactions_upgrade_test.py index 5c72a6fa806cf..105e55ab93a1d 100644 --- a/tests/kafkatest/tests/core/transactions_upgrade_test.py +++ b/tests/kafkatest/tests/core/transactions_upgrade_test.py @@ -21,7 +21,7 @@ from kafkatest.utils import is_int from kafkatest.utils.transactions_utils import create_and_start_copiers from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, \ - LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION + LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION, LATEST_STABLE_TRANSACTION_VERSION from ducktape.tests.test import Test from ducktape.mark import matrix @@ -48,9 +48,9 @@ def __init__(self, test_context): self.replication_factor = 3 # Test parameters - self.num_input_partitions = 3 - self.num_output_partitions = 3 - self.num_seed_messages = 1000 + self.num_input_partitions = 1 + self.num_output_partitions = 1 + self.num_seed_messages = 4000 self.transaction_size = 5 # The transaction timeout should be lower than the progress timeout, but at @@ -141,6 +141,13 @@ def perform_upgrade(self, from_kafka_version): self.logger.info("Successfully restarted broker node %s" % node.account.hostname) self.logger.info("Changing metadata.version to %s" % LATEST_STABLE_METADATA_VERSION) self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION) + self.logger.info("Changing transaction.version to %s" % LATEST_STABLE_TRANSACTION_VERSION) + self.kafka.run_features_command("upgrade", "transaction.version", LATEST_STABLE_TRANSACTION_VERSION) + # Restart last broker to resend api versions + node = self.kafka.nodes[2] + self.kafka.stop_node(node) + self.kafka.start_node(node) + self.wait_until_rejoin() def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic, num_copiers, num_messages_to_copy, @@ -197,7 +204,7 @@ def setup_topics(self): } } - @cluster(num_nodes=10) + @cluster(num_nodes=8) @matrix( from_kafka_version=[str(LATEST_3_9), str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3), str(LATEST_3_2), str(LATEST_3_1)], metadata_quorum=[isolated_kraft], diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 9f0e491d2cea9..2c04f3511f557 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -97,6 +97,9 @@ def supports_fk_joins(self): # -> https://issues.apache.org/jira/browse/KAFKA-14646 return hasattr(self, "version") and self >= V_3_4_0 + def supports_feature_command(self): + return self >= V_3_8_0 + def get_version(node=None): """Return the version attached to the given node. Default to DEV_BRANCH if node or node.version is undefined (aka None) @@ -109,6 +112,7 @@ def get_version(node=None): DEV_BRANCH = KafkaVersion("dev") DEV_VERSION = KafkaVersion("4.0.0-SNAPSHOT") +LATEST_STABLE_TRANSACTION_VERSION = 2 # This should match the LATEST_PRODUCTION version defined in MetadataVersion.java LATEST_STABLE_METADATA_VERSION = "4.0-IV0" From d15a964f1fe3bd914d00e9cbc4b7ff43ba5279d8 Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 10 Dec 2024 15:54:44 -0800 Subject: [PATCH 2/6] typos --- tests/kafkatest/services/kafka/kafka.py | 4 ++-- tests/kafkatest/tests/core/transactions_test.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index ff90d0741311b..bb59430d9a7c8 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -893,7 +893,7 @@ def start_node(self, node, timeout_sec=60, **kwargs): self.standalone_controller_bootstrapped = True if self.use_transactions_v2: cmd += " --feature transaction.version=2" - else + else: if get_version(node).supports_feature_command(): cmd += " --feature transaction.version=1" self.logger.info("Running log directory format command...\n%s" % cmd) @@ -944,7 +944,7 @@ def run_metadata_features_command(self, op, new_version): def run_features_command(self, op, feature, new_version): cmd = self.path.script("kafka-features.sh ") cmd += "--bootstrap-server %s " % self.bootstrap_servers() - cmd += "%s --feature %s=%s" % (op, new_version) + cmd += "%s --feature %s=%s" % (op, feature, new_version) self.logger.info("Running %s command...\n%s" % (op, cmd)) self.nodes[0].account.ssh(cmd) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index c01c55aaeadbc..97f2a8400c4e9 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -207,7 +207,7 @@ def setup_topics(self): check_order=[True, False], use_group_metadata=[True, False], metadata_quorum=quorum.all_kraft, - use_new_coordinator=[False] + use_new_coordinator=[False], use_transactions_v2=[True, False] ) @matrix( From e69893c32ca1fb61097ed8f0ba7229286fdfdac8 Mon Sep 17 00:00:00 2001 From: Justine Date: Wed, 11 Dec 2024 08:38:06 -0800 Subject: [PATCH 3/6] Other fixes --- tests/kafkatest/services/kafka/kafka.py | 2 +- tests/kafkatest/tests/core/transactions_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index bb59430d9a7c8..ea3c724e0377d 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -895,7 +895,7 @@ def start_node(self, node, timeout_sec=60, **kwargs): cmd += " --feature transaction.version=2" else: if get_version(node).supports_feature_command(): - cmd += " --feature transaction.version=1" + cmd += " --feature transaction.version=0" self.logger.info("Running log directory format command...\n%s" % cmd) node.account.ssh(cmd) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 97f2a8400c4e9..e329b3f7ac7b8 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -225,7 +225,7 @@ def test_transactions(self, failure_mode, bounce_target, check_order, use_group_ num_nodes=self.num_brokers, zk=None, controller_num_nodes_override=1, - uses_transactions_v2=uses_transactions_v2) + use_transactions_v2=use_transactions_v2) security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = security_protocol From 978c49b83a6eaa653289b7a5e2f2077bd35d3625 Mon Sep 17 00:00:00 2001 From: Justine Date: Wed, 11 Dec 2024 10:36:28 -0800 Subject: [PATCH 4/6] Fix default version --- .../main/java/org/apache/kafka/controller/QuorumFeatures.java | 2 +- .../src/main/java/org/apache/kafka/server/BrokerFeatures.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java index 9b79b5760443f..e3b89d68845d3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java @@ -63,7 +63,7 @@ public static Map defaultFeatureMap(boolean enableUnstable MetadataVersion.latestTesting().featureLevel() : MetadataVersion.latestProduction().featureLevel())); for (Feature feature : Feature.PRODUCTION_FEATURES) { - short maxVersion = enableUnstable ? feature.latestTesting() : feature.latestProduction(); + short maxVersion = enableUnstable ? feature.latestTesting() : feature.defaultLevel(MetadataVersion.LATEST_PRODUCTION); if (maxVersion > 0) { features.put(feature.featureName(), VersionRange.of(feature.minimumProduction(), maxVersion)); } diff --git a/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java b/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java index 197ebea427a58..eda52b252f372 100644 --- a/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java +++ b/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java @@ -72,7 +72,7 @@ public static Features defaultSupportedFeatures(boolean u unstableFeatureVersionsEnabled ? MetadataVersion.latestTesting().featureLevel() : MetadataVersion.latestProduction().featureLevel())); PRODUCTION_FEATURES.forEach(feature -> { - int maxVersion = unstableFeatureVersionsEnabled ? feature.latestTesting() : feature.latestProduction(); + int maxVersion = unstableFeatureVersionsEnabled ? feature.latestTesting() : feature.defaultLevel(MetadataVersion.LATEST_PRODUCTION); if (maxVersion > 0) { features.put(feature.featureName(), new SupportedVersionRange(feature.minimumProduction(), (short) maxVersion)); } From fdcfaa3f5437c264c9360927b6cd997b584a5533 Mon Sep 17 00:00:00 2001 From: Justine Date: Wed, 11 Dec 2024 14:19:09 -0800 Subject: [PATCH 5/6] Fix naming,tests for defaultFeature map --- .../scala/kafka/server/ControllerServer.scala | 4 ++-- .../apache/kafka/controller/QuorumFeatures.java | 4 ++-- .../controller/ClusterControlManagerTest.java | 16 ++++++++-------- .../controller/FeatureControlManagerTest.java | 2 +- .../controller/ProducerIdControlManagerTest.java | 2 +- .../controller/QuorumControllerTestEnv.java | 2 +- .../kafka/controller/QuorumFeaturesTest.java | 8 ++++---- .../ReplicationControlManagerTest.java | 2 +- .../org/apache/kafka/server/BrokerFeatures.java | 2 +- 9 files changed, 21 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 8cc14516126a0..16e1efd39d182 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -194,7 +194,7 @@ class ControllerServer( startupDeadline, time) val controllerNodes = QuorumConfig.voterConnectionsToNodes(voterConnections) val quorumFeatures = new QuorumFeatures(config.nodeId, - QuorumFeatures.defaultFeatureMap(config.unstableFeatureVersionsEnabled), + QuorumFeatures.defaultSupportedFeatureMap(config.unstableFeatureVersionsEnabled), controllerNodes.asScala.map(node => Integer.valueOf(node.id())).asJava) val delegationTokenKeyString = { @@ -292,7 +292,7 @@ class ControllerServer( clusterId, time, s"controller-${config.nodeId}-", - QuorumFeatures.defaultFeatureMap(config.unstableFeatureVersionsEnabled), + QuorumFeatures.defaultSupportedFeatureMap(config.unstableFeatureVersionsEnabled), incarnationId, listenerInfo) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java index e3b89d68845d3..554b437621b9a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java @@ -55,7 +55,7 @@ public static Optional reasonNotSupported( return Optional.empty(); } - public static Map defaultFeatureMap(boolean enableUnstable) { + public static Map defaultSupportedFeatureMap(boolean enableUnstable) { Map features = new HashMap<>(1); features.put(MetadataVersion.FEATURE_NAME, VersionRange.of( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), @@ -63,7 +63,7 @@ public static Map defaultFeatureMap(boolean enableUnstable MetadataVersion.latestTesting().featureLevel() : MetadataVersion.latestProduction().featureLevel())); for (Feature feature : Feature.PRODUCTION_FEATURES) { - short maxVersion = enableUnstable ? feature.latestTesting() : feature.defaultLevel(MetadataVersion.LATEST_PRODUCTION); + short maxVersion = enableUnstable ? feature.latestTesting() : feature.latestProduction(); if (maxVersion > 0) { features.put(feature.featureName(), VersionRange.of(feature.minimumProduction(), maxVersion)); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index cc147c0ce6d6e..e7d190339f44d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -94,7 +94,7 @@ public void testReplay(MetadataVersion metadataVersion) { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). @@ -155,7 +155,7 @@ public void testReplayRegisterBrokerRecord() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). @@ -208,7 +208,7 @@ public void testReplayBrokerRegistrationChangeRecord() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). @@ -263,7 +263,7 @@ public void testRegistrationWithIncorrectClusterId() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). @@ -301,7 +301,7 @@ public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(metadataVersion). build(); @@ -364,7 +364,7 @@ public void testUnregister() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). @@ -403,7 +403,7 @@ public void testPlaceReplicas(int numUsableBrokers) { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). @@ -466,7 +466,7 @@ public void testRegistrationsToRecords(MetadataVersion metadataVersion) { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(metadataVersion). build(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java index 31221da79ef8f..e0267a1eda0d9 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java @@ -79,7 +79,7 @@ private static Map versionMap(Object... args) { } public static QuorumFeatures features(Object... args) { - Map features = QuorumFeatures.defaultFeatureMap(true); + Map features = QuorumFeatures.defaultSupportedFeatureMap(true); features.putAll(rangeMap(args)); return new QuorumFeatures(0, features, emptyList()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java index e607f7faa1e7c..407e03f4b500d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java @@ -47,7 +47,7 @@ public void setUp() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index 70ae12c96cd66..fa13e2cb71018 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -108,7 +108,7 @@ private QuorumControllerTestEnv( builder.setRaftClient(logEnv.logManagers().get(nodeId)); builder.setBootstrapMetadata(bootstrapMetadata); builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs); - builder.setQuorumFeatures(new QuorumFeatures(nodeId, QuorumFeatures.defaultFeatureMap(true), nodeIds)); + builder.setQuorumFeatures(new QuorumFeatures(nodeId, QuorumFeatures.defaultSupportedFeatureMap(true), nodeIds)); sessionTimeoutMillis.ifPresent(timeout -> builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS)) ); diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java index 044a161018559..b21b7212c0a3a 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java @@ -61,7 +61,7 @@ public void testDefaultFeatureMap() { MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.LATEST_PRODUCTION.featureLevel())); for (Feature feature : Feature.PRODUCTION_FEATURES) { - short maxVersion = feature.defaultLevel(MetadataVersion.LATEST_PRODUCTION); + short maxVersion = feature.latestProduction(); if (maxVersion > 0) { expectedFeatures.put(feature.featureName(), VersionRange.of( feature.minimumProduction(), @@ -69,7 +69,7 @@ public void testDefaultFeatureMap() { )); } } - assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(false)); + assertEquals(expectedFeatures, QuorumFeatures.defaultSupportedFeatureMap(false)); } @Test @@ -87,13 +87,13 @@ public void testDefaultFeatureMapWithUnstable() { )); } } - assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(true)); + assertEquals(expectedFeatures, QuorumFeatures.defaultSupportedFeatureMap(true)); } @ParameterizedTest @ValueSource(booleans = {true, false}) public void ensureDefaultSupportedFeaturesRangeMaxNotZero(boolean unstableVersionsEnabled) { - Map quorumFeatures = QuorumFeatures.defaultFeatureMap(unstableVersionsEnabled); + Map quorumFeatures = QuorumFeatures.defaultSupportedFeatureMap(unstableVersionsEnabled); for (VersionRange range : quorumFeatures.values()) { assertNotEquals(0, range.max()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 58d97c80b7c78..fae6d17e0bfe8 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -244,7 +244,7 @@ private ReplicationControlTestContext( this.featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(metadataVersion). build(); diff --git a/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java b/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java index eda52b252f372..197ebea427a58 100644 --- a/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java +++ b/server/src/main/java/org/apache/kafka/server/BrokerFeatures.java @@ -72,7 +72,7 @@ public static Features defaultSupportedFeatures(boolean u unstableFeatureVersionsEnabled ? MetadataVersion.latestTesting().featureLevel() : MetadataVersion.latestProduction().featureLevel())); PRODUCTION_FEATURES.forEach(feature -> { - int maxVersion = unstableFeatureVersionsEnabled ? feature.latestTesting() : feature.defaultLevel(MetadataVersion.LATEST_PRODUCTION); + int maxVersion = unstableFeatureVersionsEnabled ? feature.latestTesting() : feature.latestProduction(); if (maxVersion > 0) { features.put(feature.featureName(), new SupportedVersionRange(feature.minimumProduction(), (short) maxVersion)); } From 92dfdf602872633f0ec70d35c0709cb0ead9ef04 Mon Sep 17 00:00:00 2001 From: Justine Date: Wed, 11 Dec 2024 16:41:17 -0800 Subject: [PATCH 6/6] Update test and log message --- .../producer/internals/TransactionManager.java | 2 +- .../tests/core/transactions_upgrade_test.py | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index c6ab5cb5c811c..54241b093c3a7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -446,7 +446,7 @@ public synchronized void maybeUpdateTransactionV2Enabled() { latestFinalizedFeaturesEpoch = info.finalizedFeaturesEpoch; Short transactionVersion = info.finalizedFeatures.get("transaction.version"); isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2; - log.debug("Updating isTV2 enabled to {} at with FinalizedFeaturesEpoch {}", isTransactionV2Enabled, latestFinalizedFeaturesEpoch); + log.debug("Updating isTV2 enabled to {} with FinalizedFeaturesEpoch {}", isTransactionV2Enabled, latestFinalizedFeaturesEpoch); } public boolean isTransactionV2Enabled() { diff --git a/tests/kafkatest/tests/core/transactions_upgrade_test.py b/tests/kafkatest/tests/core/transactions_upgrade_test.py index 105e55ab93a1d..d9819b038437e 100644 --- a/tests/kafkatest/tests/core/transactions_upgrade_test.py +++ b/tests/kafkatest/tests/core/transactions_upgrade_test.py @@ -50,7 +50,7 @@ def __init__(self, test_context): # Test parameters self.num_input_partitions = 1 self.num_output_partitions = 1 - self.num_seed_messages = 4000 + self.num_seed_messages = 4500 self.transaction_size = 5 # The transaction timeout should be lower than the progress timeout, but at @@ -143,11 +143,13 @@ def perform_upgrade(self, from_kafka_version): self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION) self.logger.info("Changing transaction.version to %s" % LATEST_STABLE_TRANSACTION_VERSION) self.kafka.run_features_command("upgrade", "transaction.version", LATEST_STABLE_TRANSACTION_VERSION) - # Restart last broker to resend api versions - node = self.kafka.nodes[2] - self.kafka.stop_node(node) - self.kafka.start_node(node) - self.wait_until_rejoin() + # Restart brokers to ensure new api versions sent + for node in self.kafka.nodes: + self.logger.info("Stopping broker node %s" % node.account.hostname) + self.kafka.stop_node(node) + self.logger.info("Restarting broker node %s" % node.account.hostname) + self.kafka.start_node(node) + self.wait_until_rejoin() def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic, num_copiers, num_messages_to_copy, @@ -176,7 +178,7 @@ def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic self.perform_upgrade(from_kafka_version) - copier_timeout_sec = 120 + copier_timeout_sec = 180 for copier in copiers: wait_until(lambda: copier.is_done, timeout_sec=copier_timeout_sec,