From f86ab6024e0ddabe01dbb9f849f4c2995558930a Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Mon, 3 Apr 2023 17:45:37 -0400 Subject: [PATCH 01/10] Checkout of work with migration almost working --- checkstyle/suppressions.xml | 2 +- .../main/scala/kafka/tools/StorageTool.scala | 20 ++++-- .../scala/kafka/zk/ZkMigrationClient.scala | 20 ++++++ .../kafka/controller/ScramControlManager.java | 69 ++++++++++++++----- .../kafka/image/ScramCredentialData.java | 58 ++++++++++------ .../migration/KRaftMigrationDriver.java | 3 +- .../metadata/UserScramCredentialRecord.json | 6 +- .../apache/kafka/image/ScramImageTest.java | 6 +- 8 files changed, 132 insertions(+), 52 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 5c95c8d4284d4..22bf08e71cc21 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -308,7 +308,7 @@ + files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager).java"/> + throw new TerseFailure(s"Error attempting to create UserScramCredentialRecord: ${e.getMessage}") + } myrecord } diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala index 90e6a08468b25..002d43700f9a9 100644 --- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala @@ -24,6 +24,7 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, ZkAdminM import kafka.utils.{Logging, PasswordEncoder} import kafka.zk.TopicZNode.TopicIdReplicaAssignment import kafka.zookeeper._ +import org.apache.kafka.clients.admin.ScramMechanism import org.apache.kafka.common.acl.AccessControlEntry import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.ControllerMovedException @@ -32,6 +33,8 @@ import org.apache.kafka.common.metadata._ import org.apache.kafka.common.quota.ClientQuotaEntity import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils +import org.apache.kafka.image.{MetadataDelta, MetadataImage} import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration} import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState} import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock} @@ -207,10 +210,27 @@ class ZkMigrationClient( ): Unit = wrapZkException { val adminZkClient = new AdminZkClient(zkClient) + println(s"We are here: ") def migrateEntityType(entityType: String): Unit = { adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) => + println(s"Name: ${name}") val entity = new EntityData().setEntityType(entityType).setEntityName(name) val batch = new util.ArrayList[ApiMessageAndVersion]() + ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism => + val propertyValue = props.getProperty(mechanism.mechanismName) + if (propertyValue != null) { + println(s"Found ${propertyValue} for Key:${mechanism.mechanismName}") + val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue) + batch.add(new ApiMessageAndVersion(new UserScramCredentialRecord() + .setName(name) + .setMechanism(mechanism.`type`) + .setSalt(scramCredentials.salt) + .setStoredKey(scramCredentials.storedKey) + .setStoredKey(scramCredentials.storedKey) + .setIterations(scramCredentials.iterations), 0.toShort)) + props.remove(mechanism.mechanismName) + } + } ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) => batch.add(new ApiMessageAndVersion(new ClientQuotaRecord() .setEntity(List(entity).asJava) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java index e50a9cd9798a8..2cf4393fe156e 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java @@ -26,11 +26,13 @@ import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord; import org.apache.kafka.common.metadata.UserScramCredentialRecord; import org.apache.kafka.common.requests.ApiError; +import org.apache.kafka.common.security.scram.internals.ScramFormatter; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; + import org.slf4j.Logger; import java.util.ArrayList; @@ -111,22 +113,25 @@ public String toString() { static class ScramCredentialValue { private final byte[] salt; - private final byte[] saltedPassword; + private final byte[] storedKey; + private final byte[] serverKey; private final int iterations; ScramCredentialValue( byte[] salt, - byte[] saltedPassword, + byte[] storedKey, + byte[] serverKey, int iterations ) { this.salt = salt; - this.saltedPassword = saltedPassword; + this.storedKey = storedKey; + this.serverKey = serverKey; this.iterations = iterations; } @Override public int hashCode() { - return Objects.hash(salt, saltedPassword, iterations); + return Objects.hash(salt, storedKey, serverKey, iterations); } @Override @@ -135,7 +140,8 @@ public boolean equals(Object o) { if (!(o.getClass() == this.getClass())) return false; ScramCredentialValue other = (ScramCredentialValue) o; return Arrays.equals(salt, other.salt) && - Arrays.equals(saltedPassword, other.saltedPassword) && + Arrays.equals(storedKey, other.storedKey) && + Arrays.equals(serverKey, other.serverKey) && iterations == other.iterations; } @@ -143,7 +149,8 @@ public boolean equals(Object o) { public String toString() { return "ScramCredentialValue" + "(salt=" + "[hidden]" + - ", saltedPassword=" + "[hidden]" + + ", storedKey=" + "[hidden]" + + ", serverKey=" + "[hidden]" + ", iterations=" + "[hidden]" + ")"; } @@ -226,16 +233,15 @@ public ControllerResult alterCredentials( setMechanism(deletion.mechanism()), (short) 0)); } for (ScramCredentialUpsertion upsertion : userToUpsert.values()) { - response.results().add(new AlterUserScramCredentialsResult(). - setUser(upsertion.name()). - setErrorCode(NONE.code()). - setErrorMessage(null)); - records.add(new ApiMessageAndVersion(new UserScramCredentialRecord(). - setName(upsertion.name()). - setMechanism(upsertion.mechanism()). - setSalt(upsertion.salt()). - setSaltedPassword(upsertion.saltedPassword()). - setIterations(upsertion.iterations()), (short) 0)); + ApiError error = finishUpsertion(records, upsertion); + if (!error.isFailure()) { + response.results().add(new AlterUserScramCredentialsResult(). + setUser(upsertion.name()). + setErrorCode(NONE.code()). + setErrorMessage(null)); + } else { + userToError.put(upsertion.name(), error); + } } for (Entry entry : userToError.entrySet()) { response.results().add(new AlterUserScramCredentialsResult(). @@ -246,6 +252,31 @@ public ControllerResult alterCredentials( return ControllerResult.atomicOf(records, response); } + static ApiError finishUpsertion(List records, ScramCredentialUpsertion upsertion) { + org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism = + org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName( + ScramMechanism.fromType(upsertion.mechanism()).mechanismName()); + + try { // Convert from saltedPassword to storedKey and serverKey + ScramFormatter formatter = new ScramFormatter(internalMechanism); + + records.add(new ApiMessageAndVersion(new UserScramCredentialRecord(). + setName(upsertion.name()). + setMechanism(upsertion.mechanism()). + setSalt(upsertion.salt()). + + // Convert from saltedPassword to storedKey and serverKey + setStoredKey(formatter.storedKey(formatter.clientKey(upsertion.saltedPassword()))). + setServerKey(formatter.serverKey(upsertion.saltedPassword())). + setIterations(upsertion.iterations()), (short) 0)); + + } catch (Throwable e) { + // XXX Should We just return UNACCEPTABLE_CREDENTIAL --proven 2023/03/31 + return ApiError.fromThrowable(e); + } + return ApiError.NONE; + } + static ApiError validateUpsertion(ScramCredentialUpsertion upsertion) { ScramMechanism mechanism = ScramMechanism.fromType(upsertion.mechanism()); ApiError error = validateScramUsernameAndMechanism(upsertion.name(), mechanism); @@ -300,7 +331,8 @@ public void replay(UserScramCredentialRecord record) { ScramCredentialKey key = new ScramCredentialKey(record.name(), ScramMechanism.fromType(record.mechanism())); ScramCredentialValue value = new ScramCredentialValue(record.salt(), - record.saltedPassword(), + record.storedKey(), + record.serverKey(), record.iterations()); if (credentials.put(key, value) == null) { log.info("Created new SCRAM credential for {} with mechanism {}.", @@ -316,7 +348,8 @@ ApiMessageAndVersion toRecord(ScramCredentialKey key, ScramCredentialValue value setName(key.username). setMechanism(key.mechanism.type()). setSalt(value.salt). - setSaltedPassword(value.saltedPassword). + setStoredKey(value.storedKey). + setServerKey(value.serverKey). setIterations(value.iterations), (short) 0); } diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java b/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java index ab45046d257fa..a6882d0b49578 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java +++ b/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java @@ -20,12 +20,13 @@ import org.apache.kafka.clients.admin.ScramMechanism; import org.apache.kafka.common.metadata.UserScramCredentialRecord; import org.apache.kafka.common.security.scram.ScramCredential; -import org.apache.kafka.common.security.scram.internals.ScramFormatter; +// import org.apache.kafka.common.security.scram.internals.ScramFormatter; -import java.security.GeneralSecurityException; +// import java.security.GeneralSecurityException; import java.util.Arrays; import java.util.Objects; +// XXX Can I replace this with just ScramCredential --proven 2023/03/31 /** * Represents the ACLs in the metadata image. @@ -34,7 +35,8 @@ */ public final class ScramCredentialData { private final byte[] salt; - private final byte[] saltedPassword; + private final byte[] storedKey; + private final byte[] serverKey; private final int iterations; static ScramCredentialData fromRecord( @@ -42,17 +44,20 @@ static ScramCredentialData fromRecord( ) { return new ScramCredentialData( record.salt(), - record.saltedPassword(), + record.storedKey(), + record.serverKey(), record.iterations()); } public ScramCredentialData( byte[] salt, - byte[] saltedPassword, + byte[] storedKey, + byte[] serverKey, int iterations ) { this.salt = salt; - this.saltedPassword = saltedPassword; + this.storedKey = storedKey; + this.serverKey = serverKey; this.iterations = iterations; } @@ -60,8 +65,12 @@ public byte[] salt() { return salt; } - public byte[] saltedPassword() { - return saltedPassword; + public byte[] storedKey() { + return storedKey; + } + + public byte[] serverKey() { + return serverKey; } public int iterations() { @@ -76,25 +85,28 @@ public UserScramCredentialRecord toRecord( setName(userName). setMechanism(mechanism.type()). setSalt(salt). - setSaltedPassword(saltedPassword). + setStoredKey(storedKey). + setServerKey(serverKey). setIterations(iterations); } - public ScramCredential toCredential( - ScramMechanism mechanism - ) throws GeneralSecurityException { - org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism = - org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanism.mechanismName()); - ScramFormatter formatter = new ScramFormatter(internalMechanism); - return new ScramCredential(salt, - formatter.storedKey(formatter.clientKey(saltedPassword)), - formatter.serverKey(saltedPassword), - iterations); + public ScramCredential toCredential(ScramMechanism mechanism) { +// ) throws GeneralSecurityException { + return new ScramCredential(salt, storedKey, serverKey, iterations); } +// org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism = +// org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanism.mechanismName()); +// ScramFormatter formatter = new ScramFormatter(internalMechanism); +// +// return new ScramCredential(salt, +// formatter.storedKey(formatter.clientKey(saltedPassword)), +// formatter.serverKey(saltedPassword), +// iterations); +// } @Override public int hashCode() { - return Objects.hash(salt, saltedPassword, iterations); + return Objects.hash(salt, storedKey, serverKey, iterations); } @Override @@ -103,7 +115,8 @@ public boolean equals(Object o) { if (!o.getClass().equals(ScramCredentialData.class)) return false; ScramCredentialData other = (ScramCredentialData) o; return Arrays.equals(salt, other.salt) && - Arrays.equals(saltedPassword, other.saltedPassword) && + Arrays.equals(storedKey, other.storedKey) && + Arrays.equals(serverKey, other.serverKey) && iterations == other.iterations; } @@ -111,7 +124,8 @@ public boolean equals(Object o) { public String toString() { return "ScramCredentialData" + "(salt=" + "[hidden]" + - ", saltedPassword=" + "[hidden]" + + ", storedKey=" + "[hidden]" + + ", serverKey=" + "[hidden]" + ", iterations=" + "[hidden]" + ")"; } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 1534e1b9509ec..7d32fd43deb97 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -458,8 +458,9 @@ public void run() throws Exception { zkMigrationClient.readAllMetadata(batch -> { try { if (log.isTraceEnabled()) { - log.trace("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch)); + log.info("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch)); } else { + log.info("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch)); log.info("Migrating {} records from ZK", batch.size()); } CompletableFuture future = zkRecordConsumer.acceptBatch(batch); diff --git a/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json b/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json index a24cc53194a46..f3f9687439a95 100644 --- a/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json +++ b/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json @@ -26,8 +26,10 @@ "about": "The SCRAM mechanism." }, { "name": "Salt", "type": "bytes", "versions": "0+", "about": "A random salt generated by the client." }, - { "name": "SaltedPassword", "type": "bytes", "versions": "0+", - "about": "The salted password." }, + { "name": "StoredKey", "type": "bytes", "versions": "0+", + "about": "The Stored Key." }, + { "name": "ServerKey", "type": "bytes", "versions": "0+", + "about": "The Server Key." }, { "name": "Iterations", "type": "int32", "versions": "0+", "about": "The number of iterations used in the SCRAM credential." } ] diff --git a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java index b96be23cded19..4031a9c6d5a44 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java @@ -59,6 +59,7 @@ static byte[] randomBuffer(Random random, int length) { static ScramCredentialData randomScramCredentialData(Random random) { return new ScramCredentialData( + randomBuffer(random, 1024), randomBuffer(random, 1024), randomBuffer(random, 1024), 1024 + random.nextInt(1024)); @@ -89,9 +90,10 @@ static ScramCredentialData randomScramCredentialData(Random random) { DELTA1_RECORDS.add(new ApiMessageAndVersion(new UserScramCredentialRecord(). setName("alpha"). setMechanism(SCRAM_SHA_256.type()). - setIterations(secondAlpha256Credential.iterations()). setSalt(secondAlpha256Credential.salt()). - setSaltedPassword(secondAlpha256Credential.saltedPassword()), (short) 0)); + setStoredKey(secondAlpha256Credential.storedKey()). + setServerKey(secondAlpha256Credential.serverKey()). + setIterations(secondAlpha256Credential.iterations()), (short) 0)); DELTA1 = new ScramDelta(IMAGE1); RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS); From a43761353274f644cd05a14a5bc93236a070f8a7 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Wed, 5 Apr 2023 13:00:03 -0400 Subject: [PATCH 02/10] SCRAM support is now only available with IBP_3_5_IV1 --- core/src/main/scala/kafka/tools/StorageTool.scala | 2 +- core/src/main/scala/kafka/zk/ZkMigrationClient.scala | 1 - core/src/test/scala/unit/kafka/tools/StorageToolTest.scala | 2 +- .../org/apache/kafka/server/common/MetadataVersion.java | 6 +++--- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 440f35be5a53e..11dd4a54c8e60 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -65,7 +65,7 @@ object StorageTool extends Logging { val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => { if (!metadataVersion.isScramSupported()) { - throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV0 or later."); + throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV1 or later."); } for (record <- userScramCredentialRecords) { metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort)) diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala index 002d43700f9a9..e63428fcc3d4b 100644 --- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala @@ -34,7 +34,6 @@ import org.apache.kafka.common.quota.ClientQuotaEntity import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils -import org.apache.kafka.image.{MetadataDelta, MetadataImage} import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration} import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState} import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock} diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 539dcd390c826..6e77a51ff8b45 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -325,7 +325,7 @@ Found problem: try { assertEquals(1, StorageTool.main(args)) } catch { - case e: StorageToolTestException => assertEquals(s"SCRAM is only supported in metadataVersion IBP_3_5_IV0 or later.", exitString) + case e: StorageToolTestException => assertEquals(s"SCRAM is only supported in metadataVersion IBP_3_5_IV1 or later.", exitString) } finally { Exit.resetExitProcedure() } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index b0593f40d0763..925ca8424ff93 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -167,10 +167,10 @@ public enum MetadataVersion { // and updates to a handful of RPCs. IBP_3_4_IV0(8, "3.4", "IV0", true), - // Support for tiered storage (KIP-405) and SCRAM + // Support for tiered storage (KIP-405) IBP_3_5_IV0(9, "3.5", "IV0", false), - // Adds replica epoch to Fetch request (KIP-903). + // Adds replica epoch to Fetch request (KIP-903) and SCRAM. IBP_3_5_IV1(10, "3.5", "IV1", false); // NOTE: update the default version in @ClusterTest annotation to point to the latest version @@ -254,7 +254,7 @@ public boolean isApiForwardingEnabled() { } public boolean isScramSupported() { - return this.isAtLeast(IBP_3_5_IV0); + return this.isAtLeast(IBP_3_5_IV1); } public boolean isKRaftSupported() { From 10d9f1e5373c517f63068bf2a96b41959877e0b0 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Wed, 5 Apr 2023 15:58:26 -0400 Subject: [PATCH 03/10] Code cleanup --- .../scala/kafka/zk/ZkMigrationClient.scala | 19 ------------------- .../kafka/image/ScramCredentialData.java | 14 +------------- 2 files changed, 1 insertion(+), 32 deletions(-) diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala index e63428fcc3d4b..90e6a08468b25 100644 --- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala @@ -24,7 +24,6 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, ZkAdminM import kafka.utils.{Logging, PasswordEncoder} import kafka.zk.TopicZNode.TopicIdReplicaAssignment import kafka.zookeeper._ -import org.apache.kafka.clients.admin.ScramMechanism import org.apache.kafka.common.acl.AccessControlEntry import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.ControllerMovedException @@ -33,7 +32,6 @@ import org.apache.kafka.common.metadata._ import org.apache.kafka.common.quota.ClientQuotaEntity import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.{TopicPartition, Uuid} -import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration} import org.apache.kafka.metadata.migration.{MigrationClient, MigrationClientAuthException, MigrationClientException, ZkMigrationLeadershipState} import org.apache.kafka.server.common.{ApiMessageAndVersion, ProducerIdsBlock} @@ -209,27 +207,10 @@ class ZkMigrationClient( ): Unit = wrapZkException { val adminZkClient = new AdminZkClient(zkClient) - println(s"We are here: ") def migrateEntityType(entityType: String): Unit = { adminZkClient.fetchAllEntityConfigs(entityType).foreach { case (name, props) => - println(s"Name: ${name}") val entity = new EntityData().setEntityType(entityType).setEntityName(name) val batch = new util.ArrayList[ApiMessageAndVersion]() - ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism => - val propertyValue = props.getProperty(mechanism.mechanismName) - if (propertyValue != null) { - println(s"Found ${propertyValue} for Key:${mechanism.mechanismName}") - val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue) - batch.add(new ApiMessageAndVersion(new UserScramCredentialRecord() - .setName(name) - .setMechanism(mechanism.`type`) - .setSalt(scramCredentials.salt) - .setStoredKey(scramCredentials.storedKey) - .setStoredKey(scramCredentials.storedKey) - .setIterations(scramCredentials.iterations), 0.toShort)) - props.remove(mechanism.mechanismName) - } - } ZkAdminManager.clientQuotaPropsToDoubleMap(props.asScala).foreach { case (key: String, value: Double) => batch.add(new ApiMessageAndVersion(new ClientQuotaRecord() .setEntity(List(entity).asJava) diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java b/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java index a6882d0b49578..b8ec0d4fd216a 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java +++ b/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java @@ -20,13 +20,11 @@ import org.apache.kafka.clients.admin.ScramMechanism; import org.apache.kafka.common.metadata.UserScramCredentialRecord; import org.apache.kafka.common.security.scram.ScramCredential; -// import org.apache.kafka.common.security.scram.internals.ScramFormatter; -// import java.security.GeneralSecurityException; import java.util.Arrays; import java.util.Objects; -// XXX Can I replace this with just ScramCredential --proven 2023/03/31 +// XXX Can I replace org.apache.kafka.common.security.scram.ScramCredential with this --proven 2023/03/31 /** * Represents the ACLs in the metadata image. @@ -91,18 +89,8 @@ public UserScramCredentialRecord toRecord( } public ScramCredential toCredential(ScramMechanism mechanism) { -// ) throws GeneralSecurityException { return new ScramCredential(salt, storedKey, serverKey, iterations); } -// org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism = -// org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanism.mechanismName()); -// ScramFormatter formatter = new ScramFormatter(internalMechanism); -// -// return new ScramCredential(salt, -// formatter.storedKey(formatter.clientKey(saltedPassword)), -// formatter.serverKey(saltedPassword), -// iterations); -// } @Override public int hashCode() { From 535ebf92c5005d3247524c4501830dedbd27d946 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Thu, 6 Apr 2023 08:31:24 -0400 Subject: [PATCH 04/10] Update everything to IV2 --- core/src/main/scala/kafka/tools/StorageTool.scala | 2 +- .../test/java/kafka/test/ClusterTestExtensionsTest.java | 2 +- .../src/test/java/kafka/test/annotation/ClusterTest.java | 2 +- .../test/scala/unit/kafka/admin/FeatureCommandTest.scala | 4 ++-- .../test/scala/unit/kafka/tools/StorageToolTest.scala | 2 +- .../org/apache/kafka/controller/ScramControlManager.java | 1 - .../apache/kafka/controller/QuorumControllerTest.java | 8 ++++---- .../org/apache/kafka/server/common/MetadataVersion.java | 9 ++++++--- 8 files changed, 16 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 11dd4a54c8e60..7925089ecd57c 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -65,7 +65,7 @@ object StorageTool extends Logging { val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => { if (!metadataVersion.isScramSupported()) { - throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV1 or later."); + throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later."); } for (record <- userScramCredentialRecords) { metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort)) diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java index 98116dda9fc1a..5bfe9b1394da3 100644 --- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java +++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java @@ -117,6 +117,6 @@ public void testNoAutoStart() { @ClusterTest public void testDefaults(ClusterConfig config) { - Assertions.assertEquals(MetadataVersion.IBP_3_5_IV1, config.metadataVersion()); + Assertions.assertEquals(MetadataVersion.IBP_3_5_IV2, config.metadataVersion()); } } diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java index 813b6d6575839..bbfb5fd3a29d2 100644 --- a/core/src/test/java/kafka/test/annotation/ClusterTest.java +++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java @@ -41,6 +41,6 @@ String name() default ""; SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT; String listener() default ""; - MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV1; + MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV2; ClusterConfigProperty[] serverProperties() default {}; } diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala index 48337d75720da..41cc3bd69f2a4 100644 --- a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala @@ -84,7 +84,7 @@ class FeatureCommandTest extends IntegrationTestHarness { Array("--bootstrap-server", bootstrapServers(), "describe"), env.out)) assertEquals(String.format( "Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 3.5-IV1\tFinalizedVersionLevel: 3.3-IV1\t"), + "SupportedMaxVersion: 3.5-IV2\tFinalizedVersionLevel: 3.3-IV1\t"), env.outputWithoutEpoch()) } } @@ -145,7 +145,7 @@ class FeatureCommandTest extends IntegrationTestHarness { assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), "disable", "--feature", "metadata.version"), env.out)) assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " + - "metadata.version. Local controller 1000 only supports versions 1-10", env.outputWithoutEpoch()) + "metadata.version. Local controller 1000 only supports versions 1-11", env.outputWithoutEpoch()) } TestUtils.resource(FeatureCommandTestEnv()) { env => assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(), diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 6e77a51ff8b45..f745c6d4bf66e 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -325,7 +325,7 @@ Found problem: try { assertEquals(1, StorageTool.main(args)) } catch { - case e: StorageToolTestException => assertEquals(s"SCRAM is only supported in metadataVersion IBP_3_5_IV1 or later.", exitString) + case e: StorageToolTestException => assertEquals(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.", exitString) } finally { Exit.resetExitProcedure() } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java index 2cf4393fe156e..eb4dfff8546f8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java @@ -271,7 +271,6 @@ static ApiError finishUpsertion(List records, ScramCredent setIterations(upsertion.iterations()), (short) 0)); } catch (Throwable e) { - // XXX Should We just return UNACCEPTABLE_CREDENTIAL --proven 2023/03/31 return ApiError.fromThrowable(e); } return ApiError.NONE; diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 6fffcddf0a03e..1894a41fcf23e 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -183,7 +183,7 @@ public void testConfigurationOperations() throws Throwable { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV1)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2)). setBrokerId(0). setClusterId(logEnv.clusterId())).get(); testConfigurationOperations(controlEnv.activeController()); @@ -224,7 +224,7 @@ public void testDelayedConfigurationOperations() throws Throwable { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV1)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2)). setBrokerId(0). setClusterId(logEnv.clusterId())).get(); testDelayedConfigurationOperations(logEnv, controlEnv.activeController()); @@ -554,7 +554,7 @@ public void testUnregisterBroker() throws Throwable { setBrokerId(0). setClusterId(active.clusterId()). setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV1)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2)). setListeners(listeners)); assertEquals(2L, reply.get().epoch()); CreateTopicsRequestData createTopicsRequestData = @@ -975,7 +975,7 @@ private Map registerBrokers(QuorumController controller, int numB .setBrokerId(brokerId) .setRack(null) .setClusterId(controller.clusterId()) - .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV1)) + .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2)) .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId)) .setListeners( new ListenerCollection( diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 925ca8424ff93..8b29d4d886bce 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -170,8 +170,11 @@ public enum MetadataVersion { // Support for tiered storage (KIP-405) IBP_3_5_IV0(9, "3.5", "IV0", false), - // Adds replica epoch to Fetch request (KIP-903) and SCRAM. - IBP_3_5_IV1(10, "3.5", "IV1", false); + // Adds replica epoch to Fetch request (KIP-903). + IBP_3_5_IV1(10, "3.5", "IV1", false), + + // Support for SCRAM + IBP_3_5_IV2(11, "3.5", "IV2", false); // NOTE: update the default version in @ClusterTest annotation to point to the latest version public static final String FEATURE_NAME = "metadata.version"; @@ -254,7 +257,7 @@ public boolean isApiForwardingEnabled() { } public boolean isScramSupported() { - return this.isAtLeast(IBP_3_5_IV1); + return this.isAtLeast(IBP_3_5_IV2); } public boolean isKRaftSupported() { From 14c9733969456e8b44a693f2bee81f35c34ce9a9 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Thu, 6 Apr 2023 08:55:46 -0400 Subject: [PATCH 05/10] Fixed a comment --- .../main/java/org/apache/kafka/image/ScramCredentialData.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java b/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java index b8ec0d4fd216a..91b23cf549663 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java +++ b/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java @@ -24,8 +24,6 @@ import java.util.Arrays; import java.util.Objects; -// XXX Can I replace org.apache.kafka.common.security.scram.ScramCredential with this --proven 2023/03/31 - /** * Represents the ACLs in the metadata image. * From e6d31f515b95a46081540c90acf58e35c167db8f Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Fri, 7 Apr 2023 16:31:10 -0400 Subject: [PATCH 06/10] Address PR comments. --- .../kafka/controller/ScramControlManager.java | 11 -- .../kafka/image/ScramCredentialData.java | 2 +- .../metadata/UserScramCredentialRecord.json | 4 +- .../metadata/ScramCredentialDataTest.java | 109 ++++++++++++++++++ 4 files changed, 112 insertions(+), 14 deletions(-) create mode 100644 metadata/src/test/java/org/apache/kafka/metadata/ScramCredentialDataTest.java diff --git a/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java index eb4dfff8546f8..e9a5e5db43736 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java @@ -342,15 +342,4 @@ public void replay(UserScramCredentialRecord record) { } } - ApiMessageAndVersion toRecord(ScramCredentialKey key, ScramCredentialValue value) { - return new ApiMessageAndVersion(new UserScramCredentialRecord(). - setName(key.username). - setMechanism(key.mechanism.type()). - setSalt(value.salt). - setStoredKey(value.storedKey). - setServerKey(value.serverKey). - setIterations(value.iterations), - (short) 0); - } - } diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java b/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java index 91b23cf549663..0e65f2de2c8d7 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java +++ b/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java @@ -35,7 +35,7 @@ public final class ScramCredentialData { private final byte[] serverKey; private final int iterations; - static ScramCredentialData fromRecord( + public static ScramCredentialData fromRecord( UserScramCredentialRecord record ) { return new ScramCredentialData( diff --git a/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json b/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json index f3f9687439a95..7ab6a6e9454b5 100644 --- a/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json +++ b/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json @@ -27,9 +27,9 @@ { "name": "Salt", "type": "bytes", "versions": "0+", "about": "A random salt generated by the client." }, { "name": "StoredKey", "type": "bytes", "versions": "0+", - "about": "The Stored Key." }, + "about": "The key the Server uses to authenticate the Client." }, { "name": "ServerKey", "type": "bytes", "versions": "0+", - "about": "The Server Key." }, + "about": "The key the Client uses to validate the Servers identity." }, { "name": "Iterations", "type": "int32", "versions": "0+", "about": "The number of iterations used in the SCRAM credential." } ] diff --git a/metadata/src/test/java/org/apache/kafka/metadata/ScramCredentialDataTest.java b/metadata/src/test/java/org/apache/kafka/metadata/ScramCredentialDataTest.java new file mode 100644 index 0000000000000..c263d5ea03b99 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/metadata/ScramCredentialDataTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.apache.kafka.clients.admin.ScramMechanism; +import org.apache.kafka.common.metadata.UserScramCredentialRecord; +import org.apache.kafka.image.ScramCredentialData; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.util.MockRandom; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +@Timeout(value = 40) +public class ScramCredentialDataTest { + + static MockRandom random = new MockRandom(); + + static byte[] randomBuffer(Random random, int length) { + byte[] buf = new byte[length]; + random.nextBytes(buf); + return buf; + } + + private static final List SCRAMCREDENTIALDATA = Arrays.asList( + new ScramCredentialData( + randomBuffer(random, 1024), + randomBuffer(random, 1024), + randomBuffer(random, 1024), + 4096), + new ScramCredentialData( + randomBuffer(random, 1024), + randomBuffer(random, 1024), + randomBuffer(random, 1024), + 8192), + new ScramCredentialData( + randomBuffer(random, 1024), + randomBuffer(random, 1024), + randomBuffer(random, 1024), + 10000)); + + @Test + public void testValues() { + assertEquals(4096, SCRAMCREDENTIALDATA.get(0).iterations()); + assertEquals(8192, SCRAMCREDENTIALDATA.get(1).iterations()); + assertEquals(10000, SCRAMCREDENTIALDATA.get(2).iterations()); + } + + @Test + public void testEquals() { + assertNotEquals(SCRAMCREDENTIALDATA.get(0), SCRAMCREDENTIALDATA.get(1)); + assertNotEquals(SCRAMCREDENTIALDATA.get(1), SCRAMCREDENTIALDATA.get(0)); + assertNotEquals(SCRAMCREDENTIALDATA.get(0), SCRAMCREDENTIALDATA.get(2)); + assertNotEquals(SCRAMCREDENTIALDATA.get(2), SCRAMCREDENTIALDATA.get(0)); + assertEquals(SCRAMCREDENTIALDATA.get(0), SCRAMCREDENTIALDATA.get(0)); + assertEquals(SCRAMCREDENTIALDATA.get(1), SCRAMCREDENTIALDATA.get(1)); + assertEquals(SCRAMCREDENTIALDATA.get(2), SCRAMCREDENTIALDATA.get(2)); + } + + @Test + public void testToString() { + assertEquals("ScramCredentialData" + + "(salt=" + "[hidden]" + + ", storedKey=" + "[hidden]" + + ", serverKey=" + "[hidden]" + + ", iterations=" + "[hidden]" + + ")", SCRAMCREDENTIALDATA.get(0).toString()); + } + + @Test + public void testFromRecordAndToRecord() { + testRoundTrip(SCRAMCREDENTIALDATA.get(0)); + testRoundTrip(SCRAMCREDENTIALDATA.get(1)); + testRoundTrip(SCRAMCREDENTIALDATA.get(2)); + } + + private void testRoundTrip(ScramCredentialData scramCredentialData) { + ApiMessageAndVersion messageAndVersion = new ApiMessageAndVersion( + scramCredentialData.toRecord("alice", ScramMechanism.SCRAM_SHA_256), (short) 0); + ScramCredentialData scramCredentialData2 = ScramCredentialData.fromRecord( + (UserScramCredentialRecord) messageAndVersion.message()); + assertEquals(scramCredentialData, scramCredentialData2); + ApiMessageAndVersion messageAndVersion2 = new ApiMessageAndVersion( + scramCredentialData2.toRecord("alice", ScramMechanism.SCRAM_SHA_256), (short) 0); + assertEquals(messageAndVersion, messageAndVersion2); + } + +} From c415190d9d6ec8c421e9c1bd2397cf027d5a9a63 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Mon, 10 Apr 2023 10:10:35 -0400 Subject: [PATCH 07/10] Move ScramCredentialData.java from inage to metadata with the other metadata updating files. --- .../org/apache/kafka/{image => metadata}/ScramCredentialData.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename metadata/src/main/java/org/apache/kafka/{image => metadata}/ScramCredentialData.java (100%) diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java b/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java similarity index 100% rename from metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java rename to metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java From 780c27c93921c68d9cbf9c18b84c024217c49a1e Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Mon, 10 Apr 2023 10:35:07 -0400 Subject: [PATCH 08/10] Update package for ScramCredentialData.java and all files that depend on it. --- metadata/src/main/java/org/apache/kafka/image/ScramDelta.java | 1 + metadata/src/main/java/org/apache/kafka/image/ScramImage.java | 1 + .../java/org/apache/kafka/metadata/ScramCredentialData.java | 2 +- .../src/test/java/org/apache/kafka/image/ScramImageTest.java | 1 + .../java/org/apache/kafka/metadata/ScramCredentialDataTest.java | 1 - 5 files changed, 4 insertions(+), 2 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramDelta.java b/metadata/src/main/java/org/apache/kafka/image/ScramDelta.java index a26b18c744688..129ea5951c24c 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ScramDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/ScramDelta.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord; import org.apache.kafka.common.metadata.UserScramCredentialRecord; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.metadata.ScramCredentialData; import java.util.HashMap; import java.util.Map.Entry; diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java index 97289b19f8872..a48d6f66652df 100644 --- a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo; import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.metadata.ScramCredentialData; import java.util.Collections; import java.util.List; diff --git a/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java b/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java index 0e65f2de2c8d7..6919d09231016 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.image; +package org.apache.kafka.metadata; import org.apache.kafka.clients.admin.ScramMechanism; import org.apache.kafka.common.metadata.UserScramCredentialRecord; diff --git a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java index 4031a9c6d5a44..9fd7f2bb8b74e 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.util.MockRandom; import org.apache.kafka.metadata.RecordTestUtils; +import org.apache.kafka.metadata.ScramCredentialData; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; diff --git a/metadata/src/test/java/org/apache/kafka/metadata/ScramCredentialDataTest.java b/metadata/src/test/java/org/apache/kafka/metadata/ScramCredentialDataTest.java index c263d5ea03b99..1b4d9eaccd148 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/ScramCredentialDataTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/ScramCredentialDataTest.java @@ -19,7 +19,6 @@ import org.apache.kafka.clients.admin.ScramMechanism; import org.apache.kafka.common.metadata.UserScramCredentialRecord; -import org.apache.kafka.image.ScramCredentialData; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.util.MockRandom; import org.junit.jupiter.api.Test; From b1bbc086c9542e11ede188a5892402f4f44abee2 Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Tue, 11 Apr 2023 08:55:23 -0400 Subject: [PATCH 09/10] This file should not be part of the commit Removing some debug prints I put in for validating migration. --- .../apache/kafka/metadata/migration/KRaftMigrationDriver.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 7d32fd43deb97..1534e1b9509ec 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -458,9 +458,8 @@ public void run() throws Exception { zkMigrationClient.readAllMetadata(batch -> { try { if (log.isTraceEnabled()) { - log.info("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch)); + log.trace("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch)); } else { - log.info("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch)); log.info("Migrating {} records from ZK", batch.size()); } CompletableFuture future = zkRecordConsumer.acceptBatch(batch); From 792c9e940c04dadfbb1e93c05a792759b119473e Mon Sep 17 00:00:00 2001 From: Proven Provenzano Date: Mon, 17 Apr 2023 15:18:20 -0400 Subject: [PATCH 10/10] Update that the metadata changed because of the changes to the SCRAM record. --- .../java/org/apache/kafka/server/common/MetadataVersion.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 8b29d4d886bce..c095ec751f03e 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -174,7 +174,7 @@ public enum MetadataVersion { IBP_3_5_IV1(10, "3.5", "IV1", false), // Support for SCRAM - IBP_3_5_IV2(11, "3.5", "IV2", false); + IBP_3_5_IV2(11, "3.5", "IV2", true); // NOTE: update the default version in @ClusterTest annotation to point to the latest version public static final String FEATURE_NAME = "metadata.version";