Skip to content
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@
<suppress checks="CyclomaticComplexity"
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager).java"/>
<suppress checks="NPathComplexity"
files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver).java"/>
files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager).java"/>
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="metadata[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="BooleanExpressionComplexity"
Expand Down
22 changes: 15 additions & 7 deletions core/src/main/scala/kafka/tools/StorageTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ object StorageTool extends Logging {
val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
if (!metadataVersion.isScramSupported()) {
throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV0 or later.");
throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
}
for (record <- userScramCredentialRecords) {
metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
Expand Down Expand Up @@ -227,12 +227,20 @@ object StorageTool extends Logging {
val iterations = getIterations(argMap, scramMechanism)
val saltedPassword = getSaltedPassword(argMap, scramMechanism, salt, iterations)

val myrecord = new UserScramCredentialRecord()
.setName(name)
.setMechanism(scramMechanism.`type`)
.setSalt(salt)
.setIterations(iterations)
.setSaltedPassword(saltedPassword)
val myrecord = try {
val formatter = new ScramFormatter(scramMechanism);

new UserScramCredentialRecord()
.setName(name)
.setMechanism(scramMechanism.`type`)
.setSalt(salt)
.setStoredKey(formatter.storedKey(formatter.clientKey(saltedPassword)))
.setServerKey(formatter.serverKey(saltedPassword))
.setIterations(iterations)
} catch {
case e: Throwable =>
throw new TerseFailure(s"Error attempting to create UserScramCredentialRecord: ${e.getMessage}")
}
myrecord
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,6 @@ public void testNoAutoStart() {

@ClusterTest
public void testDefaults(ClusterConfig config) {
Assertions.assertEquals(MetadataVersion.IBP_3_5_IV1, config.metadataVersion());
Assertions.assertEquals(MetadataVersion.IBP_3_5_IV2, config.metadataVersion());
}
}
2 changes: 1 addition & 1 deletion core/src/test/java/kafka/test/annotation/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV1;
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV2;
ClusterConfigProperty[] serverProperties() default {};
}
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class FeatureCommandTest extends IntegrationTestHarness {
Array("--bootstrap-server", bootstrapServers(), "describe"), env.out))
assertEquals(String.format(
"Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 3.5-IV1\tFinalizedVersionLevel: 3.3-IV1\t"),
"SupportedMaxVersion: 3.5-IV2\tFinalizedVersionLevel: 3.3-IV1\t"),
env.outputWithoutEpoch())
}
}
Expand Down Expand Up @@ -145,7 +145,7 @@ class FeatureCommandTest extends IntegrationTestHarness {
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
"disable", "--feature", "metadata.version"), env.out))
assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
"metadata.version. Local controller 1000 only supports versions 1-10", env.outputWithoutEpoch())
"metadata.version. Local controller 1000 only supports versions 1-11", env.outputWithoutEpoch())
}
TestUtils.resource(FeatureCommandTestEnv()) { env =>
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ Found problem:
try {
assertEquals(1, StorageTool.main(args))
} catch {
case e: StorageToolTestException => assertEquals(s"SCRAM is only supported in metadataVersion IBP_3_5_IV0 or later.", exitString)
case e: StorageToolTestException => assertEquals(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.", exitString)
} finally {
Exit.resetExitProcedure()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;

import org.slf4j.Logger;

import java.util.ArrayList;
Expand Down Expand Up @@ -111,22 +113,25 @@ public String toString() {

static class ScramCredentialValue {
private final byte[] salt;
private final byte[] saltedPassword;
private final byte[] storedKey;
private final byte[] serverKey;
private final int iterations;

ScramCredentialValue(
byte[] salt,
byte[] saltedPassword,
byte[] storedKey,
byte[] serverKey,
int iterations
) {
this.salt = salt;
this.saltedPassword = saltedPassword;
this.storedKey = storedKey;
this.serverKey = serverKey;
this.iterations = iterations;
}

@Override
public int hashCode() {
return Objects.hash(salt, saltedPassword, iterations);
return Objects.hash(salt, storedKey, serverKey, iterations);
}

@Override
Expand All @@ -135,15 +140,17 @@ public boolean equals(Object o) {
if (!(o.getClass() == this.getClass())) return false;
ScramCredentialValue other = (ScramCredentialValue) o;
return Arrays.equals(salt, other.salt) &&
Arrays.equals(saltedPassword, other.saltedPassword) &&
Arrays.equals(storedKey, other.storedKey) &&
Arrays.equals(serverKey, other.serverKey) &&
iterations == other.iterations;
}

@Override
public String toString() {
return "ScramCredentialValue" +
"(salt=" + "[hidden]" +
", saltedPassword=" + "[hidden]" +
", storedKey=" + "[hidden]" +
", serverKey=" + "[hidden]" +
", iterations=" + "[hidden]" +
")";
}
Expand Down Expand Up @@ -226,16 +233,15 @@ public ControllerResult<AlterUserScramCredentialsResponseData> alterCredentials(
setMechanism(deletion.mechanism()), (short) 0));
}
for (ScramCredentialUpsertion upsertion : userToUpsert.values()) {
response.results().add(new AlterUserScramCredentialsResult().
setUser(upsertion.name()).
setErrorCode(NONE.code()).
setErrorMessage(null));
records.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
setName(upsertion.name()).
setMechanism(upsertion.mechanism()).
setSalt(upsertion.salt()).
setSaltedPassword(upsertion.saltedPassword()).
setIterations(upsertion.iterations()), (short) 0));
ApiError error = finishUpsertion(records, upsertion);
if (!error.isFailure()) {
response.results().add(new AlterUserScramCredentialsResult().
setUser(upsertion.name()).
setErrorCode(NONE.code()).
setErrorMessage(null));
} else {
userToError.put(upsertion.name(), error);
}
}
for (Entry<String, ApiError> entry : userToError.entrySet()) {
response.results().add(new AlterUserScramCredentialsResult().
Expand All @@ -246,6 +252,30 @@ public ControllerResult<AlterUserScramCredentialsResponseData> alterCredentials(
return ControllerResult.atomicOf(records, response);
}

static ApiError finishUpsertion(List<ApiMessageAndVersion> records, ScramCredentialUpsertion upsertion) {
org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism =
org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(
ScramMechanism.fromType(upsertion.mechanism()).mechanismName());

try { // Convert from saltedPassword to storedKey and serverKey
ScramFormatter formatter = new ScramFormatter(internalMechanism);

records.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
setName(upsertion.name()).
setMechanism(upsertion.mechanism()).
setSalt(upsertion.salt()).

// Convert from saltedPassword to storedKey and serverKey
setStoredKey(formatter.storedKey(formatter.clientKey(upsertion.saltedPassword()))).
setServerKey(formatter.serverKey(upsertion.saltedPassword())).
setIterations(upsertion.iterations()), (short) 0));

} catch (Throwable e) {
return ApiError.fromThrowable(e);
}
return ApiError.NONE;
}

static ApiError validateUpsertion(ScramCredentialUpsertion upsertion) {
ScramMechanism mechanism = ScramMechanism.fromType(upsertion.mechanism());
ApiError error = validateScramUsernameAndMechanism(upsertion.name(), mechanism);
Expand Down Expand Up @@ -300,7 +330,8 @@ public void replay(UserScramCredentialRecord record) {
ScramCredentialKey key = new ScramCredentialKey(record.name(),
ScramMechanism.fromType(record.mechanism()));
ScramCredentialValue value = new ScramCredentialValue(record.salt(),
record.saltedPassword(),
record.storedKey(),
record.serverKey(),
record.iterations());
if (credentials.put(key, value) == null) {
log.info("Created new SCRAM credential for {} with mechanism {}.",
Expand All @@ -311,14 +342,4 @@ public void replay(UserScramCredentialRecord record) {
}
}

ApiMessageAndVersion toRecord(ScramCredentialKey key, ScramCredentialValue value) {
return new ApiMessageAndVersion(new UserScramCredentialRecord().
setName(key.username).
setMechanism(key.mechanism.type()).
setSalt(value.salt).
setSaltedPassword(value.saltedPassword).
setIterations(value.iterations),
(short) 0);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.metadata.ScramCredentialData;

import java.util.HashMap;
import java.util.Map.Entry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.metadata.ScramCredentialData;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,58 @@
* limitations under the License.
*/

package org.apache.kafka.image;
package org.apache.kafka.metadata;

import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;

import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.Objects;


/**
* Represents the ACLs in the metadata image.
*
* This class is thread-safe.
*/
public final class ScramCredentialData {
private final byte[] salt;
private final byte[] saltedPassword;
private final byte[] storedKey;
private final byte[] serverKey;
private final int iterations;

static ScramCredentialData fromRecord(
public static ScramCredentialData fromRecord(
UserScramCredentialRecord record
) {
return new ScramCredentialData(
record.salt(),
record.saltedPassword(),
record.storedKey(),
record.serverKey(),
record.iterations());
}

public ScramCredentialData(
byte[] salt,
byte[] saltedPassword,
byte[] storedKey,
byte[] serverKey,
int iterations
) {
this.salt = salt;
this.saltedPassword = saltedPassword;
this.storedKey = storedKey;
this.serverKey = serverKey;
this.iterations = iterations;
}

public byte[] salt() {
return salt;
}

public byte[] saltedPassword() {
return saltedPassword;
public byte[] storedKey() {
return storedKey;
}

public byte[] serverKey() {
return serverKey;
}

public int iterations() {
Expand All @@ -76,25 +81,18 @@ public UserScramCredentialRecord toRecord(
setName(userName).
setMechanism(mechanism.type()).
setSalt(salt).
setSaltedPassword(saltedPassword).
setStoredKey(storedKey).
setServerKey(serverKey).
setIterations(iterations);
}

public ScramCredential toCredential(
ScramMechanism mechanism
) throws GeneralSecurityException {
org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism =
org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanism.mechanismName());
ScramFormatter formatter = new ScramFormatter(internalMechanism);
return new ScramCredential(salt,
formatter.storedKey(formatter.clientKey(saltedPassword)),
formatter.serverKey(saltedPassword),
iterations);
public ScramCredential toCredential(ScramMechanism mechanism) {
return new ScramCredential(salt, storedKey, serverKey, iterations);
}

@Override
public int hashCode() {
return Objects.hash(salt, saltedPassword, iterations);
return Objects.hash(salt, storedKey, serverKey, iterations);
}

@Override
Expand All @@ -103,15 +101,17 @@ public boolean equals(Object o) {
if (!o.getClass().equals(ScramCredentialData.class)) return false;
ScramCredentialData other = (ScramCredentialData) o;
return Arrays.equals(salt, other.salt) &&
Arrays.equals(saltedPassword, other.saltedPassword) &&
Arrays.equals(storedKey, other.storedKey) &&
Arrays.equals(serverKey, other.serverKey) &&
iterations == other.iterations;
}

@Override
public String toString() {
return "ScramCredentialData" +
"(salt=" + "[hidden]" +
", saltedPassword=" + "[hidden]" +
", storedKey=" + "[hidden]" +
", serverKey=" + "[hidden]" +
", iterations=" + "[hidden]" +
")";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
"about": "The SCRAM mechanism." },
{ "name": "Salt", "type": "bytes", "versions": "0+",
"about": "A random salt generated by the client." },
{ "name": "SaltedPassword", "type": "bytes", "versions": "0+",
"about": "The salted password." },
{ "name": "StoredKey", "type": "bytes", "versions": "0+",
"about": "The key the Server uses to authenticate the Client." },
{ "name": "ServerKey", "type": "bytes", "versions": "0+",
"about": "The key the Client uses to validate the Servers identity." },
{ "name": "Iterations", "type": "int32", "versions": "0+",
"about": "The number of iterations used in the SCRAM credential." }
]
Expand Down
Loading