Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ public synchronized void maybeUpdateTransactionV2Enabled(boolean onInitiatializa
Short transactionVersion = info.finalizedFeatures.get("transaction.version");
boolean wasTransactionV2Enabled = isTransactionV2Enabled;
isTransactionV2Enabled = transactionVersion != null && transactionVersion >= 2;
log.debug("Updating isTV2 enabled to {} with FinalizedFeaturesEpoch {}", isTransactionV2Enabled, latestFinalizedFeaturesEpoch);
if (!onInitiatialization && !wasTransactionV2Enabled && isTransactionV2Enabled)
clientSideEpochBumpRequired = true;
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class ControllerServer(
startupDeadline, time)
val controllerNodes = QuorumConfig.voterConnectionsToNodes(voterConnections)
val quorumFeatures = new QuorumFeatures(config.nodeId,
QuorumFeatures.defaultFeatureMap(config.unstableFeatureVersionsEnabled),
QuorumFeatures.defaultSupportedFeatureMap(config.unstableFeatureVersionsEnabled),
controllerNodes.asScala.map(node => Integer.valueOf(node.id())).asJava)

val delegationTokenKeyString = {
Expand Down Expand Up @@ -292,7 +292,7 @@ class ControllerServer(
clusterId,
time,
s"controller-${config.nodeId}-",
QuorumFeatures.defaultFeatureMap(config.unstableFeatureVersionsEnabled),
QuorumFeatures.defaultSupportedFeatureMap(config.unstableFeatureVersionsEnabled),
incarnationId,
listenerInfo)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static Optional<String> reasonNotSupported(
return Optional.empty();
}

public static Map<String, VersionRange> defaultFeatureMap(boolean enableUnstable) {
public static Map<String, VersionRange> defaultSupportedFeatureMap(boolean enableUnstable) {
Map<String, VersionRange> features = new HashMap<>(1);
features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testReplay(MetadataVersion metadataVersion) {
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultFeatureMap(true),
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
Expand Down Expand Up @@ -155,7 +155,7 @@ public void testReplayRegisterBrokerRecord() {
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultFeatureMap(true),
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
Expand Down Expand Up @@ -208,7 +208,7 @@ public void testReplayBrokerRegistrationChangeRecord() {
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultFeatureMap(true),
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
Expand Down Expand Up @@ -263,7 +263,7 @@ public void testRegistrationWithIncorrectClusterId() {
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultFeatureMap(true),
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
Expand Down Expand Up @@ -301,7 +301,7 @@ public void testRegisterBrokerRecordVersion(MetadataVersion metadataVersion) {
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultFeatureMap(true),
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
setMetadataVersion(metadataVersion).
build();
Expand Down Expand Up @@ -364,7 +364,7 @@ public void testUnregister() {
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultFeatureMap(true),
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
Expand Down Expand Up @@ -403,7 +403,7 @@ public void testPlaceReplicas(int numUsableBrokers) {
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultFeatureMap(true),
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
Expand Down Expand Up @@ -466,7 +466,7 @@ public void testRegistrationsToRecords(MetadataVersion metadataVersion) {
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultFeatureMap(true),
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
setMetadataVersion(metadataVersion).
build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private static Map<String, Short> versionMap(Object... args) {
}

public static QuorumFeatures features(Object... args) {
Map<String, VersionRange> features = QuorumFeatures.defaultFeatureMap(true);
Map<String, VersionRange> features = QuorumFeatures.defaultSupportedFeatureMap(true);
features.putAll(rangeMap(args));
return new QuorumFeatures(0, features, emptyList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void setUp() {
FeatureControlManager featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultFeatureMap(true),
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
build();
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private QuorumControllerTestEnv(
builder.setRaftClient(logEnv.logManagers().get(nodeId));
builder.setBootstrapMetadata(bootstrapMetadata);
builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
builder.setQuorumFeatures(new QuorumFeatures(nodeId, QuorumFeatures.defaultFeatureMap(true), nodeIds));
builder.setQuorumFeatures(new QuorumFeatures(nodeId, QuorumFeatures.defaultSupportedFeatureMap(true), nodeIds));
sessionTimeoutMillis.ifPresent(timeout ->
builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ public void testDefaultFeatureMap() {
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
MetadataVersion.LATEST_PRODUCTION.featureLevel()));
for (Feature feature : Feature.PRODUCTION_FEATURES) {
short maxVersion = feature.defaultLevel(MetadataVersion.LATEST_PRODUCTION);
short maxVersion = feature.latestProduction();
if (maxVersion > 0) {
expectedFeatures.put(feature.featureName(), VersionRange.of(
feature.minimumProduction(),
maxVersion
));
}
}
assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(false));
assertEquals(expectedFeatures, QuorumFeatures.defaultSupportedFeatureMap(false));
}

@Test
Expand All @@ -83,13 +83,13 @@ public void testDefaultFeatureMapWithUnstable() {
));
}
}
assertEquals(expectedFeatures, QuorumFeatures.defaultFeatureMap(true));
assertEquals(expectedFeatures, QuorumFeatures.defaultSupportedFeatureMap(true));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void ensureDefaultSupportedFeaturesRangeMaxNotZero(boolean unstableVersionsEnabled) {
Map<String, VersionRange> quorumFeatures = QuorumFeatures.defaultFeatureMap(unstableVersionsEnabled);
Map<String, VersionRange> quorumFeatures = QuorumFeatures.defaultSupportedFeatureMap(unstableVersionsEnabled);
for (VersionRange range : quorumFeatures.values()) {
assertNotEquals(0, range.max());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private ReplicationControlTestContext(
this.featureControl = new FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultFeatureMap(true),
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
setMetadataVersion(metadataVersion).
build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public enum TransactionVersion implements FeatureVersion {

public static final String FEATURE_NAME = "transaction.version";

public static final TransactionVersion LATEST_PRODUCTION = TV_0;
public static final TransactionVersion LATEST_PRODUCTION = TV_2;

private final short featureLevel;
private final MetadataVersion bootstrapMetadataVersion;
Expand Down
22 changes: 19 additions & 3 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import DEV_BRANCH
from kafkatest.version import KafkaVersion
from kafkatest.version import get_version
from kafkatest.services.kafka.util import fix_opts_for_new_jvm, get_log4j_config_param, get_log4j_config


Expand Down Expand Up @@ -205,6 +206,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
use_new_coordinator=None,
consumer_group_migration_policy=None,
dynamicRaftQuorum=False,
use_transactions_v2=False
):
"""
:param context: test context
Expand Down Expand Up @@ -268,6 +270,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
:param use_new_coordinator: When true, use the new implementation of the group coordinator as per KIP-848. If this is None, the default existing group coordinator is used.
:param consumer_group_migration_policy: The config that enables converting the non-empty classic group using the consumer embedded protocol to the non-empty consumer group using the consumer group protocol and vice versa.
:param dynamicRaftQuorum: When true, controller_quorum_bootstrap_servers, and bootstraps the first controller using the standalone flag
:param use_transactions_v2: When true, uses transaction.version=2 which utilizes the new transaction protocol introduced in KIP-890
"""

self.zk = zk
Expand All @@ -292,6 +295,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI

# Assign the determined value.
self.use_new_coordinator = use_new_coordinator
self.use_transactions_v2 = use_transactions_v2

# Set consumer_group_migration_policy based on context and arguments.
if consumer_group_migration_policy is None:
Expand Down Expand Up @@ -887,6 +891,11 @@ def start_node(self, node, timeout_sec=60, **kwargs):
else:
cmd += " --standalone"
self.standalone_controller_bootstrapped = True
if self.use_transactions_v2:
cmd += " --feature transaction.version=2"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We missed setting transaction.version for isolated kraft, resulting in the cluster using v0. Since this nullifies the suit use_transactions_v2=True, we will submit a patch later.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, is there a separate place to see this? That is unfortunate.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I observed this issue while running tests/kafkatest/tests/core/transactions_test.py

The settings "isolated_kraft" and "use_transactions_v2=true" failed to enable tv2 on the cluster

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know how this maps to the kafka.py file? Ie, why this line of code doesn't apply for isolated kraft?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know how this maps to the kafka.py file? Ie, why this line of code doesn't apply for isolated kraft?

see #21164

else:
if get_version(node).supports_feature_command():
cmd += " --feature transaction.version=0"
self.logger.info("Running log directory format command...\n%s" % cmd)
node.account.ssh(cmd)

Expand Down Expand Up @@ -920,18 +929,25 @@ def wait_for_start(self, node, monitor, timeout_sec=60):
raise Exception("No process ids recorded on node %s" % node.account.hostname)

def upgrade_metadata_version(self, new_version):
self.run_features_command("upgrade", new_version)
self.run_metadata_features_command("upgrade", new_version)

def downgrade_metadata_version(self, new_version):
self.run_features_command("downgrade", new_version)
self.run_metadata_features_command("downgrade", new_version)

def run_features_command(self, op, new_version):
def run_metadata_features_command(self, op, new_version):
cmd = self.path.script("kafka-features.sh ")
cmd += "--bootstrap-server %s " % self.bootstrap_servers()
cmd += "%s --metadata %s" % (op, new_version)
self.logger.info("Running %s command...\n%s" % (op, cmd))
self.nodes[0].account.ssh(cmd)

def run_features_command(self, op, feature, new_version):
cmd = self.path.script("kafka-features.sh ")
cmd += "--bootstrap-server %s " % self.bootstrap_servers()
cmd += "%s --feature %s=%s" % (op, feature, new_version)
self.logger.info("Running %s command...\n%s" % (op, cmd))
self.nodes[0].account.ssh(cmd)

def pids(self, node):
"""Return process ids associated with running processes on the given node."""
try:
Expand Down
18 changes: 10 additions & 8 deletions tests/kafkatest/tests/core/transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ def __init__(self, test_context):
self.progress_timeout_sec = 60
self.consumer_group = "transactions-test-consumer-group"

self.kafka = KafkaService(test_context,
num_nodes=self.num_brokers,
zk=None,
controller_num_nodes_override=1)

def seed_messages(self, topic, num_seed_messages):
seed_timeout_sec = 10000
seed_producer = VerifiableProducer(context=self.test_context,
Expand Down Expand Up @@ -212,7 +207,8 @@ def setup_topics(self):
check_order=[True, False],
use_group_metadata=[True, False],
metadata_quorum=quorum.all_kraft,
use_new_coordinator=[False]
use_new_coordinator=[False],
use_transactions_v2=[True, False]
)
@matrix(
failure_mode=["hard_bounce", "clean_bounce"],
Expand All @@ -221,9 +217,15 @@ def setup_topics(self):
use_group_metadata=[True, False],
metadata_quorum=quorum.all_kraft,
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
group_protocol=consumer_group.all_group_protocols,
use_transactions_v2=[True, False]
)
def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum, use_new_coordinator=False, group_protocol=None):
def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum, use_new_coordinator=False, group_protocol=None, use_transactions_v2=False):
self.kafka = KafkaService(self.test_context,
num_nodes=self.num_brokers,
zk=None,
controller_num_nodes_override=1,
use_transactions_v2=use_transactions_v2)
security_protocol = 'PLAINTEXT'
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
Expand Down
21 changes: 15 additions & 6 deletions tests/kafkatest/tests/core/transactions_upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from kafkatest.utils import is_int
from kafkatest.utils.transactions_utils import create_and_start_copiers
from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, \
LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION, LATEST_STABLE_TRANSACTION_VERSION

from ducktape.tests.test import Test
from ducktape.mark import matrix
Expand All @@ -48,9 +48,9 @@ def __init__(self, test_context):
self.replication_factor = 3

# Test parameters
self.num_input_partitions = 3
self.num_output_partitions = 3
self.num_seed_messages = 1000
self.num_input_partitions = 1
self.num_output_partitions = 1
self.num_seed_messages = 4500
self.transaction_size = 5

# The transaction timeout should be lower than the progress timeout, but at
Expand Down Expand Up @@ -141,6 +141,15 @@ def perform_upgrade(self, from_kafka_version):
self.logger.info("Successfully restarted broker node %s" % node.account.hostname)
self.logger.info("Changing metadata.version to %s" % LATEST_STABLE_METADATA_VERSION)
self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION)
self.logger.info("Changing transaction.version to %s" % LATEST_STABLE_TRANSACTION_VERSION)
self.kafka.run_features_command("upgrade", "transaction.version", LATEST_STABLE_TRANSACTION_VERSION)
# Restart brokers to ensure new api versions sent
for node in self.kafka.nodes:
self.logger.info("Stopping broker node %s" % node.account.hostname)
self.kafka.stop_node(node)
self.logger.info("Restarting broker node %s" % node.account.hostname)
self.kafka.start_node(node)
self.wait_until_rejoin()

def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic,
num_copiers, num_messages_to_copy,
Expand Down Expand Up @@ -169,7 +178,7 @@ def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic

self.perform_upgrade(from_kafka_version)

copier_timeout_sec = 120
copier_timeout_sec = 180
for copier in copiers:
wait_until(lambda: copier.is_done,
timeout_sec=copier_timeout_sec,
Expand Down Expand Up @@ -197,7 +206,7 @@ def setup_topics(self):
}
}

@cluster(num_nodes=10)
@cluster(num_nodes=8)
@matrix(
from_kafka_version=[str(LATEST_3_9), str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3), str(LATEST_3_2), str(LATEST_3_1)],
metadata_quorum=[isolated_kraft],
Expand Down
4 changes: 4 additions & 0 deletions tests/kafkatest/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ def supports_fk_joins(self):
# -> https://issues.apache.org/jira/browse/KAFKA-14646
return hasattr(self, "version") and self >= V_3_4_0

def supports_feature_command(self):
return self >= V_3_8_0

def get_version(node=None):
"""Return the version attached to the given node.
Default to DEV_BRANCH if node or node.version is undefined (aka None)
Expand All @@ -109,6 +112,7 @@ def get_version(node=None):
DEV_BRANCH = KafkaVersion("dev")
DEV_VERSION = KafkaVersion("4.1.0-SNAPSHOT")

LATEST_STABLE_TRANSACTION_VERSION = 2
# This should match the LATEST_PRODUCTION version defined in MetadataVersion.java
LATEST_STABLE_METADATA_VERSION = "4.0-IV0"

Expand Down