Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ConfigHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ trait ConfigHandler {
}

/**
* The TopicConfigHandler will process topic config changes from ZooKeeper or the metadata log.
* The TopicConfigHandler will process topic config changes from the metadata log.
* The callback provides the topic name and the full properties set.
*/
class TopicConfigHandler(private val replicaManager: ReplicaManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
nullTopicConfigs.mkString(","))
}
LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()
Expand Down
45 changes: 15 additions & 30 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ class LogConfigTest {
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString)
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString)
assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
() => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
}

@Test
Expand All @@ -305,17 +305,17 @@ class LogConfigTest {
val logProps = new Properties()
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())

logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
}

@ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}")
Expand All @@ -328,10 +328,10 @@ class LogConfigTest {
val logProps = new Properties()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
if (sysRemoteStorageEnabled) {
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
} else {
val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker"))
}
}
Expand All @@ -348,7 +348,7 @@ class LogConfigTest {
if (wasRemoteStorageEnabled) {
val message = assertThrows(classOf[InvalidConfigurationException],
() => LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false))
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains("It is invalid to disable remote storage without deleting remote data. " +
"If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " +
"If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."))
Expand All @@ -357,11 +357,11 @@ class LogConfigTest {
// It should be able to disable the remote log storage when delete on disable is set to true
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true")
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
} else {
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps,
kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
}
}

Expand All @@ -381,11 +381,11 @@ class LogConfigTest {
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
} else {
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
}
}

Expand All @@ -405,11 +405,11 @@ class LogConfigTest {
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
} else {
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
}
}

Expand Down Expand Up @@ -447,21 +447,6 @@ class LogConfigTest {
LogConfig.validate(logProps)
}

@ParameterizedTest
@ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG))
def testInValidRemoteConfigsInZK(configKey: String): Unit = {
Comment thread
ijuma marked this conversation as resolved.
val kafkaProps = TestUtils.createDummyBrokerConfig()
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
val logProps = new Properties
logProps.put(configKey, "true")

val message = assertThrows(classOf[InvalidConfigurationException],
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, true, true))
assertTrue(message.getMessage.contains("It is invalid to set `remote.log.delete.on.disable` or " +
"`remote.log.copy.disable` under Zookeeper's mode."))
}

@Test
def testValidateWithMetadataVersionJbodSupport(): Unit = {
def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,17 +512,12 @@ public static void validateBrokerLogConfigValues(Map<?, ?> props,
* @param existingConfigs The existing properties
* @param newConfigs The new properties to be validated
* @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled
* @param fromZK true if this is a ZK cluster
*/
private static void validateTopicLogConfigValues(Map<String, String> existingConfigs,
Map<?, ?> newConfigs,
boolean isRemoteLogStorageSystemEnabled,
boolean fromZK) {
boolean isRemoteLogStorageSystemEnabled) {
validateValues(newConfigs);

if (fromZK) {
validateNoInvalidRemoteStorageConfigsInZK(newConfigs);
}
boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled) {
validateRemoteStorageOnlyIfSystemEnabled(newConfigs, isRemoteLogStorageSystemEnabled, false);
Expand Down Expand Up @@ -564,15 +559,6 @@ public static void validateRetentionConfigsWhenRemoteCopyDisabled(Map<?, ?> newC
}
}

public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> newConfigs) {
boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false);
boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false);
if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) {
throw new InvalidConfigurationException("It is invalid to set `remote.log.delete.on.disable` or " +
"`remote.log.copy.disable` under Zookeeper's mode.");
}
}

public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> props, boolean isRemoteLogStorageSystemEnabled, boolean isReceivingConfigFromStore) {
boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) {
Expand Down Expand Up @@ -630,14 +616,13 @@ private static void validateRemoteStorageRetentionTime(Map<?, ?> props) {
* Check that the given properties contain only valid log config names and that all values can be parsed and are valid
*/
public static void validate(Properties props) {
validate(Collections.emptyMap(), props, Collections.emptyMap(), false, false);
validate(Collections.emptyMap(), props, Collections.emptyMap(), false);
}

public static void validate(Map<String, String> existingConfigs,
Properties props,
Map<?, ?> configuredProps,
boolean isRemoteLogStorageSystemEnabled,
boolean fromZK) {
boolean isRemoteLogStorageSystemEnabled) {
validateNames(props);
if (configuredProps == null || configuredProps.isEmpty()) {
Map<?, ?> valueMaps = CONFIG.parse(props);
Expand All @@ -646,7 +631,7 @@ public static void validate(Map<String, String> existingConfigs,
Map<Object, Object> combinedConfigs = new HashMap<>(configuredProps);
combinedConfigs.putAll(props);
Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, fromZK);
validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled);
}
}

Expand Down