diff --git a/core/src/test/java/kafka/testkit/BrokerNode.java b/core/src/test/java/kafka/testkit/BrokerNode.java index 7c497614c33d7..51da3fe72ee4f 100644 --- a/core/src/test/java/kafka/testkit/BrokerNode.java +++ b/core/src/test/java/kafka/testkit/BrokerNode.java @@ -17,7 +17,6 @@ package kafka.testkit; -import org.apache.kafka.common.Uuid; import org.apache.kafka.metadata.properties.MetaProperties; import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; import org.apache.kafka.metadata.properties.MetaPropertiesVersion; @@ -42,7 +41,7 @@ public static Builder builder() { public static class Builder { private int id = -1; private String baseDirectory; - private Uuid clusterId; + private String clusterId; private int numLogDirectories = 1; private Map propertyOverrides = Collections.emptyMap(); private boolean combined; @@ -63,7 +62,7 @@ public Builder setNumLogDirectories(int numLogDirectories) { return this; } - public Builder setClusterId(Uuid clusterId) { + public Builder setClusterId(String clusterId) { this.clusterId = clusterId; return this; } @@ -113,7 +112,7 @@ public BrokerNode build() { for (String logDir : logDataDirectories) { copier.setLogDirProps(logDir, new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). - setClusterId(clusterId.toString()). + setClusterId(clusterId). setNodeId(id). setDirectoryId(copier.generateValidDirectoryId()). build()); diff --git a/core/src/test/java/kafka/testkit/BrokerNodeTest.java b/core/src/test/java/kafka/testkit/BrokerNodeTest.java index afc829736d541..220c8ee4e9319 100644 --- a/core/src/test/java/kafka/testkit/BrokerNodeTest.java +++ b/core/src/test/java/kafka/testkit/BrokerNodeTest.java @@ -29,13 +29,13 @@ public void testInvalidBuilder() { Assertions.assertEquals("You must set the node id.", Assertions.assertThrows(IllegalArgumentException.class, () -> BrokerNode.builder() .setBaseDirectory("foo") - .setClusterId(Uuid.randomUuid()) + .setClusterId(Uuid.randomUuid().toString()) .build()).getMessage()); Assertions.assertEquals("The value of numLogDirectories should be at least 1.", Assertions.assertThrows(IllegalArgumentException.class, () -> BrokerNode.builder() .setBaseDirectory("foo") - .setClusterId(Uuid.randomUuid()) + .setClusterId(Uuid.randomUuid().toString()) .setId(0) .setNumLogDirectories(0) .build()).getMessage()); diff --git a/core/src/test/java/kafka/testkit/ControllerNode.java b/core/src/test/java/kafka/testkit/ControllerNode.java index cbb1c0964a379..a52a6868e53d7 100644 --- a/core/src/test/java/kafka/testkit/ControllerNode.java +++ b/core/src/test/java/kafka/testkit/ControllerNode.java @@ -17,7 +17,6 @@ package kafka.testkit; -import org.apache.kafka.common.Uuid; import org.apache.kafka.metadata.properties.MetaProperties; import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; import org.apache.kafka.metadata.properties.MetaPropertiesVersion; @@ -37,7 +36,7 @@ public static Builder builder() { public static class Builder { private int id = -1; private String baseDirectory; - private Uuid clusterId; + private String clusterId; private boolean combined; private Map propertyOverrides = Collections.emptyMap(); @@ -52,7 +51,7 @@ public Builder setId(int id) { return this; } - public Builder setClusterId(Uuid clusterId) { + public Builder setClusterId(String clusterId) { this.clusterId = clusterId; return this; } diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java index df486a1b3b498..1e12f25c08c44 100644 --- a/core/src/test/java/kafka/testkit/TestKitNodes.java +++ b/core/src/test/java/kafka/testkit/TestKitNodes.java @@ -40,7 +40,7 @@ public class TestKitNodes { public static class Builder { private boolean combined; - private Uuid clusterId; + private String clusterId; private int numControllerNodes; private int numBrokerNodes; private int numDisksPerBroker = 1; @@ -48,7 +48,7 @@ public static class Builder { private BootstrapMetadata bootstrapMetadata = BootstrapMetadata. fromVersion(MetadataVersion.latestTesting(), "testkit"); - public Builder setClusterId(Uuid clusterId) { + public Builder setClusterId(String clusterId) { this.clusterId = clusterId; return this; } @@ -103,7 +103,7 @@ public TestKitNodes build() { String baseDirectory = TestUtils.tempDirectory().getAbsolutePath(); if (clusterId == null) { - clusterId = Uuid.randomUuid(); + clusterId = Uuid.randomUuid().toString(); } int controllerId = combined ? BROKER_ID_OFFSET : BROKER_ID_OFFSET + CONTROLLER_ID_OFFSET; @@ -162,14 +162,14 @@ public TestKitNodes build() { } private final String baseDirectory; - private final Uuid clusterId; + private final String clusterId; private final BootstrapMetadata bootstrapMetadata; private final SortedMap controllerNodes; private final SortedMap brokerNodes; private TestKitNodes( String baseDirectory, - Uuid clusterId, + String clusterId, BootstrapMetadata bootstrapMetadata, SortedMap controllerNodes, SortedMap brokerNodes @@ -189,7 +189,7 @@ public String baseDirectory() { return baseDirectory; } - public Uuid clusterId() { + public String clusterId() { return clusterId; } diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala index e0bc197767997..0ebb63db18ce8 100644 --- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala +++ b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala @@ -22,7 +22,6 @@ import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type} import kafka.test.junit.ClusterTestExtensions import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance import kafka.testkit.{KafkaClusterTestKit, TestKitNodes} -import org.apache.kafka.common.Uuid import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.common.MetadataVersion @@ -60,7 +59,7 @@ class KafkaServerKRaftRegistrationTest { val kraftCluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0). - setClusterId(Uuid.fromString(clusterId)). + setClusterId(clusterId). setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true") @@ -102,7 +101,7 @@ class KafkaServerKRaftRegistrationTest { val kraftCluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0). - setClusterId(Uuid.fromString(clusterId)). + setClusterId(clusterId). setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true") diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index ec9aad30bbc6f..ce8a15833995e 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -25,7 +25,7 @@ import kafka.testkit.{KafkaClusterTestKit, TestKitNodes} import kafka.utils.TestUtils import org.apache.kafka.clients.ClientResponse import org.apache.kafka.clients.admin._ -import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.acl.AclOperation.{DESCRIBE, READ, WRITE} import org.apache.kafka.common.acl.AclPermissionType.ALLOW import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding} @@ -175,7 +175,7 @@ class ZkMigrationIntegrationTest { val kraftCluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0). - setClusterId(Uuid.fromString(clusterId)). + setClusterId(clusterId). setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true") @@ -311,7 +311,7 @@ class ZkMigrationIntegrationTest { val kraftCluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setBootstrapMetadataVersion(zkCluster.config().metadataVersion()). - setClusterId(Uuid.fromString(clusterId)). + setClusterId(clusterId). setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true") @@ -449,7 +449,7 @@ class ZkMigrationIntegrationTest { val kraftCluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setBootstrapMetadataVersion(MetadataVersion.IBP_3_5_IV2). - setClusterId(Uuid.fromString(clusterId)). + setClusterId(clusterId). setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true") @@ -517,7 +517,7 @@ class ZkMigrationIntegrationTest { val kraftCluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setBootstrapMetadataVersion(MetadataVersion.IBP_3_9_IV1). - setClusterId(Uuid.fromString(clusterId)). + setClusterId(clusterId). setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true") @@ -603,7 +603,7 @@ class ZkMigrationIntegrationTest { val kraftCluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setBootstrapMetadataVersion(zkCluster.config().metadataVersion()). - setClusterId(Uuid.fromString(clusterId)). + setClusterId(clusterId). setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true") @@ -685,7 +685,7 @@ class ZkMigrationIntegrationTest { val kraftCluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setBootstrapMetadataVersion(MetadataVersion.IBP_3_5_IV2). - setClusterId(Uuid.fromString(clusterId)). + setClusterId(clusterId). setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true") @@ -749,7 +749,7 @@ class ZkMigrationIntegrationTest { val kraftCluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0). - setClusterId(Uuid.fromString(clusterId)). + setClusterId(clusterId). setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true") @@ -828,7 +828,7 @@ class ZkMigrationIntegrationTest { val kraftCluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV0). - setClusterId(Uuid.fromString(clusterId)). + setClusterId(clusterId). setNumBrokerNodes(0). setNumControllerNodes(1).build()) .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true") diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 60147c078888d..825998973baf9 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -113,7 +113,7 @@ public final class RaftClientTestContext { private int appendLingerMs; private final QuorumStateStore quorumStateStore; - final Uuid clusterId; + final String clusterId; private final OptionalInt localId; public final Uuid localDirectoryId; public final KRaftVersion kraftVersion; @@ -150,7 +150,7 @@ public static final class Builder { private final MockableRandom random = new MockableRandom(1L); private final LogContext logContext = new LogContext(); private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, logContext); - private final Uuid clusterId = Uuid.randomUuid(); + private final String clusterId = Uuid.randomUuid().toString(); private final OptionalInt localId; private KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_0; private final Uuid localDirectoryId; @@ -382,7 +382,7 @@ public RaftClientTestContext build() throws IOException { time, new MockExpirationService(time), FETCH_MAX_WAIT_MS, - clusterId.toString(), + clusterId, bootstrapServers, localListeners, logContext, @@ -429,7 +429,7 @@ public RaftClientTestContext build() throws IOException { @SuppressWarnings("ParameterNumber") private RaftClientTestContext( - Uuid clusterId, + String clusterId, OptionalInt localId, Uuid localDirectoryId, KRaftVersion kraftVersion, @@ -1174,7 +1174,7 @@ EndQuorumEpochRequestData endEpochRequest( List preferredCandidates ) { return endEpochRequest( - clusterId.toString(), + clusterId, epoch, leaderId, preferredCandidates @@ -1197,7 +1197,7 @@ EndQuorumEpochRequestData endEpochRequest( } BeginQuorumEpochRequestData beginEpochRequest(int epoch, int leaderId) { - return beginEpochRequest(clusterId.toString(), epoch, leaderId); + return beginEpochRequest(clusterId, epoch, leaderId); } BeginQuorumEpochRequestData beginEpochRequest(String clusterId, int epoch, int leaderId) { @@ -1244,7 +1244,7 @@ VoteRequestData voteRequest( long lastEpochOffset ) { return voteRequest( - clusterId.toString(), + clusterId, epoch, candidateKey, lastEpoch, @@ -1404,7 +1404,7 @@ FetchRequestData fetchRequest( ) { return fetchRequest( epoch, - clusterId.toString(), + clusterId, replicaKey, fetchOffset, lastFetchedEpoch, diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index c57c441b98b5f..9144da3cac805 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -536,7 +536,7 @@ private static class Cluster { final Random random; final AtomicInteger correlationIdCounter = new AtomicInteger(); final MockTime time = new MockTime(); - final Uuid clusterId = Uuid.randomUuid(); + final String clusterId = Uuid.randomUuid().toString(); final Map voters = new HashMap<>(); final Map nodes = new HashMap<>(); final Map running = new HashMap<>(); @@ -788,7 +788,7 @@ void start(int nodeId) { time, new MockExpirationService(time), FETCH_MAX_WAIT_MS, - clusterId.toString(), + clusterId, Collections.emptyList(), endpointsFromId(nodeId, channel.listenerName()), logContext,