diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java index dc9017beeda75..db6573fb455b7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java @@ -21,6 +21,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import java.util.Map; +import java.util.Set; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; @@ -272,6 +273,10 @@ public DistributedConfig(Map props) { super(CONFIG, props); } + public static Set configNames() { + return CONFIG.names(); + } + public static void main(String[] args) { System.out.println(CONFIG.toHtmlTable()); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java index 231226997833d..143fcd8afb438 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java @@ -27,6 +27,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +78,8 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig, ErrorHandlingMetrics errorHandlingMetrics) { String topic = sinkConfig.dlqTopicName(); - try (AdminClient admin = AdminClient.create(workerConfig.originals())) { + Map adminProps = ConnectUtils.retainProducerConfigs(workerConfig.originals()); + try (AdminClient admin = AdminClient.create(adminProps)) { if (!admin.listTopics().names().get().contains(topic)) { log.error("Topic {} doesn't exist. Will attempt to create topic.", topic); NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor()); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java index f950edf53542c..313976bde6958 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import java.util.Map; +import java.util.Set; public class StandaloneConfig extends WorkerConfig { private static final ConfigDef CONFIG; @@ -41,4 +42,8 @@ public class StandaloneConfig extends WorkerConfig { public StandaloneConfig(Map props) { super(CONFIG, props); } + + public static Set configNames() { + return CONFIG.names(); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index e7ee632638d2e..8fc2b8714605d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -39,6 +39,7 @@ import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TopicAdmin; @@ -411,16 +412,16 @@ public void putTargetState(String connector, TargetState state) { // package private for testing KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) { - Map originals = config.originals(); - Map producerProps = new HashMap<>(originals); + Map producerProps = ConnectUtils.retainProducerConfigs(config.originals()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); - Map consumerProps = new HashMap<>(originals); + + Map consumerProps = ConnectUtils.retainConsumerConfigs(config.originals()); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - Map adminProps = new HashMap<>(originals); + Map adminProps = ConnectUtils.retainAdminClientConfigs(config.originals()); NewTopic topicDescription = TopicAdmin.defineTopic(topic). compacted(). partitions(1). diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index 195c498edb76f..87d5da7147af0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -28,6 +28,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConvertingFutureCallback; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TopicAdmin; @@ -67,17 +68,17 @@ public void configure(final WorkerConfig config) { data = new HashMap<>(); - Map originals = config.originals(); - Map producerProps = new HashMap<>(originals); + Map producerProps = ConnectUtils.retainProducerConfigs(config.originals()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); + producerProps = ConnectUtils.retainProducerConfigs(producerProps); - Map consumerProps = new HashMap<>(originals); + Map consumerProps = ConnectUtils.retainConsumerConfigs(config.originals()); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - Map adminProps = new HashMap<>(originals); + Map adminProps = ConnectUtils.retainAdminClientConfigs(config.originals()); NewTopic topicDescription = TopicAdmin.defineTopic(topic). compacted(). partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)). diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index 6710808f9a97a..85705bef336ec 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -39,6 +39,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.Table; @@ -124,17 +125,16 @@ public void configure(final WorkerConfig config) { if (this.topic == null || this.topic.trim().length() == 0) throw new ConfigException("Must specify topic for connector status."); - Map originals = config.originals(); - Map producerProps = new HashMap<>(originals); + Map producerProps = ConnectUtils.retainProducerConfigs(config.originals()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class - Map consumerProps = new HashMap<>(originals); + Map consumerProps = ConnectUtils.retainConsumerConfigs(config.originals()); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - Map adminProps = new HashMap<>(originals); + Map adminProps = ConnectUtils.retainAdminClientConfigs(config.originals()); NewTopic topicDescription = TopicAdmin.defineTopic(topic). compacted(). partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)). diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java index 9f30236fdee47..fc7eace77172e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java @@ -17,19 +17,43 @@ package org.apache.kafka.connect.util; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; public final class ConnectUtils { private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class); + private static final List WORKER_CONFIG_PREFIXES = Collections.unmodifiableList(Arrays.asList( + "producer.", + "consumer.", + WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, + WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, + WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, + WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, + WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, + WorkerConfig.REST_EXTENSION_CLASSES_CONFIG, + WorkerConfig.CONFIG_PROVIDERS_CONFIG + )); + public static Long checkAndConvertTimestamp(Long timestamp) { if (timestamp == null || timestamp >= 0) return timestamp; @@ -65,4 +89,176 @@ static String lookupKafkaClusterId(AdminClient adminClient) { + "Check worker's broker connection and security properties.", e); } } + + /** + * Modify the supplied map of configurations to retain only the configuration that may apply to the + * {@link org.apache.kafka.clients.producer.Producer}, including extension components. + * This will remove all properties that are known to be for consumers, admin client, and Connect workers. + * + * @param configs the map of configurations to be modified; may not be null + * @return the supplied {@code configs} parameter, returned for convenience + */ + public static Map retainProducerConfigs(Map configs) { + return retainConfigs(configs, ConnectUtils::isProducerConfig); + } + + /** + * Modify the supplied map of configurations to retain only the configuration that may apply to the + * {@link org.apache.kafka.clients.consumer.Consumer}, including extension components. + * This will remove all properties that are known to be for producers, admin client, and Connect workers. + * + * @param configs the map of configurations to be modified; may not be null + * @return the supplied {@code configs} parameter, returned for convenience + */ + public static Map retainConsumerConfigs(Map configs) { + return retainConfigs(configs, ConnectUtils::isConsumerConfig); + } + + /** + * Modify the supplied map of configurations to retain only the configuration that may apply to the {@link AdminClient}, + * including metric reporter configuration properties. + * This will remove all properties that are known to be for producers, consumers, and Connect workers. + * + * @param configs the map of configurations to be modified; may not be null + * @return the supplied {@code configs} parameter, returned for convenience + */ + public static Map retainAdminClientConfigs(Map configs) { + return retainConfigs(configs, ConnectUtils::isAdminClientConfig); + } + + /** + * Modify the supplied map of configurations to retain only those configuration name-value pairs that satisfy the supplied predicate. + * + * @param configs the map of configurations to be modified; may not be null + * @param isValid a function that is used to determine which configuration properties to retain; may not be null + * @return the supplied {@code configs} parameter, returned for convenience + */ + public static Map retainConfigs(Map configs, Predicate isValid) { + Iterator> entryIter = configs.entrySet().iterator(); + while (entryIter.hasNext()) { + Map.Entry entry = entryIter.next(); + if (!isValid.test(entry.getKey())) { + log.debug("Not retaining the '{}' config property when passing to a subcomponent", entry.getKey()); + entryIter.remove(); + } + } + return configs; + } + + /** + * Determine if the configuration with the supplied name is a known Connect {@link WorkerConfig} property, including properties + * that are prefixed and passed to various extension. Note that the {@link org.apache.kafka.connect.rest.ConnectRestExtension + * REST Extension} uses unprefixed properties, so this method treats those as unknown. + * + * @param key the name of the configuration property + * @return true if the named configuration property is known to be a property used by Connect workers, or false otherwise + */ + protected static boolean isKnownWorkerConfig(String key) { + if (key == null) { + return false; + } + if (DistributedConfig.configNames().contains(key) || StandaloneConfig.configNames().contains(key)) { + return true; + } + // Include any property that begins with a known prefix + for (String prefix : WORKER_CONFIG_PREFIXES) { + if (key.startsWith(prefix)) { + return true; + } + } + // REST Extension properties are not prefixed, so we don't know what they are + return false; + } + + /** + * Determine if the configuration with the supplied name is a known {@link ProducerConfig} property. Producers use metric reporters, + * interceptors, partitioners, and key and value serializers, to which the producer passes all of its (unprefixed) properties. + * Therefore, this method will consider all property names unknown to the {@link ProducerConfig}, {@link ConsumerConfig}, + * {@link AdminClientConfig}, and {@link #isKnownWorkerConfig(String) known worker configs} to be valid producer properties. + * + * @param key the name of the configuration property + * @return true if the named configuration property is known to be a producer property or known to not be a consumer, admin client, + * or worker configuration; or false otherwise + */ + protected static boolean isProducerConfig(String key) { + if (key == null) { + return false; + } + if (ProducerConfig.configNames().contains(key)) { + return true; + } + if (ConsumerConfig.configNames().contains(key)) { + return false; + } + if (AdminClientConfig.configNames().contains(key)) { + return false; + } + if (isKnownWorkerConfig(key)) { + return false; + } + // Producers can use metrics reporters, interceptors, and other extensions that take non-prefixed properties, + // so we have to include all non-prefixed properties + return true; + } + + /** + * Determine if the configuration with the supplied name is a known {@link ConsumerConfig} property. Consumers use metric reporters, + * interceptors, partitioners, and key and value deserializers, to which the consumer passes all of its (unprefixed) properties. + * Therefore, this method will consider all property names unknown to the {@link ProducerConfig}, {@link ConsumerConfig}, + * {@link AdminClientConfig}, and {@link #isKnownWorkerConfig(String) known worker configs} to be valid consumer properties. + * + * @param key the name of the configuration property + * @return true if the named configuration property is known to be a consumer property or known to not be a producer, admin client, + * or worker configuration; or false otherwise + */ + protected static boolean isConsumerConfig(String key) { + if (key == null) { + return false; + } + if (ConsumerConfig.configNames().contains(key)) { + return true; + } + if (ProducerConfig.configNames().contains(key)) { + return false; + } + if (AdminClientConfig.configNames().contains(key)) { + return false; + } + if (isKnownWorkerConfig(key)) { + return false; + } + // Consumers can use metrics reporters, interceptors, and other extensions that take non-prefixed properties, + // so we have to include all non-prefixed properties + return true; + } + + /** + * Determine if the configuration with the supplied name is a known {@link AdminClientConfig} property. Admin clients use metric + * reporters to which the admin client passes all of its (unprefixed) properties. + * Therefore, this method will consider all property names unknown to the {@link AdminClientConfig}, {@link ConsumerConfig}, + * {@link ProducerConfig}, and {@link #isKnownWorkerConfig(String) known worker configs} to be valid admin client properties. + * + * @param key the name of the configuration property + * @return true if the named configuration property is known to be a admin client property or known to not be a producer, consumer, + * or worker configuration; or false otherwise + */ + protected static boolean isAdminClientConfig(String key) { + if (key == null) { + return false; + } + if (AdminClientConfig.configNames().contains(key)) { + return true; + } + if (ProducerConfig.configNames().contains(key)) { + return false; + } + if (ConsumerConfig.configNames().contains(key)) { + return false; + } + if (isKnownWorkerConfig(key)) { + return false; + } + // AdminClient can use metrics reporters that take non-prefixed properties, so we have to include all non-prefixed properties + return true; + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index ad21561baf259..00097147bf140 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -165,7 +165,8 @@ public static NewTopicBuilder defineTopic(String topicName) { * @param adminConfig the configuration for the {@link AdminClient} */ public TopicAdmin(Map adminConfig) { - this(adminConfig, AdminClient.create(adminConfig)); + // Prevent logging unused config warnings + this(adminConfig, AdminClient.create(ConnectUtils.retainAdminClientConfigs(adminConfig))); } // visible for testing diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java index 6be3525380b61..1a0dbd83505d2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java @@ -16,16 +16,28 @@ */ package org.apache.kafka.connect.util; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.Node; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.storage.StringConverter; import org.junit.Test; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class ConnectUtilsTest { @@ -60,4 +72,184 @@ public void testLookupKafkaClusterIdTimeout() { ConnectUtils.lookupKafkaClusterId(adminClient); } + @Test + public void shouldIncludeKnownWorkerConfigs() { + for (String configName : DistributedConfig.configNames()) { + assertTrue("Should allow " + configName, ConnectUtils.isKnownWorkerConfig(configName)); + } + for (String configName : StandaloneConfig.configNames()) { + assertTrue("Should allow " + configName, ConnectUtils.isKnownWorkerConfig(configName)); + } + assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".something")); + assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".something")); + assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + ".something")); + assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG + ".something")); + assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG + ".something")); + assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.CONFIG_PROVIDERS_CONFIG + ".something")); + assertTrue(ConnectUtils.isKnownWorkerConfig(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG + ".something")); + + for (String configName : ProducerConfig.configNames()) { + assertTrue("Should allow " + configName, ConnectUtils.isKnownWorkerConfig("producer." + configName)); + } + for (String configName : ConsumerConfig.configNames()) { + assertTrue("Should allow " + configName, ConnectUtils.isKnownWorkerConfig("consumer." + configName)); + } + + // We don't consider extra configs to be known, even though the REST extensions use them + assertFalse(ConnectUtils.isKnownWorkerConfig("some.arbitrary.config")); + } + + @Test + public void shouldIncludeProducerConfigsAndUnknownConfigs() { + for (String configName : ProducerConfig.configNames()) { + assertTrue("Should allow " + configName, ConnectUtils.isProducerConfig(configName)); + } + assertTrue(ConnectUtils.isProducerConfig("custom.config")); + + // Exclude Connect worker configs + for (String configName : DistributedConfig.configNames()) { + if (ProducerConfig.configNames().contains(configName)) { + continue; + } + assertFalse("Should not allow " + configName, ConnectUtils.isProducerConfig(configName)); + } + for (String configName : StandaloneConfig.configNames()) { + if (ProducerConfig.configNames().contains(configName)) { + continue; + } + assertFalse("Should not allow " + configName, ConnectUtils.isProducerConfig(configName)); + } + assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.CONFIG_PROVIDERS_CONFIG + ".something")); + assertFalse(ConnectUtils.isProducerConfig(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG + ".something")); + + // Exclude Consumer-specific configs + assertFalse(ConnectUtils.isProducerConfig(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + } + + @Test + public void shouldIncludeConsumerConfigsAndUnknownConfigs() { + for (String configName : ConsumerConfig.configNames()) { + assertTrue("Should allow " + configName, ConnectUtils.isConsumerConfig(configName)); + } + assertTrue(ConnectUtils.isConsumerConfig("custom.config")); + + // Exclude Connect worker configs + for (String configName : DistributedConfig.configNames()) { + if (ConsumerConfig.configNames().contains(configName)) { + continue; + } + assertFalse("Should not allow " + configName, ConnectUtils.isConsumerConfig(configName)); + } + for (String configName : StandaloneConfig.configNames()) { + if (ConsumerConfig.configNames().contains(configName)) { + continue; + } + assertFalse("Should not allow " + configName, ConnectUtils.isConsumerConfig(configName)); + } + assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.CONFIG_PROVIDERS_CONFIG + ".something")); + assertFalse(ConnectUtils.isConsumerConfig(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG + ".something")); + + // Exclude Producer-specific configs + assertFalse(ConnectUtils.isConsumerConfig(ProducerConfig.ACKS_CONFIG)); + } + + @Test + public void shouldIncludeAdminClientConfigs() { + for (String configName : AdminClientConfig.configNames()) { + assertTrue(ConnectUtils.isAdminClientConfig(configName)); + } + + // Exclude Connect worker configs + for (String configName : DistributedConfig.configNames()) { + if (AdminClientConfig.configNames().contains(configName)) { + continue; + } + assertFalse("Should not allow " + configName, ConnectUtils.isAdminClientConfig(configName)); + } + for (String configName : StandaloneConfig.configNames()) { + if (AdminClientConfig.configNames().contains(configName)) { + continue; + } + assertFalse("Should not allow " + configName, ConnectUtils.isAdminClientConfig(configName)); + } + assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG + ".something")); + assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.CONFIG_PROVIDERS_CONFIG + ".something")); + assertFalse(ConnectUtils.isAdminClientConfig(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG + ".something")); + + // Exclude Producer-specific configs + for (String configName : ProducerConfig.configNames()) { + if (AdminClientConfig.configNames().contains(configName)) { + continue; + } + assertFalse("Should not allow " + configName, ConnectUtils.isAdminClientConfig(configName)); + } + + // Exclude Consumer-specific configs + for (String configName : ConsumerConfig.configNames()) { + if (AdminClientConfig.configNames().contains(configName)) { + continue; + } + assertFalse("Should not allow " + configName, ConnectUtils.isAdminClientConfig(configName)); + } + } + + @Test + public void removeNonAdminClientConfigurations() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap1"); + configs.put(AdminClientConfig.CLIENT_ID_CONFIG, "clientId"); + configs.put(AdminClientConfig.RETRIES_CONFIG, "1"); + configs.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100"); + configs.put(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + configs.put(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + ".custom", "customValue"); + configs.put(ProducerConfig.ACKS_CONFIG, "all"); + configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + configs.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "my-config-topic"); + configs.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); + configs.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG + ".custom", "foo"); + configs.put("producer." + ProducerConfig.ACKS_CONFIG, "all"); + configs.put("consumer." + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + configs.put("some.other.property", "value"); + configs.put("other.property", "value2"); + assertEquals(15, configs.size()); + Map filtered = ConnectUtils.retainAdminClientConfigs(new HashMap<>(configs)); + assertEquals(configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), + filtered.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)); + assertEquals(configs.get(AdminClientConfig.CLIENT_ID_CONFIG), + filtered.get(AdminClientConfig.CLIENT_ID_CONFIG)); + assertEquals(configs.get(AdminClientConfig.RETRIES_CONFIG), + filtered.get(AdminClientConfig.RETRIES_CONFIG)); + assertEquals(configs.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG), + filtered.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)); + assertEquals(configs.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG), + filtered.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG)); + assertEquals(configs.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + ".custom"), + filtered.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + ".custom")); + assertEquals(configs.get("some.other.property"), + filtered.get("some.other.property")); + assertEquals(configs.get("other.property"), + filtered.get("other.property")); + assertFalse(filtered.containsKey(ProducerConfig.ACKS_CONFIG)); + assertFalse(filtered.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); + assertFalse(filtered.containsKey(DistributedConfig.CONFIG_TOPIC_CONFIG)); + assertFalse(filtered.containsKey(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG)); + assertFalse(filtered.containsKey(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG + ".custom")); + assertEquals(configs.size() - 7, filtered.size()); + assertTrue(configs.keySet().containsAll(filtered.keySet())); + } + }