Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions core/src/test/java/kafka/testkit/BrokerNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package kafka.testkit;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
Expand All @@ -42,7 +41,7 @@ public static Builder builder() {
public static class Builder {
private int id = -1;
private String baseDirectory;
private Uuid clusterId;
private String clusterId;
private int numLogDirectories = 1;
private Map<String, String> propertyOverrides = Collections.emptyMap();
private boolean combined;
Expand All @@ -63,7 +62,7 @@ public Builder setNumLogDirectories(int numLogDirectories) {
return this;
}

public Builder setClusterId(Uuid clusterId) {
public Builder setClusterId(String clusterId) {
this.clusterId = clusterId;
return this;
}
Expand Down Expand Up @@ -113,7 +112,7 @@ public BrokerNode build() {
for (String logDir : logDataDirectories) {
copier.setLogDirProps(logDir, new MetaProperties.Builder().
setVersion(MetaPropertiesVersion.V1).
setClusterId(clusterId.toString()).
setClusterId(clusterId).
setNodeId(id).
setDirectoryId(copier.generateValidDirectoryId()).
build());
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/java/kafka/testkit/BrokerNodeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ public void testInvalidBuilder() {
Assertions.assertEquals("You must set the node id.",
Assertions.assertThrows(IllegalArgumentException.class, () -> BrokerNode.builder()
.setBaseDirectory("foo")
.setClusterId(Uuid.randomUuid())
.setClusterId(Uuid.randomUuid().toString())
.build()).getMessage());

Assertions.assertEquals("The value of numLogDirectories should be at least 1.",
Assertions.assertThrows(IllegalArgumentException.class, () -> BrokerNode.builder()
.setBaseDirectory("foo")
.setClusterId(Uuid.randomUuid())
.setClusterId(Uuid.randomUuid().toString())
.setId(0)
.setNumLogDirectories(0)
.build()).getMessage());
Expand Down
5 changes: 2 additions & 3 deletions core/src/test/java/kafka/testkit/ControllerNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package kafka.testkit;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
Expand All @@ -37,7 +36,7 @@ public static Builder builder() {
public static class Builder {
private int id = -1;
private String baseDirectory;
private Uuid clusterId;
private String clusterId;
private boolean combined;
private Map<String, String> propertyOverrides = Collections.emptyMap();

Expand All @@ -52,7 +51,7 @@ public Builder setId(int id) {
return this;
}

public Builder setClusterId(Uuid clusterId) {
public Builder setClusterId(String clusterId) {
this.clusterId = clusterId;
return this;
}
Expand Down
12 changes: 6 additions & 6 deletions core/src/test/java/kafka/testkit/TestKitNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ public class TestKitNodes {

public static class Builder {
private boolean combined;
private Uuid clusterId;
private String clusterId;
private int numControllerNodes;
private int numBrokerNodes;
private int numDisksPerBroker = 1;
private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap();
private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
fromVersion(MetadataVersion.latestTesting(), "testkit");

public Builder setClusterId(Uuid clusterId) {
public Builder setClusterId(String clusterId) {
this.clusterId = clusterId;
return this;
}
Expand Down Expand Up @@ -103,7 +103,7 @@ public TestKitNodes build() {

String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
if (clusterId == null) {
clusterId = Uuid.randomUuid();
clusterId = Uuid.randomUuid().toString();
}

int controllerId = combined ? BROKER_ID_OFFSET : BROKER_ID_OFFSET + CONTROLLER_ID_OFFSET;
Expand Down Expand Up @@ -162,14 +162,14 @@ public TestKitNodes build() {
}

private final String baseDirectory;
private final Uuid clusterId;
private final String clusterId;
private final BootstrapMetadata bootstrapMetadata;
private final SortedMap<Integer, ControllerNode> controllerNodes;
private final SortedMap<Integer, BrokerNode> brokerNodes;

private TestKitNodes(
String baseDirectory,
Uuid clusterId,
String clusterId,
BootstrapMetadata bootstrapMetadata,
SortedMap<Integer, ControllerNode> controllerNodes,
SortedMap<Integer, BrokerNode> brokerNodes
Expand All @@ -189,7 +189,7 @@ public String baseDirectory() {
return baseDirectory;
}

public Uuid clusterId() {
public String clusterId() {
return clusterId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.Uuid
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.common.MetadataVersion
Expand Down Expand Up @@ -60,7 +59,7 @@ class KafkaServerKRaftRegistrationTest {
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setClusterId(Uuid.fromString(clusterId)).
setClusterId(clusterId).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
Expand Down Expand Up @@ -102,7 +101,7 @@ class KafkaServerKRaftRegistrationTest {
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setClusterId(Uuid.fromString(clusterId)).
setClusterId(clusterId).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.acl.AclOperation.{DESCRIBE, READ, WRITE}
import org.apache.kafka.common.acl.AclPermissionType.ALLOW
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
Expand Down Expand Up @@ -175,7 +175,7 @@ class ZkMigrationIntegrationTest {
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setClusterId(Uuid.fromString(clusterId)).
setClusterId(clusterId).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
Expand Down Expand Up @@ -311,7 +311,7 @@ class ZkMigrationIntegrationTest {
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(zkCluster.config().metadataVersion()).
setClusterId(Uuid.fromString(clusterId)).
setClusterId(clusterId).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
Expand Down Expand Up @@ -449,7 +449,7 @@ class ZkMigrationIntegrationTest {
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_5_IV2).
setClusterId(Uuid.fromString(clusterId)).
setClusterId(clusterId).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
Expand Down Expand Up @@ -517,7 +517,7 @@ class ZkMigrationIntegrationTest {
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_9_IV1).
setClusterId(Uuid.fromString(clusterId)).
setClusterId(clusterId).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
Expand Down Expand Up @@ -603,7 +603,7 @@ class ZkMigrationIntegrationTest {
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(zkCluster.config().metadataVersion()).
setClusterId(Uuid.fromString(clusterId)).
setClusterId(clusterId).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
Expand Down Expand Up @@ -685,7 +685,7 @@ class ZkMigrationIntegrationTest {
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_5_IV2).
setClusterId(Uuid.fromString(clusterId)).
setClusterId(clusterId).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
Expand Down Expand Up @@ -749,7 +749,7 @@ class ZkMigrationIntegrationTest {
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setClusterId(Uuid.fromString(clusterId)).
setClusterId(clusterId).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
Expand Down Expand Up @@ -828,7 +828,7 @@ class ZkMigrationIntegrationTest {
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV0).
setClusterId(Uuid.fromString(clusterId)).
setClusterId(clusterId).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
.setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public final class RaftClientTestContext {
private int appendLingerMs;

private final QuorumStateStore quorumStateStore;
final Uuid clusterId;
final String clusterId;
private final OptionalInt localId;
public final Uuid localDirectoryId;
public final KRaftVersion kraftVersion;
Expand Down Expand Up @@ -150,7 +150,7 @@ public static final class Builder {
private final MockableRandom random = new MockableRandom(1L);
private final LogContext logContext = new LogContext();
private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, logContext);
private final Uuid clusterId = Uuid.randomUuid();
private final String clusterId = Uuid.randomUuid().toString();
private final OptionalInt localId;
private KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_0;
private final Uuid localDirectoryId;
Expand Down Expand Up @@ -382,7 +382,7 @@ public RaftClientTestContext build() throws IOException {
time,
new MockExpirationService(time),
FETCH_MAX_WAIT_MS,
clusterId.toString(),
clusterId,
bootstrapServers,
localListeners,
logContext,
Expand Down Expand Up @@ -429,7 +429,7 @@ public RaftClientTestContext build() throws IOException {

@SuppressWarnings("ParameterNumber")
private RaftClientTestContext(
Uuid clusterId,
String clusterId,
OptionalInt localId,
Uuid localDirectoryId,
KRaftVersion kraftVersion,
Expand Down Expand Up @@ -1174,7 +1174,7 @@ EndQuorumEpochRequestData endEpochRequest(
List<ReplicaKey> preferredCandidates
) {
return endEpochRequest(
clusterId.toString(),
clusterId,
epoch,
leaderId,
preferredCandidates
Expand All @@ -1197,7 +1197,7 @@ EndQuorumEpochRequestData endEpochRequest(
}

BeginQuorumEpochRequestData beginEpochRequest(int epoch, int leaderId) {
return beginEpochRequest(clusterId.toString(), epoch, leaderId);
return beginEpochRequest(clusterId, epoch, leaderId);
}

BeginQuorumEpochRequestData beginEpochRequest(String clusterId, int epoch, int leaderId) {
Expand Down Expand Up @@ -1244,7 +1244,7 @@ VoteRequestData voteRequest(
long lastEpochOffset
) {
return voteRequest(
clusterId.toString(),
clusterId,
epoch,
candidateKey,
lastEpoch,
Expand Down Expand Up @@ -1404,7 +1404,7 @@ FetchRequestData fetchRequest(
) {
return fetchRequest(
epoch,
clusterId.toString(),
clusterId,
replicaKey,
fetchOffset,
lastFetchedEpoch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ private static class Cluster {
final Random random;
final AtomicInteger correlationIdCounter = new AtomicInteger();
final MockTime time = new MockTime();
final Uuid clusterId = Uuid.randomUuid();
final String clusterId = Uuid.randomUuid().toString();
final Map<Integer, Node> voters = new HashMap<>();
final Map<Integer, PersistentState> nodes = new HashMap<>();
final Map<Integer, RaftNode> running = new HashMap<>();
Expand Down Expand Up @@ -788,7 +788,7 @@ void start(int nodeId) {
time,
new MockExpirationService(time),
FETCH_MAX_WAIT_MS,
clusterId.toString(),
clusterId,
Collections.emptyList(),
endpointsFromId(nodeId, channel.listenerName()),
logContext,
Expand Down