diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index ab99b3d3e4666..ad73fa6ba37de 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -460,6 +460,7 @@ public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatializa Short transactionVersion = info.finalizedFeatures.get("transaction.version"); boolean wasTransactionV2Enabled = isTransactionV2Enabled; isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2; + log.debug("Updating isTV2 enabled to {} with FinalizedFeaturesEpoch {}", isTransactionV2Enabled, latestFinalizedFeaturesEpoch); if (!onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled) clientSideEpochBumpRequired = true; } diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 8cc14516126a0..16e1efd39d182 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -194,7 +194,7 @@ class ControllerServer( startupDeadline, time) val controllerNodes = QuorumConfig.voterConnectionsToNodes(voterConnections) val quorumFeatures = new QuorumFeatures(config.nodeId, - QuorumFeatures.defaultFeatureMap(config.unstableFeatureVersionsEnabled), + QuorumFeatures.defaultSupportedFeatureMap(config.unstableFeatureVersionsEnabled), controllerNodes.asScala.map(node => Integer.valueOf(node.id())).asJava) val delegationTokenKeyString = { @@ -292,7 +292,7 @@ class ControllerServer( clusterId, time, s"controller-${config.nodeId}-", - QuorumFeatures.defaultFeatureMap(config.unstableFeatureVersionsEnabled), + QuorumFeatures.defaultSupportedFeatureMap(config.unstableFeatureVersionsEnabled), incarnationId, listenerInfo) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java index cc615c1966ee3..90017d7b75e74 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java @@ -54,7 +54,7 @@ public static Optional reasonNotSupported( return Optional.empty(); } - public static Map defaultFeatureMap(boolean enableUnstable) { + public static Map defaultSupportedFeatureMap(boolean enableUnstable) { Map features = new HashMap<>(1); features.put(MetadataVersion.FEATURE_NAME, VersionRange.of( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index cc147c0ce6d6e..e7d190339f44d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -94,7 +94,7 @@ public void testReplay(MetadataVersion metadataVersion) { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). @@ -155,7 +155,7 @@ public void testReplayRegisterBrokerRecord() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). @@ -208,7 +208,7 @@ public void testReplayBrokerRegistrationChangeRecord() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). @@ -263,7 +263,7 @@ public void testRegistrationWithIncorrectClusterId() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). @@ -301,7 +301,7 @@ public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(metadataVersion). build(); @@ -364,7 +364,7 @@ public void testUnregister() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). @@ -403,7 +403,7 @@ public void testPlaceReplicas(int numUsableBrokers) { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). @@ -466,7 +466,7 @@ public void testRegistrationsToRecords(MetadataVersion metadataVersion) { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(metadataVersion). build(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java index e2d3970445e6d..1ec40de23dc25 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java @@ -77,7 +77,7 @@ private static Map versionMap(Object... args) { } public static QuorumFeatures features(Object... args) { - Map features = QuorumFeatures.defaultFeatureMap(true); + Map features = QuorumFeatures.defaultSupportedFeatureMap(true); features.putAll(rangeMap(args)); return new QuorumFeatures(0, features, emptyList()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java index e607f7faa1e7c..407e03f4b500d 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java @@ -47,7 +47,7 @@ public void setUp() { FeatureControlManager featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). build(); ClusterControlManager clusterControl = new ClusterControlManager.Builder(). diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index 70ae12c96cd66..fa13e2cb71018 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -108,7 +108,7 @@ private QuorumControllerTestEnv( builder.setRaftClient(logEnv.logManagers().get(nodeId)); builder.setBootstrapMetadata(bootstrapMetadata); builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs); - builder.setQuorumFeatures(new QuorumFeatures(nodeId, QuorumFeatures.defaultFeatureMap(true), nodeIds)); + builder.setQuorumFeatures(new QuorumFeatures(nodeId, QuorumFeatures.defaultSupportedFeatureMap(true), nodeIds)); sessionTimeoutMillis.ifPresent(timeout -> builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS)) ); diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java index 2ecf2f75cfba0..49e0519fbc167 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java @@ -57,7 +57,7 @@ public void testDefaultFeatureMap() { MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.LATEST_PRODUCTION.featureLevel())); for (Feature feature : Feature.PRODUCTION_FEATURES) { - short maxVersion = feature.defaultLevel(MetadataVersion.LATEST_PRODUCTION); + short maxVersion = feature.latestProduction(); if (maxVersion > 0) { expectedFeatures.put(feature.featureName(), VersionRange.of( feature.minimumProduction(), @@ -65,7 +65,7 @@ public void testDefaultFeatureMap() { )); } } - assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(false)); + assertEquals(expectedFeatures, QuorumFeatures.defaultSupportedFeatureMap(false)); } @Test @@ -83,13 +83,13 @@ public void testDefaultFeatureMapWithUnstable() { )); } } - assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(true)); + assertEquals(expectedFeatures, QuorumFeatures.defaultSupportedFeatureMap(true)); } @ParameterizedTest @ValueSource(booleans = {true, false}) public void ensureDefaultSupportedFeaturesRangeMaxNotZero(boolean unstableVersionsEnabled) { - Map quorumFeatures = QuorumFeatures.defaultFeatureMap(unstableVersionsEnabled); + Map quorumFeatures = QuorumFeatures.defaultSupportedFeatureMap(unstableVersionsEnabled); for (VersionRange range : quorumFeatures.values()) { assertNotEquals(0, range.max()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 58d97c80b7c78..fae6d17e0bfe8 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -244,7 +244,7 @@ private ReplicationControlTestContext( this.featureControl = new FeatureControlManager.Builder(). setSnapshotRegistry(snapshotRegistry). setQuorumFeatures(new QuorumFeatures(0, - QuorumFeatures.defaultFeatureMap(true), + QuorumFeatures.defaultSupportedFeatureMap(true), Collections.singletonList(0))). setMetadataVersion(metadataVersion). build(); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java index 45546c447b036..fc85f55606f71 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/TransactionVersion.java @@ -33,7 +33,7 @@ public enum TransactionVersion implements FeatureVersion { public static final String FEATURE_NAME = "transaction.version"; - public static final TransactionVersion LATEST_PRODUCTION = TV_0; + public static final TransactionVersion LATEST_PRODUCTION = TV_2; private final short featureLevel; private final MetadataVersion bootstrapMetadataVersion; diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index b713aacb04e55..ba1b693786764 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -33,6 +33,7 @@ from kafkatest.services.security.security_config import SecurityConfig from kafkatest.version import DEV_BRANCH from kafkatest.version import KafkaVersion +from kafkatest.version import get_version from kafkatest.services.kafka.util import fix_opts_for_new_jvm, get_log4j_config_param, get_log4j_config @@ -205,6 +206,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI use_new_coordinator=None, consumer_group_migration_policy=None, dynamicRaftQuorum=False, + use_transactions_v2=False ): """ :param context: test context @@ -268,6 +270,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI :param use_new_coordinator: When true, use the new implementation of the group coordinator as per KIP-848. If this is None, the default existing group coordinator is used. :param consumer_group_migration_policy: The config that enables converting the non-empty classic group using the consumer embedded protocol to the non-empty consumer group using the consumer group protocol and vice versa. :param dynamicRaftQuorum: When true, controller_quorum_bootstrap_servers, and bootstraps the first controller using the standalone flag + :param use_transactions_v2: When true, uses transaction.version=2 which utilizes the new transaction protocol introduced in KIP-890 """ self.zk = zk @@ -292,6 +295,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI # Assign the determined value. self.use_new_coordinator = use_new_coordinator + self.use_transactions_v2 = use_transactions_v2 # Set consumer_group_migration_policy based on context and arguments. if consumer_group_migration_policy is None: @@ -887,6 +891,11 @@ def start_node(self, node, timeout_sec=60, **kwargs): else: cmd += " --standalone" self.standalone_controller_bootstrapped = True + if self.use_transactions_v2: + cmd += " --feature transaction.version=2" + else: + if get_version(node).supports_feature_command(): + cmd += " --feature transaction.version=0" self.logger.info("Running log directory format command...\n%s" % cmd) node.account.ssh(cmd) @@ -920,18 +929,25 @@ def wait_for_start(self, node, monitor, timeout_sec=60): raise Exception("No process ids recorded on node %s" % node.account.hostname) def upgrade_metadata_version(self, new_version): - self.run_features_command("upgrade", new_version) + self.run_metadata_features_command("upgrade", new_version) def downgrade_metadata_version(self, new_version): - self.run_features_command("downgrade", new_version) + self.run_metadata_features_command("downgrade", new_version) - def run_features_command(self, op, new_version): + def run_metadata_features_command(self, op, new_version): cmd = self.path.script("kafka-features.sh ") cmd += "--bootstrap-server %s " % self.bootstrap_servers() cmd += "%s --metadata %s" % (op, new_version) self.logger.info("Running %s command...\n%s" % (op, cmd)) self.nodes[0].account.ssh(cmd) + def run_features_command(self, op, feature, new_version): + cmd = self.path.script("kafka-features.sh ") + cmd += "--bootstrap-server %s " % self.bootstrap_servers() + cmd += "%s --feature %s=%s" % (op, feature, new_version) + self.logger.info("Running %s command...\n%s" % (op, cmd)) + self.nodes[0].account.ssh(cmd) + def pids(self, node): """Return process ids associated with running processes on the given node.""" try: diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 5fbd987a97c66..e329b3f7ac7b8 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -58,11 +58,6 @@ def __init__(self, test_context): self.progress_timeout_sec = 60 self.consumer_group = "transactions-test-consumer-group" - self.kafka = KafkaService(test_context, - num_nodes=self.num_brokers, - zk=None, - controller_num_nodes_override=1) - def seed_messages(self, topic, num_seed_messages): seed_timeout_sec = 10000 seed_producer = VerifiableProducer(context=self.test_context, @@ -212,7 +207,8 @@ def setup_topics(self): check_order=[True, False], use_group_metadata=[True, False], metadata_quorum=quorum.all_kraft, - use_new_coordinator=[False] + use_new_coordinator=[False], + use_transactions_v2=[True, False] ) @matrix( failure_mode=["hard_bounce", "clean_bounce"], @@ -221,9 +217,15 @@ def setup_topics(self): use_group_metadata=[True, False], metadata_quorum=quorum.all_kraft, use_new_coordinator=[True], - group_protocol=consumer_group.all_group_protocols + group_protocol=consumer_group.all_group_protocols, + use_transactions_v2=[True, False] ) - def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum, use_new_coordinator=False, group_protocol=None): + def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum, use_new_coordinator=False, group_protocol=None, use_transactions_v2=False): + self.kafka = KafkaService(self.test_context, + num_nodes=self.num_brokers, + zk=None, + controller_num_nodes_override=1, + use_transactions_v2=use_transactions_v2) security_protocol = 'PLAINTEXT' self.kafka.security_protocol = security_protocol self.kafka.interbroker_security_protocol = security_protocol diff --git a/tests/kafkatest/tests/core/transactions_upgrade_test.py b/tests/kafkatest/tests/core/transactions_upgrade_test.py index 5c72a6fa806cf..d9819b038437e 100644 --- a/tests/kafkatest/tests/core/transactions_upgrade_test.py +++ b/tests/kafkatest/tests/core/transactions_upgrade_test.py @@ -21,7 +21,7 @@ from kafkatest.utils import is_int from kafkatest.utils.transactions_utils import create_and_start_copiers from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, \ - LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION + LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION, LATEST_STABLE_TRANSACTION_VERSION from ducktape.tests.test import Test from ducktape.mark import matrix @@ -48,9 +48,9 @@ def __init__(self, test_context): self.replication_factor = 3 # Test parameters - self.num_input_partitions = 3 - self.num_output_partitions = 3 - self.num_seed_messages = 1000 + self.num_input_partitions = 1 + self.num_output_partitions = 1 + self.num_seed_messages = 4500 self.transaction_size = 5 # The transaction timeout should be lower than the progress timeout, but at @@ -141,6 +141,15 @@ def perform_upgrade(self, from_kafka_version): self.logger.info("Successfully restarted broker node %s" % node.account.hostname) self.logger.info("Changing metadata.version to %s" % LATEST_STABLE_METADATA_VERSION) self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION) + self.logger.info("Changing transaction.version to %s" % LATEST_STABLE_TRANSACTION_VERSION) + self.kafka.run_features_command("upgrade", "transaction.version", LATEST_STABLE_TRANSACTION_VERSION) + # Restart brokers to ensure new api versions sent + for node in self.kafka.nodes: + self.logger.info("Stopping broker node %s" % node.account.hostname) + self.kafka.stop_node(node) + self.logger.info("Restarting broker node %s" % node.account.hostname) + self.kafka.start_node(node) + self.wait_until_rejoin() def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic, num_copiers, num_messages_to_copy, @@ -169,7 +178,7 @@ def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic self.perform_upgrade(from_kafka_version) - copier_timeout_sec = 120 + copier_timeout_sec = 180 for copier in copiers: wait_until(lambda: copier.is_done, timeout_sec=copier_timeout_sec, @@ -197,7 +206,7 @@ def setup_topics(self): } } - @cluster(num_nodes=10) + @cluster(num_nodes=8) @matrix( from_kafka_version=[str(LATEST_3_9), str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3), str(LATEST_3_2), str(LATEST_3_1)], metadata_quorum=[isolated_kraft], diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 8278e71b05156..6eeeedd44528d 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -97,6 +97,9 @@ def supports_fk_joins(self): # -> https://issues.apache.org/jira/browse/KAFKA-14646 return hasattr(self, "version") and self >= V_3_4_0 + def supports_feature_command(self): + return self >= V_3_8_0 + def get_version(node=None): """Return the version attached to the given node. Default to DEV_BRANCH if node or node.version is undefined (aka None) @@ -109,6 +112,7 @@ def get_version(node=None): DEV_BRANCH = KafkaVersion("dev") DEV_VERSION = KafkaVersion("4.1.0-SNAPSHOT") +LATEST_STABLE_TRANSACTION_VERSION = 2 # This should match the LATEST_PRODUCTION version defined in MetadataVersion.java LATEST_STABLE_METADATA_VERSION = "4.0-IV0"