diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5c95c8d4284d4..22bf08e71cc21 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -308,7 +308,7 @@
+ files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager).java"/>
{
if (!metadataVersion.isScramSupported()) {
- throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV0 or later.");
+ throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
}
for (record <- userScramCredentialRecords) {
metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
@@ -227,12 +227,20 @@ object StorageTool extends Logging {
val iterations = getIterations(argMap, scramMechanism)
val saltedPassword = getSaltedPassword(argMap, scramMechanism, salt, iterations)
- val myrecord = new UserScramCredentialRecord()
- .setName(name)
- .setMechanism(scramMechanism.`type`)
- .setSalt(salt)
- .setIterations(iterations)
- .setSaltedPassword(saltedPassword)
+ val myrecord = try {
+ val formatter = new ScramFormatter(scramMechanism);
+
+ new UserScramCredentialRecord()
+ .setName(name)
+ .setMechanism(scramMechanism.`type`)
+ .setSalt(salt)
+ .setStoredKey(formatter.storedKey(formatter.clientKey(saltedPassword)))
+ .setServerKey(formatter.serverKey(saltedPassword))
+ .setIterations(iterations)
+ } catch {
+ case e: Throwable =>
+ throw new TerseFailure(s"Error attempting to create UserScramCredentialRecord: ${e.getMessage}")
+ }
myrecord
}
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 98116dda9fc1a..5bfe9b1394da3 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -117,6 +117,6 @@ public void testNoAutoStart() {
@ClusterTest
public void testDefaults(ClusterConfig config) {
- Assertions.assertEquals(MetadataVersion.IBP_3_5_IV1, config.metadataVersion());
+ Assertions.assertEquals(MetadataVersion.IBP_3_5_IV2, config.metadataVersion());
}
}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index 813b6d6575839..bbfb5fd3a29d2 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -41,6 +41,6 @@
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
- MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV1;
+ MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV2;
ClusterConfigProperty[] serverProperties() default {};
}
diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
index 48337d75720da..41cc3bd69f2a4 100644
--- a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
@@ -84,7 +84,7 @@ class FeatureCommandTest extends IntegrationTestHarness {
Array("--bootstrap-server", bootstrapServers(), "describe"), env.out))
assertEquals(String.format(
"Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
- "SupportedMaxVersion: 3.5-IV1\tFinalizedVersionLevel: 3.3-IV1\t"),
+ "SupportedMaxVersion: 3.5-IV2\tFinalizedVersionLevel: 3.3-IV1\t"),
env.outputWithoutEpoch())
}
}
@@ -145,7 +145,7 @@ class FeatureCommandTest extends IntegrationTestHarness {
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
"disable", "--feature", "metadata.version"), env.out))
assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
- "metadata.version. Local controller 1000 only supports versions 1-10", env.outputWithoutEpoch())
+ "metadata.version. Local controller 1000 only supports versions 1-11", env.outputWithoutEpoch())
}
TestUtils.resource(FeatureCommandTestEnv()) { env =>
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 539dcd390c826..f745c6d4bf66e 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -325,7 +325,7 @@ Found problem:
try {
assertEquals(1, StorageTool.main(args))
} catch {
- case e: StorageToolTestException => assertEquals(s"SCRAM is only supported in metadataVersion IBP_3_5_IV0 or later.", exitString)
+ case e: StorageToolTestException => assertEquals(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.", exitString)
} finally {
Exit.resetExitProcedure()
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java
index e50a9cd9798a8..e9a5e5db43736 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ScramControlManager.java
@@ -26,11 +26,13 @@
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
+
import org.slf4j.Logger;
import java.util.ArrayList;
@@ -111,22 +113,25 @@ public String toString() {
static class ScramCredentialValue {
private final byte[] salt;
- private final byte[] saltedPassword;
+ private final byte[] storedKey;
+ private final byte[] serverKey;
private final int iterations;
ScramCredentialValue(
byte[] salt,
- byte[] saltedPassword,
+ byte[] storedKey,
+ byte[] serverKey,
int iterations
) {
this.salt = salt;
- this.saltedPassword = saltedPassword;
+ this.storedKey = storedKey;
+ this.serverKey = serverKey;
this.iterations = iterations;
}
@Override
public int hashCode() {
- return Objects.hash(salt, saltedPassword, iterations);
+ return Objects.hash(salt, storedKey, serverKey, iterations);
}
@Override
@@ -135,7 +140,8 @@ public boolean equals(Object o) {
if (!(o.getClass() == this.getClass())) return false;
ScramCredentialValue other = (ScramCredentialValue) o;
return Arrays.equals(salt, other.salt) &&
- Arrays.equals(saltedPassword, other.saltedPassword) &&
+ Arrays.equals(storedKey, other.storedKey) &&
+ Arrays.equals(serverKey, other.serverKey) &&
iterations == other.iterations;
}
@@ -143,7 +149,8 @@ public boolean equals(Object o) {
public String toString() {
return "ScramCredentialValue" +
"(salt=" + "[hidden]" +
- ", saltedPassword=" + "[hidden]" +
+ ", storedKey=" + "[hidden]" +
+ ", serverKey=" + "[hidden]" +
", iterations=" + "[hidden]" +
")";
}
@@ -226,16 +233,15 @@ public ControllerResult alterCredentials(
setMechanism(deletion.mechanism()), (short) 0));
}
for (ScramCredentialUpsertion upsertion : userToUpsert.values()) {
- response.results().add(new AlterUserScramCredentialsResult().
- setUser(upsertion.name()).
- setErrorCode(NONE.code()).
- setErrorMessage(null));
- records.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
- setName(upsertion.name()).
- setMechanism(upsertion.mechanism()).
- setSalt(upsertion.salt()).
- setSaltedPassword(upsertion.saltedPassword()).
- setIterations(upsertion.iterations()), (short) 0));
+ ApiError error = finishUpsertion(records, upsertion);
+ if (!error.isFailure()) {
+ response.results().add(new AlterUserScramCredentialsResult().
+ setUser(upsertion.name()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null));
+ } else {
+ userToError.put(upsertion.name(), error);
+ }
}
for (Entry entry : userToError.entrySet()) {
response.results().add(new AlterUserScramCredentialsResult().
@@ -246,6 +252,30 @@ public ControllerResult alterCredentials(
return ControllerResult.atomicOf(records, response);
}
+ static ApiError finishUpsertion(List records, ScramCredentialUpsertion upsertion) {
+ org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism =
+ org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(
+ ScramMechanism.fromType(upsertion.mechanism()).mechanismName());
+
+ try { // Convert from saltedPassword to storedKey and serverKey
+ ScramFormatter formatter = new ScramFormatter(internalMechanism);
+
+ records.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
+ setName(upsertion.name()).
+ setMechanism(upsertion.mechanism()).
+ setSalt(upsertion.salt()).
+
+ // Convert from saltedPassword to storedKey and serverKey
+ setStoredKey(formatter.storedKey(formatter.clientKey(upsertion.saltedPassword()))).
+ setServerKey(formatter.serverKey(upsertion.saltedPassword())).
+ setIterations(upsertion.iterations()), (short) 0));
+
+ } catch (Throwable e) {
+ return ApiError.fromThrowable(e);
+ }
+ return ApiError.NONE;
+ }
+
static ApiError validateUpsertion(ScramCredentialUpsertion upsertion) {
ScramMechanism mechanism = ScramMechanism.fromType(upsertion.mechanism());
ApiError error = validateScramUsernameAndMechanism(upsertion.name(), mechanism);
@@ -300,7 +330,8 @@ public void replay(UserScramCredentialRecord record) {
ScramCredentialKey key = new ScramCredentialKey(record.name(),
ScramMechanism.fromType(record.mechanism()));
ScramCredentialValue value = new ScramCredentialValue(record.salt(),
- record.saltedPassword(),
+ record.storedKey(),
+ record.serverKey(),
record.iterations());
if (credentials.put(key, value) == null) {
log.info("Created new SCRAM credential for {} with mechanism {}.",
@@ -311,14 +342,4 @@ public void replay(UserScramCredentialRecord record) {
}
}
- ApiMessageAndVersion toRecord(ScramCredentialKey key, ScramCredentialValue value) {
- return new ApiMessageAndVersion(new UserScramCredentialRecord().
- setName(key.username).
- setMechanism(key.mechanism.type()).
- setSalt(value.salt).
- setSaltedPassword(value.saltedPassword).
- setIterations(value.iterations),
- (short) 0);
- }
-
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramDelta.java b/metadata/src/main/java/org/apache/kafka/image/ScramDelta.java
index a26b18c744688..129ea5951c24c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ScramDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ScramDelta.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.metadata.ScramCredentialData;
import java.util.HashMap;
import java.util.Map.Entry;
diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
index 97289b19f8872..a48d6f66652df 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
@@ -26,6 +26,7 @@
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.metadata.ScramCredentialData;
import java.util.Collections;
import java.util.List;
diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java b/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
similarity index 67%
rename from metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java
rename to metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
index ab45046d257fa..6919d09231016 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ScramCredentialData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java
@@ -15,18 +15,15 @@
* limitations under the License.
*/
-package org.apache.kafka.image;
+package org.apache.kafka.metadata;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.security.scram.ScramCredential;
-import org.apache.kafka.common.security.scram.internals.ScramFormatter;
-import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.Objects;
-
/**
* Represents the ACLs in the metadata image.
*
@@ -34,25 +31,29 @@
*/
public final class ScramCredentialData {
private final byte[] salt;
- private final byte[] saltedPassword;
+ private final byte[] storedKey;
+ private final byte[] serverKey;
private final int iterations;
- static ScramCredentialData fromRecord(
+ public static ScramCredentialData fromRecord(
UserScramCredentialRecord record
) {
return new ScramCredentialData(
record.salt(),
- record.saltedPassword(),
+ record.storedKey(),
+ record.serverKey(),
record.iterations());
}
public ScramCredentialData(
byte[] salt,
- byte[] saltedPassword,
+ byte[] storedKey,
+ byte[] serverKey,
int iterations
) {
this.salt = salt;
- this.saltedPassword = saltedPassword;
+ this.storedKey = storedKey;
+ this.serverKey = serverKey;
this.iterations = iterations;
}
@@ -60,8 +61,12 @@ public byte[] salt() {
return salt;
}
- public byte[] saltedPassword() {
- return saltedPassword;
+ public byte[] storedKey() {
+ return storedKey;
+ }
+
+ public byte[] serverKey() {
+ return serverKey;
}
public int iterations() {
@@ -76,25 +81,18 @@ public UserScramCredentialRecord toRecord(
setName(userName).
setMechanism(mechanism.type()).
setSalt(salt).
- setSaltedPassword(saltedPassword).
+ setStoredKey(storedKey).
+ setServerKey(serverKey).
setIterations(iterations);
}
- public ScramCredential toCredential(
- ScramMechanism mechanism
- ) throws GeneralSecurityException {
- org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism =
- org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanism.mechanismName());
- ScramFormatter formatter = new ScramFormatter(internalMechanism);
- return new ScramCredential(salt,
- formatter.storedKey(formatter.clientKey(saltedPassword)),
- formatter.serverKey(saltedPassword),
- iterations);
+ public ScramCredential toCredential(ScramMechanism mechanism) {
+ return new ScramCredential(salt, storedKey, serverKey, iterations);
}
@Override
public int hashCode() {
- return Objects.hash(salt, saltedPassword, iterations);
+ return Objects.hash(salt, storedKey, serverKey, iterations);
}
@Override
@@ -103,7 +101,8 @@ public boolean equals(Object o) {
if (!o.getClass().equals(ScramCredentialData.class)) return false;
ScramCredentialData other = (ScramCredentialData) o;
return Arrays.equals(salt, other.salt) &&
- Arrays.equals(saltedPassword, other.saltedPassword) &&
+ Arrays.equals(storedKey, other.storedKey) &&
+ Arrays.equals(serverKey, other.serverKey) &&
iterations == other.iterations;
}
@@ -111,7 +110,8 @@ public boolean equals(Object o) {
public String toString() {
return "ScramCredentialData" +
"(salt=" + "[hidden]" +
- ", saltedPassword=" + "[hidden]" +
+ ", storedKey=" + "[hidden]" +
+ ", serverKey=" + "[hidden]" +
", iterations=" + "[hidden]" +
")";
}
diff --git a/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json b/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json
index a24cc53194a46..7ab6a6e9454b5 100644
--- a/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json
+++ b/metadata/src/main/resources/common/metadata/UserScramCredentialRecord.json
@@ -26,8 +26,10 @@
"about": "The SCRAM mechanism." },
{ "name": "Salt", "type": "bytes", "versions": "0+",
"about": "A random salt generated by the client." },
- { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
- "about": "The salted password." },
+ { "name": "StoredKey", "type": "bytes", "versions": "0+",
+ "about": "The key the Server uses to authenticate the Client." },
+ { "name": "ServerKey", "type": "bytes", "versions": "0+",
+ "about": "The key the Client uses to validate the Servers identity." },
{ "name": "Iterations", "type": "int32", "versions": "0+",
"about": "The number of iterations used in the SCRAM credential." }
]
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 6fffcddf0a03e..1894a41fcf23e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -183,7 +183,7 @@ public void testConfigurationOperations() throws Throwable {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV1)).
+ setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2)).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testConfigurationOperations(controlEnv.activeController());
@@ -224,7 +224,7 @@ public void testDelayedConfigurationOperations() throws Throwable {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV1)).
+ setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2)).
setBrokerId(0).
setClusterId(logEnv.clusterId())).get();
testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
@@ -554,7 +554,7 @@ public void testUnregisterBroker() throws Throwable {
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV1)).
+ setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2)).
setListeners(listeners));
assertEquals(2L, reply.get().epoch());
CreateTopicsRequestData createTopicsRequestData =
@@ -975,7 +975,7 @@ private Map registerBrokers(QuorumController controller, int numB
.setBrokerId(brokerId)
.setRack(null)
.setClusterId(controller.clusterId())
- .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV1))
+ .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_5_IV2))
.setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
.setListeners(
new ListenerCollection(
diff --git a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java
index b96be23cded19..9fd7f2bb8b74e 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ScramImageTest.java
@@ -25,6 +25,7 @@
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.MockRandom;
import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.ScramCredentialData;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -59,6 +60,7 @@ static byte[] randomBuffer(Random random, int length) {
static ScramCredentialData randomScramCredentialData(Random random) {
return new ScramCredentialData(
+ randomBuffer(random, 1024),
randomBuffer(random, 1024),
randomBuffer(random, 1024),
1024 + random.nextInt(1024));
@@ -89,9 +91,10 @@ static ScramCredentialData randomScramCredentialData(Random random) {
DELTA1_RECORDS.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
setName("alpha").
setMechanism(SCRAM_SHA_256.type()).
- setIterations(secondAlpha256Credential.iterations()).
setSalt(secondAlpha256Credential.salt()).
- setSaltedPassword(secondAlpha256Credential.saltedPassword()), (short) 0));
+ setStoredKey(secondAlpha256Credential.storedKey()).
+ setServerKey(secondAlpha256Credential.serverKey()).
+ setIterations(secondAlpha256Credential.iterations()), (short) 0));
DELTA1 = new ScramDelta(IMAGE1);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/ScramCredentialDataTest.java b/metadata/src/test/java/org/apache/kafka/metadata/ScramCredentialDataTest.java
new file mode 100644
index 0000000000000..1b4d9eaccd148
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/ScramCredentialDataTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.util.MockRandom;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+@Timeout(value = 40)
+public class ScramCredentialDataTest {
+
+ static MockRandom random = new MockRandom();
+
+ static byte[] randomBuffer(Random random, int length) {
+ byte[] buf = new byte[length];
+ random.nextBytes(buf);
+ return buf;
+ }
+
+ private static final List SCRAMCREDENTIALDATA = Arrays.asList(
+ new ScramCredentialData(
+ randomBuffer(random, 1024),
+ randomBuffer(random, 1024),
+ randomBuffer(random, 1024),
+ 4096),
+ new ScramCredentialData(
+ randomBuffer(random, 1024),
+ randomBuffer(random, 1024),
+ randomBuffer(random, 1024),
+ 8192),
+ new ScramCredentialData(
+ randomBuffer(random, 1024),
+ randomBuffer(random, 1024),
+ randomBuffer(random, 1024),
+ 10000));
+
+ @Test
+ public void testValues() {
+ assertEquals(4096, SCRAMCREDENTIALDATA.get(0).iterations());
+ assertEquals(8192, SCRAMCREDENTIALDATA.get(1).iterations());
+ assertEquals(10000, SCRAMCREDENTIALDATA.get(2).iterations());
+ }
+
+ @Test
+ public void testEquals() {
+ assertNotEquals(SCRAMCREDENTIALDATA.get(0), SCRAMCREDENTIALDATA.get(1));
+ assertNotEquals(SCRAMCREDENTIALDATA.get(1), SCRAMCREDENTIALDATA.get(0));
+ assertNotEquals(SCRAMCREDENTIALDATA.get(0), SCRAMCREDENTIALDATA.get(2));
+ assertNotEquals(SCRAMCREDENTIALDATA.get(2), SCRAMCREDENTIALDATA.get(0));
+ assertEquals(SCRAMCREDENTIALDATA.get(0), SCRAMCREDENTIALDATA.get(0));
+ assertEquals(SCRAMCREDENTIALDATA.get(1), SCRAMCREDENTIALDATA.get(1));
+ assertEquals(SCRAMCREDENTIALDATA.get(2), SCRAMCREDENTIALDATA.get(2));
+ }
+
+ @Test
+ public void testToString() {
+ assertEquals("ScramCredentialData" +
+ "(salt=" + "[hidden]" +
+ ", storedKey=" + "[hidden]" +
+ ", serverKey=" + "[hidden]" +
+ ", iterations=" + "[hidden]" +
+ ")", SCRAMCREDENTIALDATA.get(0).toString());
+ }
+
+ @Test
+ public void testFromRecordAndToRecord() {
+ testRoundTrip(SCRAMCREDENTIALDATA.get(0));
+ testRoundTrip(SCRAMCREDENTIALDATA.get(1));
+ testRoundTrip(SCRAMCREDENTIALDATA.get(2));
+ }
+
+ private void testRoundTrip(ScramCredentialData scramCredentialData) {
+ ApiMessageAndVersion messageAndVersion = new ApiMessageAndVersion(
+ scramCredentialData.toRecord("alice", ScramMechanism.SCRAM_SHA_256), (short) 0);
+ ScramCredentialData scramCredentialData2 = ScramCredentialData.fromRecord(
+ (UserScramCredentialRecord) messageAndVersion.message());
+ assertEquals(scramCredentialData, scramCredentialData2);
+ ApiMessageAndVersion messageAndVersion2 = new ApiMessageAndVersion(
+ scramCredentialData2.toRecord("alice", ScramMechanism.SCRAM_SHA_256), (short) 0);
+ assertEquals(messageAndVersion, messageAndVersion2);
+ }
+
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index b0593f40d0763..c095ec751f03e 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -167,11 +167,14 @@ public enum MetadataVersion {
// and updates to a handful of RPCs.
IBP_3_4_IV0(8, "3.4", "IV0", true),
- // Support for tiered storage (KIP-405) and SCRAM
+ // Support for tiered storage (KIP-405)
IBP_3_5_IV0(9, "3.5", "IV0", false),
// Adds replica epoch to Fetch request (KIP-903).
- IBP_3_5_IV1(10, "3.5", "IV1", false);
+ IBP_3_5_IV1(10, "3.5", "IV1", false),
+
+ // Support for SCRAM
+ IBP_3_5_IV2(11, "3.5", "IV2", true);
// NOTE: update the default version in @ClusterTest annotation to point to the latest version
public static final String FEATURE_NAME = "metadata.version";
@@ -254,7 +257,7 @@ public boolean isApiForwardingEnabled() {
}
public boolean isScramSupported() {
- return this.isAtLeast(IBP_3_5_IV0);
+ return this.isAtLeast(IBP_3_5_IV2);
}
public boolean isKRaftSupported() {